Autopsy  4.12.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
CoordinationService.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2018 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.coordinationservice;
20 
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.NoNodeException;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.openide.util.Lookup;
44 
50 @ThreadSafe
51 public final class CoordinationService {
52 
53  private static final int SESSION_TIMEOUT_MILLISECONDS = 300000;
54  private static final int CONNECTION_TIMEOUT_MILLISECONDS = 300000;
55  private static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 3000;
56  private static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS = 15000;
57  private static final int PORT_OFFSET = 1000; // When run in Solr, ZooKeeper defaults to Solr port + 1000
58  private static final String DEFAULT_NAMESPACE_ROOT = "autopsy";
59  @GuardedBy("CoordinationService.class")
60  private static CoordinationService instance;
61  private final CuratorFramework curator;
62  @GuardedBy("categoryNodeToPath")
63  private final Map<String, String> categoryNodeToPath;
64 
74  private static boolean isZooKeeperAccessible() throws InterruptedException, IOException {
75  boolean result = false;
76  Object workerThreadWaitNotifyLock = new Object();
77  int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
78  String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
79  ZooKeeper zooKeeper = new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
80  (WatchedEvent event) -> {
81  synchronized (workerThreadWaitNotifyLock) {
82  workerThreadWaitNotifyLock.notify();
83  }
84  });
85  synchronized (workerThreadWaitNotifyLock) {
86  workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
87  }
88  ZooKeeper.States state = zooKeeper.getState();
89  if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
90  result = true;
91  }
92  zooKeeper.close();
93  return result;
94  }
95 
105  public synchronized static CoordinationService getInstance() throws CoordinationServiceException {
106  if (null == instance) {
107  String rootNode;
108  Collection<? extends CoordinationServiceNamespace> providers = Lookup.getDefault().lookupAll(CoordinationServiceNamespace.class);
109  Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
110  if (it.hasNext()) {
111  rootNode = it.next().getNamespaceRoot();
112  } else {
113  rootNode = DEFAULT_NAMESPACE_ROOT;
114  }
115  try {
116  instance = new CoordinationService(rootNode);
117  } catch (IOException | KeeperException | CoordinationServiceException ex) {
118  throw new CoordinationServiceException("Failed to create coordination service", ex);
119  } catch (InterruptedException ex) {
120  /*
121  * The interrupted exception should be propagated to support
122  * task cancellation. To avoid a public API change here, restore
123  * the interrupted flag and then throw the InterruptedException
124  * in its wrapper.
125  */
126  Thread.currentThread().interrupt();
127  throw new CoordinationServiceException("Failed to create coordination service", ex);
128  }
129  }
130  return instance;
131  }
132 
142  private CoordinationService(String rootNodeName) throws InterruptedException, IOException, KeeperException, CoordinationServiceException {
143 
144  if (false == isZooKeeperAccessible()) {
145  throw new CoordinationServiceException("Unable to access ZooKeeper");
146  }
147 
148  /*
149  * Connect to ZooKeeper via Curator.
150  */
151  RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
152  int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
153  String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
154  curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
155  curator.start();
156 
157  /*
158  * Create the top-level root and category nodes.
159  */
160  String rootNode = rootNodeName;
161 
162  if (!rootNode.startsWith("/")) {
163  rootNode = "/" + rootNode;
164  }
165  categoryNodeToPath = new ConcurrentHashMap<>();
166  for (CategoryNode node : CategoryNode.values()) {
167  String nodePath = rootNode + "/" + node.getDisplayName();
168  try {
169  curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
170  } catch (KeeperException ex) {
171  if (ex.code() != KeeperException.Code.NODEEXISTS) {
172  throw ex;
173  }
174  } catch (Exception ex) {
175  throw new CoordinationServiceException("Curator experienced an error", ex);
176  }
177  categoryNodeToPath.put(node.getDisplayName(), nodePath);
178  }
179  }
180 
201  public Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit) throws CoordinationServiceException, InterruptedException {
202  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
203  try {
204  InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, fullNodePath);
205  if (lock.writeLock().acquire(timeOut, timeUnit)) {
206  return new Lock(nodePath, lock.writeLock());
207  } else {
208  return null;
209  }
210  } catch (Exception ex) {
211  if (ex instanceof InterruptedException) {
212  throw (InterruptedException) ex;
213  } else {
214  throw new CoordinationServiceException(String.format("Failed to get exclusive lock for %s", fullNodePath), ex);
215  }
216  }
217  }
218 
235  public Lock tryGetExclusiveLock(CategoryNode category, String nodePath) throws CoordinationServiceException {
236  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
237  try {
238  InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, fullNodePath);
239  if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
240  return null;
241  }
242  return new Lock(nodePath, lock.writeLock());
243  } catch (Exception ex) {
244  throw new CoordinationServiceException(String.format("Failed to get exclusive lock for %s", fullNodePath), ex);
245  }
246  }
247 
268  public Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit) throws CoordinationServiceException, InterruptedException {
269  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
270  try {
271  InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, fullNodePath);
272  if (lock.readLock().acquire(timeOut, timeUnit)) {
273  return new Lock(nodePath, lock.readLock());
274  } else {
275  return null;
276  }
277  } catch (Exception ex) {
278  if (ex instanceof InterruptedException) {
279  throw (InterruptedException) ex;
280  } else {
281  throw new CoordinationServiceException(String.format("Failed to get shared lock for %s", fullNodePath), ex);
282  }
283  }
284  }
285 
302  public Lock tryGetSharedLock(CategoryNode category, String nodePath) throws CoordinationServiceException {
303  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
304  try {
305  InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curator, fullNodePath);
306  if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
307  return null;
308  }
309  return new Lock(nodePath, lock.readLock());
310  } catch (Exception ex) {
311  throw new CoordinationServiceException(String.format("Failed to get shared lock for %s", fullNodePath), ex);
312  }
313  }
314 
329  public byte[] getNodeData(CategoryNode category, String nodePath) throws CoordinationServiceException, InterruptedException {
330  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
331  try {
332  return curator.getData().forPath(fullNodePath);
333  } catch (NoNodeException ex) {
334  return null;
335  } catch (Exception ex) {
336  if (ex instanceof InterruptedException) {
337  throw (InterruptedException) ex;
338  } else {
339  throw new CoordinationServiceException(String.format("Failed to get data for %s", fullNodePath), ex);
340  }
341  }
342  }
343 
356  public void setNodeData(CategoryNode category, String nodePath, byte[] data) throws CoordinationServiceException, InterruptedException {
357  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
358  try {
359  curator.setData().forPath(fullNodePath, data);
360  } catch (Exception ex) {
361  if (ex instanceof InterruptedException) {
362  throw (InterruptedException) ex;
363  } else {
364  throw new CoordinationServiceException(String.format("Failed to set data for %s", fullNodePath), ex);
365  }
366  }
367  }
368 
381  public void deleteNode(CategoryNode category, String nodePath) throws CoordinationServiceException, InterruptedException {
382  String fullNodePath = getFullyQualifiedNodePath(category, nodePath);
383  try {
384  curator.delete().forPath(fullNodePath);
385  } catch (Exception ex) {
386  if (ex instanceof InterruptedException) {
387  throw (InterruptedException) ex;
388  } else {
389  throw new CoordinationServiceException(String.format("Failed to delete node %s", fullNodePath), ex);
390  }
391  }
392  }
393 
407  public List<String> getNodeList(CategoryNode category) throws CoordinationServiceException, InterruptedException {
408  try {
409  List<String> list = curator.getChildren().forPath(categoryNodeToPath.get(category.getDisplayName()));
410  return list;
411  } catch (Exception ex) {
412  if (ex instanceof InterruptedException) {
413  throw (InterruptedException) ex;
414  } else {
415  throw new CoordinationServiceException(String.format("Failed to get node list for %s", category.getDisplayName()), ex);
416  }
417  }
418  }
419 
428  private String getFullyQualifiedNodePath(CategoryNode category, String nodePath) {
429  // nodePath on Unix systems starts with a "/" and ZooKeeper doesn't like two slashes in a row
430  if (nodePath.startsWith("/")) {
431  return categoryNodeToPath.get(category.getDisplayName()) + nodePath.toUpperCase();
432  } else {
433  return categoryNodeToPath.get(category.getDisplayName()) + "/" + nodePath.toUpperCase();
434  }
435  }
436 
440  public final static class CoordinationServiceException extends Exception {
441 
442  private static final long serialVersionUID = 1L;
443 
444  private CoordinationServiceException(String message) {
445  super(message);
446  }
447 
448  private CoordinationServiceException(String message, Throwable cause) {
449  super(message, cause);
450  }
451  }
452 
458  public static class Lock implements AutoCloseable {
459 
464  private final InterProcessMutex interProcessLock;
465  private final String nodePath;
466 
467  private Lock(String nodePath, InterProcessMutex lock) {
468  this.nodePath = nodePath;
469  this.interProcessLock = lock;
470  }
471 
472  public String getNodePath() {
473  return nodePath;
474  }
475 
476  public void release() throws CoordinationServiceException {
477  try {
478  this.interProcessLock.release();
479  } catch (Exception ex) {
480  throw new CoordinationServiceException(String.format("Failed to release the lock on %s", nodePath), ex);
481  }
482  }
483 
484  @Override
485  public void close() throws CoordinationServiceException {
486  release();
487  }
488  }
489 
/**
 * The category nodes of the coordination service namespace. Each constant
 * carries the display name that is used as the name of its node directly
 * beneath the namespace root.
 */
public enum CategoryNode {

    CASES("cases"),
    MANIFESTS("manifests"),
    CONFIG("config"),
    CENTRAL_REPO("centralRepository"),
    HEALTH_MONITOR("healthMonitor");

    private final String displayName;

    /**
     * Binds a category to its display name. Enum constructors are
     * implicitly private.
     *
     * @param name The display name of the category node.
     */
    CategoryNode(String name) {
        displayName = name;
    }

    /**
     * Gets the display name of this category node.
     *
     * @return The display name.
     */
    public String getDisplayName() {
        return this.displayName;
    }
}
512 }
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
byte[] getNodeData(CategoryNode category, String nodePath)
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
void setNodeData(CategoryNode category, String nodePath, byte[] data)
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)

Copyright © 2012-2018 Basis Technology. Generated on: Wed Sep 18 2019
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.