19 package org.sleuthkit.autopsy.coordinationservice;
 
   21 import java.io.IOException;
 
   22 import java.util.Collection;
 
   23 import java.util.Iterator;
 
   25 import java.util.concurrent.ConcurrentHashMap;
 
   26 import java.util.concurrent.TimeUnit;
 
   27 import javax.annotation.concurrent.GuardedBy;
 
   28 import javax.annotation.concurrent.ThreadSafe;
 
   29 import org.apache.curator.RetryPolicy;
 
   30 import org.apache.curator.framework.CuratorFramework;
 
   31 import org.apache.curator.framework.CuratorFrameworkFactory;
 
   32 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
   33 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 
   34 import org.apache.curator.retry.ExponentialBackoffRetry;
 
   35 import org.apache.zookeeper.CreateMode;
 
   36 import org.apache.zookeeper.KeeperException;
 
   37 import org.apache.zookeeper.KeeperException.NoNodeException;
 
   38 import org.apache.zookeeper.WatchedEvent;
 
   39 import org.apache.zookeeper.ZooDefs;
 
   40 import org.apache.zookeeper.ZooKeeper;
 
   41 import org.openide.util.Lookup;
 
   58     @GuardedBy(
"CoordinationService.class")
 
   74         boolean result = 
false;
 
   75         Object workerThreadWaitNotifyLock = 
new Object();
 
   78         ZooKeeper zooKeeper = 
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
 
   79                 (WatchedEvent event) -> {
 
   80                     synchronized (workerThreadWaitNotifyLock) {
 
   81                         workerThreadWaitNotifyLock.notify();
 
   84         synchronized (workerThreadWaitNotifyLock) {
 
   85             workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
 
   87         ZooKeeper.States state = zooKeeper.getState();
 
   88         if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
 
  108             Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
 
  110                 rootNode = it.next().getNamespaceRoot();
 
  116             } 
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
 
  117                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  141         RetryPolicy retryPolicy = 
new ExponentialBackoffRetry(1000, 3);
 
  144         curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
 
  150         String rootNode = rootNodeName;
 
  151         if (!rootNode.startsWith(
"/")) {
 
  152             rootNode = 
"/" + rootNode;
 
  156             String nodePath = rootNode + 
"/" + node.getDisplayName();
 
  158                 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
 
  159             } 
catch (KeeperException ex) {
 
  160                 if (ex.code() != KeeperException.Code.NODEEXISTS) {
 
  163             } 
catch (Exception ex) {
 
  193             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  194             if (lock.writeLock().acquire(timeOut, timeUnit)) {
 
  195                 return new Lock(nodePath, lock.writeLock());
 
  199         } 
catch (Exception ex) {
 
  200             if (ex instanceof InterruptedException) {
 
  201                 throw (InterruptedException) ex;
 
  203                 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  227             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  228             if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
 
  231             return new Lock(nodePath, lock.writeLock());
 
  232         } 
catch (Exception ex) {
 
  233             throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  260             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  261             if (lock.readLock().acquire(timeOut, timeUnit)) {
 
  262                 return new Lock(nodePath, lock.readLock());
 
  266         } 
catch (Exception ex) {
 
  267             if (ex instanceof InterruptedException) {
 
  268                 throw (InterruptedException) ex;
 
  270                 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  294             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  295             if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
 
  298             return new Lock(nodePath, lock.readLock());
 
  299         } 
catch (Exception ex) {
 
  300             throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  321             return curator.getData().forPath(fullNodePath);
 
  322         } 
catch (NoNodeException ex) {
 
  324         } 
catch (Exception ex) {
 
  325             if (ex instanceof InterruptedException) {
 
  326                 throw (InterruptedException) ex;
 
  328                 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
 
  345     public void setNodeData(
CategoryNode category, String nodePath, byte[] data) 
throws CoordinationServiceException, InterruptedException {
 
  348             curator.setData().forPath(fullNodePath, data);
 
  349         } 
catch (Exception ex) {
 
  350             if (ex instanceof InterruptedException) {
 
  351                 throw (InterruptedException) ex;
 
  353                 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
 
  373     public final static class CoordinationServiceException 
extends Exception {
 
  382             super(message, cause);
 
  391     public static class Lock implements AutoCloseable {
 
  400         private Lock(String nodePath, InterProcessMutex lock) {
 
  402             this.interProcessLock = lock;
 
  409         public void release() throws CoordinationServiceException {
 
  411                 this.interProcessLock.release();
 
  412             } 
catch (Exception ex) {
 
  413                 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
 
  418         public void close() throws CoordinationServiceException {
 
  436             this.displayName = displayName;
 
final InterProcessMutex interProcessLock
 
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
 
static final String DEFAULT_NAMESPACE_ROOT
 
static String getIndexingServerPort()
 
static final int CONNECTION_TIMEOUT_MILLISECONDS
 
CategoryNode(String displayName)
 
static final long serialVersionUID
 
byte[] getNodeData(CategoryNode category, String nodePath)
 
static CoordinationService instance
 
final Map< String, String > categoryNodeToPath
 
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
 
static final int SESSION_TIMEOUT_MILLISECONDS
 
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
 
Lock(String nodePath, InterProcessMutex lock)
 
Lock tryGetSharedLock(CategoryNode category, String nodePath)
 
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
 
static boolean isZooKeeperAccessible()
 
void setNodeData(CategoryNode category, String nodePath, byte[] data)
 
CoordinationService(String rootNodeName)
 
CoordinationServiceException(String message)
 
static final int PORT_OFFSET
 
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
 
static synchronized CoordinationService getInstance()
 
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
 
CoordinationServiceException(String message, Throwable cause)
 
static String getIndexingServerHost()
 
final CuratorFramework curator