19 package org.sleuthkit.autopsy.coordinationservice;
 
   21 import java.io.IOException;
 
   22 import java.util.Collection;
 
   23 import java.util.Iterator;
 
   24 import java.util.List;
 
   26 import java.util.concurrent.ConcurrentHashMap;
 
   27 import java.util.concurrent.TimeUnit;
 
   28 import javax.annotation.concurrent.GuardedBy;
 
   29 import javax.annotation.concurrent.ThreadSafe;
 
   30 import org.apache.curator.RetryPolicy;
 
   31 import org.apache.curator.framework.CuratorFramework;
 
   32 import org.apache.curator.framework.CuratorFrameworkFactory;
 
   33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
   34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 
   35 import org.apache.curator.retry.ExponentialBackoffRetry;
 
   36 import org.apache.zookeeper.CreateMode;
 
   37 import org.apache.zookeeper.KeeperException;
 
   38 import org.apache.zookeeper.KeeperException.NoNodeException;
 
   39 import org.apache.zookeeper.WatchedEvent;
 
   40 import org.apache.zookeeper.ZooDefs;
 
   41 import org.apache.zookeeper.ZooKeeper;
 
   42 import org.openide.util.Lookup;
 
   59     @GuardedBy(
"CoordinationService.class")
 
   75         boolean result = 
false;
 
   76         Object workerThreadWaitNotifyLock = 
new Object();
 
   79         ZooKeeper zooKeeper = 
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
 
   80                 (WatchedEvent event) -> {
 
   81                     synchronized (workerThreadWaitNotifyLock) {
 
   82                         workerThreadWaitNotifyLock.notify();
 
   85         synchronized (workerThreadWaitNotifyLock) {
 
   86             workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
 
   88         ZooKeeper.States state = zooKeeper.getState();
 
   89         if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
 
  109             Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
 
  111                 rootNode = it.next().getNamespaceRoot();
 
  117             } 
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
 
  118                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  142         RetryPolicy retryPolicy = 
new ExponentialBackoffRetry(1000, 3);
 
  145         curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
 
  151         String rootNode = rootNodeName;
 
  153         if (!rootNode.startsWith(
"/")) {
 
  154             rootNode = 
"/" + rootNode;
 
  158             String nodePath = rootNode + 
"/" + node.getDisplayName();
 
  160                 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
 
  161             } 
catch (KeeperException ex) {
 
  162                 if (ex.code() != KeeperException.Code.NODEEXISTS) {
 
  165             } 
catch (Exception ex) {
 
  195             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  196             if (lock.writeLock().acquire(timeOut, timeUnit)) {
 
  197                 return new Lock(nodePath, lock.writeLock());
 
  201         } 
catch (Exception ex) {
 
  202             if (ex instanceof InterruptedException) {
 
  203                 throw (InterruptedException) ex;
 
  205                 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  229             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  230             if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
 
  233             return new Lock(nodePath, lock.writeLock());
 
  234         } 
catch (Exception ex) {
 
  235             throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  262             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  263             if (lock.readLock().acquire(timeOut, timeUnit)) {
 
  264                 return new Lock(nodePath, lock.readLock());
 
  268         } 
catch (Exception ex) {
 
  269             if (ex instanceof InterruptedException) {
 
  270                 throw (InterruptedException) ex;
 
  272                 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  296             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  297             if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
 
  300             return new Lock(nodePath, lock.readLock());
 
  301         } 
catch (Exception ex) {
 
  302             throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  323             return curator.getData().forPath(fullNodePath);
 
  324         } 
catch (NoNodeException ex) {
 
  326         } 
catch (Exception ex) {
 
  327             if (ex instanceof InterruptedException) {
 
  328                 throw (InterruptedException) ex;
 
  330                 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
 
  347     public void setNodeData(
CategoryNode category, String nodePath, byte[] data) 
throws CoordinationServiceException, InterruptedException {
 
  350             curator.setData().forPath(fullNodePath, data);
 
  351         } 
catch (Exception ex) {
 
  352             if (ex instanceof InterruptedException) {
 
  353                 throw (InterruptedException) ex;
 
  355                 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
 
  372             curator.delete().forPath(fullNodePath);
 
  373         } 
catch (Exception ex) {
 
  374             throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
 
  392         } 
catch (Exception ex) {
 
  393             throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
 
  407         if(nodePath.startsWith(
"/")){
 
  417     public final static class CoordinationServiceException 
extends Exception {
 
  426             super(message, cause);
 
  435     public static class Lock implements AutoCloseable {
 
  444         private Lock(String nodePath, InterProcessMutex lock) {
 
  446             this.interProcessLock = lock;
 
  453         public void release() throws CoordinationServiceException {
 
  455                 this.interProcessLock.release();
 
  456             } 
catch (Exception ex) {
 
  457                 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
 
  462         public void close() throws CoordinationServiceException {
 
  482             this.displayName = displayName;
 
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator