19 package org.sleuthkit.autopsy.coordinationservice;
 
   21 import java.io.IOException;
 
   22 import java.util.Collection;
 
   23 import java.util.Iterator;
 
   24 import java.util.List;
 
   26 import java.util.concurrent.ConcurrentHashMap;
 
   27 import java.util.concurrent.TimeUnit;
 
   28 import javax.annotation.concurrent.GuardedBy;
 
   29 import javax.annotation.concurrent.ThreadSafe;
 
   30 import org.apache.curator.RetryPolicy;
 
   31 import org.apache.curator.framework.CuratorFramework;
 
   32 import org.apache.curator.framework.CuratorFrameworkFactory;
 
   33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
   34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 
   35 import org.apache.curator.retry.ExponentialBackoffRetry;
 
   36 import org.apache.zookeeper.CreateMode;
 
   37 import org.apache.zookeeper.KeeperException;
 
   38 import org.apache.zookeeper.KeeperException.NoNodeException;
 
   39 import org.apache.zookeeper.WatchedEvent;
 
   40 import org.apache.zookeeper.ZooDefs;
 
   41 import org.apache.zookeeper.ZooKeeper;
 
   42 import org.openide.util.Lookup;
 
   59     @GuardedBy(
"CoordinationService.class")
 
   75         boolean result = 
false;
 
   76         Object workerThreadWaitNotifyLock = 
new Object();
 
   79         ZooKeeper zooKeeper = 
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
 
   80                 (WatchedEvent event) -> {
 
   81                     synchronized (workerThreadWaitNotifyLock) {
 
   82                         workerThreadWaitNotifyLock.notify();
 
   85         synchronized (workerThreadWaitNotifyLock) {
 
   86             workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
 
   88         ZooKeeper.States state = zooKeeper.getState();
 
   89         if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
 
  109             Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
 
  111                 rootNode = it.next().getNamespaceRoot();
 
  117             } 
catch (IOException | KeeperException | CoordinationServiceException ex) {
 
  118                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  119             } 
catch (InterruptedException ex) {
 
  126                 Thread.currentThread().interrupt();
 
  127                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  151         RetryPolicy retryPolicy = 
new ExponentialBackoffRetry(1000, 3);
 
  154         curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
 
  160         String rootNode = rootNodeName;
 
  162         if (!rootNode.startsWith(
"/")) {
 
  163             rootNode = 
"/" + rootNode;
 
  167             String nodePath = rootNode + 
"/" + node.getDisplayName();
 
  169                 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
 
  170             } 
catch (KeeperException ex) {
 
  171                 if (ex.code() != KeeperException.Code.NODEEXISTS) {
 
  174             } 
catch (Exception ex) {
 
  204             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  205             if (lock.writeLock().acquire(timeOut, timeUnit)) {
 
  206                 return new Lock(nodePath, lock.writeLock());
 
  210         } 
catch (Exception ex) {
 
  211             if (ex instanceof InterruptedException) {
 
  212                 throw (InterruptedException) ex;
 
  214                 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  238             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  239             if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
 
  242             return new Lock(nodePath, lock.writeLock());
 
  243         } 
catch (Exception ex) {
 
  244             throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  271             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  272             if (lock.readLock().acquire(timeOut, timeUnit)) {
 
  273                 return new Lock(nodePath, lock.readLock());
 
  277         } 
catch (Exception ex) {
 
  278             if (ex instanceof InterruptedException) {
 
  279                 throw (InterruptedException) ex;
 
  281                 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  305             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  306             if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
 
  309             return new Lock(nodePath, lock.readLock());
 
  310         } 
catch (Exception ex) {
 
  311             throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  332             return curator.getData().forPath(fullNodePath);
 
  333         } 
catch (NoNodeException ex) {
 
  335         } 
catch (Exception ex) {
 
  336             if (ex instanceof InterruptedException) {
 
  337                 throw (InterruptedException) ex;
 
  339                 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
 
  356     public void setNodeData(
CategoryNode category, String nodePath, byte[] data) 
throws CoordinationServiceException, InterruptedException {
 
  359             curator.setData().forPath(fullNodePath, data);
 
  360         } 
catch (Exception ex) {
 
  361             if (ex instanceof InterruptedException) {
 
  362                 throw (InterruptedException) ex;
 
  364                 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
 
  381     public void deleteNode(
CategoryNode category, String nodePath) 
throws CoordinationServiceException, InterruptedException {
 
  384             curator.delete().forPath(fullNodePath);
 
  385         } 
catch (Exception ex) {
 
  386             if (ex instanceof InterruptedException) {
 
  387                 throw (InterruptedException) ex;
 
  389                 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
 
  411         } 
catch (Exception ex) {
 
  412             if (ex instanceof InterruptedException) {
 
  413                 throw (InterruptedException) ex;
 
  415                 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
 
  430         if (nodePath.startsWith(
"/")) {
 
  440     public final static class CoordinationServiceException 
extends Exception {
 
  449             super(message, cause);
 
  458     public static class Lock implements AutoCloseable {
 
  467         private Lock(String nodePath, InterProcessMutex lock) {
 
  469             this.interProcessLock = lock;
 
  476         public void release() throws CoordinationServiceException {
 
  478                 this.interProcessLock.release();
 
  479             } 
catch (Exception ex) {
 
  480                 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
 
  485         public void close() throws CoordinationServiceException {
 
  505             this.displayName = displayName;
 
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator