19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.NoNodeException;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.openide.util.Lookup;
// Concurrency annotation: the annotated member (its declaration is not part of
// this listing) is documented as guarded by the CoordinationService class lock.
59 @GuardedBy(
"CoordinationService.class")
// Fragment of a ZooKeeper connectivity probe (presumably isZooKeeperAccessible();
// the method header is not part of this listing - confirm).
75 boolean result =
false;
// Monitor shared between this thread and the ZooKeeper watcher callback below.
76 Object workerThreadWaitNotifyLock =
new Object();
79 ZooKeeper zooKeeper =
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
// Watcher callback: any watched event notifies a thread waiting on the monitor.
80 (WatchedEvent event) -> {
81 synchronized (workerThreadWaitNotifyLock) {
82 workerThreadWaitNotifyLock.notify();
// Block until notified by the watcher, but no longer than the connection timeout.
// NOTE(review): the wait is not guarded by a condition in the visible lines, so a
// notify() issued before this point would be missed and the full timeout would
// elapse - confirm against the complete source.
85 synchronized (workerThreadWaitNotifyLock) {
86 workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
// The check below matches either the read-write or the read-only connected state.
88 ZooKeeper.States state = zooKeeper.getState();
89 if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
// Fragment of service instance creation (presumably getInstance() - confirm).
// The namespace root is read from the next element of the providers iterator.
109 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
111 rootNode = it.next().getNamespaceRoot();
117 }
// Each of the listed failures is wrapped, with its cause preserved, in a
// CoordinationServiceException carrying a fixed message.
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
118 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
// Fragment of Curator client and category node setup (presumably the
// CoordinationService(String rootNodeName) constructor - confirm).
// Retry policy: exponential backoff with a 1000 ms base sleep and at most 3 retries.
142 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
145 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
151 String rootNode = rootNodeName;
// Make the root node path absolute by prepending "/" when it is missing.
153 if (!rootNode.startsWith(
"/")) {
154 rootNode =
"/" + rootNode;
// Build the path <rootNode>/<node display name> and create it as a persistent
// node with the open (unrestricted) ACL, creating missing parent nodes as needed.
158 String nodePath = rootNode +
"/" + node.getDisplayName();
160 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
161 }
catch (KeeperException ex) {
// Only KeeperException codes other than NODEEXISTS enter this branch; the branch
// body is not visible in this listing.
162 if (ex.code() != KeeperException.Code.NODEEXISTS) {
165 }
catch (Exception ex) {
// Fragment of timed exclusive-lock acquisition (presumably
// tryGetExclusiveLock(category, nodePath, timeOut, timeUnit) - confirm).
// The write half of a Curator inter-process read/write lock on the full node
// path serves as the exclusive lock.
195 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// If the write lock is acquired within the timeout, return it wrapped in a Lock.
196 if (lock.writeLock().acquire(timeOut, timeUnit)) {
197 return new Lock(nodePath, lock.writeLock());
201 }
catch (Exception ex) {
// Interruption is rethrown unchanged; every other failure is wrapped in a
// CoordinationServiceException whose message names the full node path.
202 if (ex instanceof InterruptedException) {
203 throw (InterruptedException) ex;
205 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Fragment of exclusive-lock acquisition without a caller-supplied timeout
// (presumably tryGetExclusiveLock(category, nodePath) - confirm).
229 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// The write lock is requested with a zero-second timeout; the failure branch
// body is not visible in this listing.
230 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
233 return new Lock(nodePath, lock.writeLock());
234 }
// NOTE(review): unlike the timed variant, this handler wraps every Exception,
// including InterruptedException - confirm that this is intended.
catch (Exception ex) {
235 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Fragment of timed shared-lock acquisition (presumably
// tryGetSharedLock(category, nodePath, timeOut, timeUnit) - confirm).
// The read half of a Curator inter-process read/write lock on the full node
// path serves as the shared lock.
262 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// If the read lock is acquired within the timeout, return it wrapped in a Lock.
263 if (lock.readLock().acquire(timeOut, timeUnit)) {
264 return new Lock(nodePath, lock.readLock());
268 }
catch (Exception ex) {
// Interruption is rethrown unchanged; every other failure is wrapped in a
// CoordinationServiceException whose message names the full node path.
269 if (ex instanceof InterruptedException) {
270 throw (InterruptedException) ex;
272 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Fragment of shared-lock acquisition without a caller-supplied timeout
// (presumably tryGetSharedLock(category, nodePath) - confirm).
296 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// The read lock is requested with a zero-second timeout; the failure branch
// body is not visible in this listing.
297 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
300 return new Lock(nodePath, lock.readLock());
301 }
// NOTE(review): unlike the timed variant, this handler wraps every Exception,
// including InterruptedException - confirm that this is intended.
catch (Exception ex) {
302 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Fragment of node-data retrieval (presumably getNodeData(category, nodePath) -
// confirm). Returns the bytes Curator reads from the full node path.
323 return curator.getData().forPath(fullNodePath);
324 }
// A missing node is handled separately from other failures; the handler body is
// not visible in this listing.
catch (NoNodeException ex) {
326 }
catch (Exception ex) {
// Interruption is rethrown unchanged; every other failure is wrapped in a
// CoordinationServiceException whose message names the full node path.
327 if (ex instanceof InterruptedException) {
328 throw (InterruptedException) ex;
330 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
// Stores the given bytes in a node through the Curator client.
// Parameters: category and nodePath identify the node (how they are combined
// into fullNodePath is not visible in this listing); data is the payload written.
// Throws CoordinationServiceException for any failure other than interruption,
// and InterruptedException when the operation is interrupted.
347 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
350 curator.setData().forPath(fullNodePath, data);
351 }
catch (Exception ex) {
// Interruption is rethrown unchanged so callers can observe it directly.
352 if (ex instanceof InterruptedException) {
353 throw (InterruptedException) ex;
// Everything else is wrapped, keeping the original exception as the cause.
355 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
// Fragment of node deletion (presumably deleteNode(category, nodePath) - confirm).
// Deletes the node at the full node path through the Curator client.
372 curator.delete().forPath(fullNodePath);
373 }
// Every failure, whatever its type, is wrapped in a CoordinationServiceException
// whose message names the full node path.
catch (Exception ex) {
374 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
// Fragment of the error path for listing nodes (presumably getNodeList(category) -
// confirm). Any failure is wrapped in a CoordinationServiceException whose
// message names the category by its display name.
392 }
catch (Exception ex) {
393 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
// Fragment of path qualification (presumably getFullyQualifiedNodePath(category,
// nodePath) - confirm). Node paths that already begin with "/" take this branch;
// the branch body is not visible in this listing.
407 if(nodePath.startsWith(
"/")){
// Checked exception type (it extends Exception) thrown by the coordination
// service code in this file. Declared final, so it cannot be subclassed.
417 public final static class CoordinationServiceException
extends Exception {
// Message-and-cause constructor body: both are passed to the Exception superclass.
426 super(message, cause);
// Handle for a held inter-process lock. It implements AutoCloseable, so it can
// be used as a try-with-resources resource.
435 public static class Lock implements AutoCloseable {
// Private constructor: instances are created only by the enclosing class's
// lock-acquisition code. Stores the underlying Curator mutex.
444 private Lock(String nodePath, InterProcessMutex lock) {
446 this.interProcessLock = lock;
// Releases the underlying Curator mutex; any failure is wrapped in a
// CoordinationServiceException whose message names the node path.
453 public void release() throws CoordinationServiceException {
455 this.interProcessLock.release();
456 }
catch (Exception ex) {
457 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
// AutoCloseable entry point; its body is not visible in this listing
// (presumably it delegates to release() - confirm).
462 public void close() throws CoordinationServiceException {
// Stores the supplied display name on the instance (presumably inside the
// CategoryNode(String displayName) constructor - confirm).
482 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator