19 package org.sleuthkit.autopsy.coordinationservice;
 
   21 import java.io.IOException;
 
   22 import java.util.Collection;
 
   23 import java.util.Iterator;
 
   24 import java.util.List;
 
   26 import java.util.concurrent.ConcurrentHashMap;
 
   27 import java.util.concurrent.TimeUnit;
 
   28 import javax.annotation.concurrent.GuardedBy;
 
   29 import javax.annotation.concurrent.ThreadSafe;
 
   30 import org.apache.curator.RetryPolicy;
 
   31 import org.apache.curator.framework.CuratorFramework;
 
   32 import org.apache.curator.framework.CuratorFrameworkFactory;
 
   33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
   34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 
   35 import org.apache.curator.retry.ExponentialBackoffRetry;
 
   36 import org.apache.curator.utils.ZKPaths;
 
   37 import org.apache.zookeeper.CreateMode;
 
   38 import org.apache.zookeeper.KeeperException;
 
   39 import org.apache.zookeeper.KeeperException.NoNodeException;
 
   40 import org.apache.zookeeper.ZooDefs;
 
   41 import org.openide.util.Lookup;
 
   57     @GuardedBy(
"CoordinationService.class")
 
   73         if (null == instance) {
 
   76             Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
 
   78                 rootNode = it.next().getNamespaceRoot();
 
   84             } 
catch (IOException | KeeperException | CoordinationServiceException ex) {
 
   85                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
   86             } 
catch (InterruptedException ex) {
 
   93                 Thread.currentThread().interrupt();
 
   94                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  114         if (hostName.isEmpty() || port.isEmpty()) {
 
  118             port = Integer.toString(portInt);
 
  129         RetryPolicy retryPolicy = 
new ExponentialBackoffRetry(1000, 3);
 
  130         String connectString = hostName + 
":" + port;
 
  131         curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
 
  137         String rootNode = rootNodeName;
 
  139         if (!rootNode.startsWith(
"/")) {
 
  140             rootNode = 
"/" + rootNode;
 
  144             String nodePath = rootNode + 
"/" + node.getDisplayName();
 
  146                 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
 
  147             } 
catch (KeeperException ex) {
 
  148                 if (ex.code() != KeeperException.Code.NODEEXISTS) {
 
  151             } 
catch (Exception ex) {
 
  170         while(fullNodePath.endsWith(
"/")) {
 
  171             fullNodePath = fullNodePath.substring(0, fullNodePath.length() - 1);
 
  176             ZKPaths.mkdirs(
curator.getZookeeperClient().getZooKeeper(), fullNodePath);
 
  178         } 
catch (Exception ex) {
 
  204         String fullNodePath = 
"";
 
  208             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  209             if (lock.writeLock().acquire(timeOut, timeUnit)) {
 
  210                 return new Lock(nodePath, lock.writeLock());
 
  214         } 
catch (Exception ex) {
 
  215             if (ex instanceof InterruptedException) {
 
  216                 throw (InterruptedException) ex;
 
  218                 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  240         String fullNodePath = 
"";
 
  244             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  245             if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
 
  248             return new Lock(nodePath, lock.writeLock());
 
  249         } 
catch (Exception ex) {
 
  250             throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  275         String fullNodePath = 
"";
 
  279             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  280             if (lock.readLock().acquire(timeOut, timeUnit)) {
 
  281                 return new Lock(nodePath, lock.readLock());
 
  285         } 
catch (Exception ex) {
 
  286             if (ex instanceof InterruptedException) {
 
  287                 throw (InterruptedException) ex;
 
  289                 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  311         String fullNodePath = 
"";
 
  315             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  316             if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
 
  319             return new Lock(nodePath, lock.readLock());
 
  320         } 
catch (Exception ex) {
 
  321             throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  340         String fullNodePath = 
"";
 
  346             return curator.getData().forPath(fullNodePath);
 
  347         } 
catch (NoNodeException ex) {
 
  349         } 
catch (Exception ex) {
 
  350             if (ex instanceof InterruptedException) {
 
  351                 throw (InterruptedException) ex;
 
  353                 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
 
  370     public void setNodeData(
CategoryNode category, String nodePath, byte[] data) 
throws CoordinationServiceException, InterruptedException {
 
  373             curator.setData().forPath(fullNodePath, data);
 
  374         } 
catch (Exception ex) {
 
  375             if (ex instanceof InterruptedException) {
 
  376                 throw (InterruptedException) ex;
 
  378                 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
 
  395     public void deleteNode(
CategoryNode category, String nodePath) 
throws CoordinationServiceException, InterruptedException {
 
  398             curator.delete().forPath(fullNodePath);
 
  399         } 
catch (Exception ex) {
 
  400             if (ex instanceof InterruptedException) {
 
  401                 throw (InterruptedException) ex;
 
  403                 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
 
  425         } 
catch (Exception ex) {
 
  426             if (ex instanceof InterruptedException) {
 
  427                 throw (InterruptedException) ex;
 
  429                 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
 
  444         if (nodePath.startsWith(
"/")) {
 
  454     public final static class CoordinationServiceException 
extends Exception {
 
  463             super(message, cause);
 
  472     public static class Lock implements AutoCloseable {
 
  481         private Lock(String nodePath, InterProcessMutex lock) {
 
  483             this.interProcessLock = lock;
 
  490         public void release() throws CoordinationServiceException {
 
  492                 this.interProcessLock.release();
 
  493             } 
catch (Exception ex) {
 
  494                 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
 
  499         public void close() throws CoordinationServiceException {
 
  519             this.displayName = displayName;
 
final InterProcessMutex interProcessLock
String upsertNodePath(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static String getZkServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
static String getZkServerHost()
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
static boolean isZooKeeperAccessible(String hostName, String port)
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator