19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.NoNodeException;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.openide.util.Lookup;
59 @GuardedBy(
"CoordinationService.class")
75 boolean result =
false;
76 Object workerThreadWaitNotifyLock =
new Object();
79 ZooKeeper zooKeeper =
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
80 (WatchedEvent event) -> {
81 synchronized (workerThreadWaitNotifyLock) {
82 workerThreadWaitNotifyLock.notify();
85 synchronized (workerThreadWaitNotifyLock) {
86 workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
88 ZooKeeper.States state = zooKeeper.getState();
89 if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
109 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
111 rootNode = it.next().getNamespaceRoot();
117 }
catch (IOException | KeeperException | CoordinationServiceException ex) {
118 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
119 }
catch (InterruptedException ex) {
126 Thread.currentThread().interrupt();
127 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
151 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
154 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
160 String rootNode = rootNodeName;
162 if (!rootNode.startsWith(
"/")) {
163 rootNode =
"/" + rootNode;
167 String nodePath = rootNode +
"/" + node.getDisplayName();
169 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
170 }
catch (KeeperException ex) {
171 if (ex.code() != KeeperException.Code.NODEEXISTS) {
174 }
catch (Exception ex) {
204 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
205 if (lock.writeLock().acquire(timeOut, timeUnit)) {
206 return new Lock(nodePath, lock.writeLock());
210 }
catch (Exception ex) {
211 if (ex instanceof InterruptedException) {
212 throw (InterruptedException) ex;
214 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
238 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
239 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
242 return new Lock(nodePath, lock.writeLock());
243 }
catch (Exception ex) {
244 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
271 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
272 if (lock.readLock().acquire(timeOut, timeUnit)) {
273 return new Lock(nodePath, lock.readLock());
277 }
catch (Exception ex) {
278 if (ex instanceof InterruptedException) {
279 throw (InterruptedException) ex;
281 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
305 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
306 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
309 return new Lock(nodePath, lock.readLock());
310 }
catch (Exception ex) {
311 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
332 return curator.getData().forPath(fullNodePath);
333 }
catch (NoNodeException ex) {
335 }
catch (Exception ex) {
336 if (ex instanceof InterruptedException) {
337 throw (InterruptedException) ex;
339 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
356 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
359 curator.setData().forPath(fullNodePath, data);
360 }
catch (Exception ex) {
361 if (ex instanceof InterruptedException) {
362 throw (InterruptedException) ex;
364 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
381 public void deleteNode(
CategoryNode category, String nodePath)
throws CoordinationServiceException, InterruptedException {
384 curator.delete().forPath(fullNodePath);
385 }
catch (Exception ex) {
386 if (ex instanceof InterruptedException) {
387 throw (InterruptedException) ex;
389 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
411 }
catch (Exception ex) {
412 if (ex instanceof InterruptedException) {
413 throw (InterruptedException) ex;
415 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
430 if (nodePath.startsWith(
"/")) {
440 public final static class CoordinationServiceException
extends Exception {
449 super(message, cause);
458 public static class Lock implements AutoCloseable {
467 private Lock(String nodePath, InterProcessMutex lock) {
469 this.interProcessLock = lock;
476 public void release() throws CoordinationServiceException {
478 this.interProcessLock.release();
479 }
catch (Exception ex) {
480 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
485 public void close() throws CoordinationServiceException {
505 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator