Autopsy  4.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2015 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.casemodule;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.UUID;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import org.netbeans.api.progress.ProgressHandle;
38 import org.netbeans.api.progress.ProgressHandleFactory;
39 import org.openide.util.NbBundle;
51 
57 final class CollaborationMonitor {
58 
59  private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
60  private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
61  private static final Set<String> CASE_EVENTS_OF_INTEREST = new HashSet<>(Arrays.asList(new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()}));
62  private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
63  private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
64  private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
65  private static final long MAX_MISSED_HEARTBEATS = 5;
66  private static final long STALE_TASKS_DETECTION_INTERVAL_MINUTES = 2;
67  private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
68  private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
69  private final String hostName;
70  private final LocalTasksManager localTasksManager;
71  private final RemoteTasksManager remoteTasksManager;
72  private final AutopsyEventPublisher eventPublisher;
73  private final ScheduledThreadPoolExecutor periodicTasksExecutor;
74 
82  CollaborationMonitor() throws CollaborationMonitorException {
87  hostName = NetworkUtils.getLocalHostName();
88 
93  eventPublisher = new AutopsyEventPublisher();
94  try {
95  Case openedCase = Case.getCurrentCase();
96  String channelPrefix = openedCase.getTextIndexName();
97  eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, channelPrefix));
98  } catch (AutopsyEventException ex) {
99  throw new CollaborationMonitorException("Failed to initialize", ex);
100  }
101 
106  remoteTasksManager = new RemoteTasksManager();
107  eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
108 
112  localTasksManager = new LocalTasksManager();
113  IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
114  Case.addEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
115 
122  periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
123  periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
124  periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_MINUTES, STALE_TASKS_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
125  }
126 
130  void shutdown() {
131  if (null != periodicTasksExecutor) {
132  periodicTasksExecutor.shutdownNow();
133  try {
134  while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
135  logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
136  }
137  } catch (InterruptedException ex) {
138  logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
139  }
140  }
141 
142  Case.removeEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
143  IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
144 
145  if (null != eventPublisher) {
146  eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
147  eventPublisher.closeRemoteEventChannel();
148  }
149 
150  remoteTasksManager.shutdown();
151  }
152 
160  private final class LocalTasksManager implements PropertyChangeListener {
161 
162  private long nextTaskId;
163  private final Map<Integer, Task> uuidsToAddDataSourceTasks;
164  private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
165 
172  nextTaskId = 0;
173  uuidsToAddDataSourceTasks = new HashMap<>();
174  jobIdsTodataSourceAnalysisTasks = new HashMap<>();
175  }
176 
183  @Override
184  public void propertyChange(PropertyChangeEvent event) {
185  if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
186  String eventName = event.getPropertyName();
187  if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
188  addDataSourceAddTask((AddingDataSourceEvent) event);
189  } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
190  removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getDataSourceId());
191  } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
192  removeDataSourceAddTask(((DataSourceAddedEvent) event).getDataSourceId());
193  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
194  addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
195  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
196  removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
197  }
198  }
199  }
200 
207  synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
208  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
209  uuidsToAddDataSourceTasks.put(event.getDataSourceId().hashCode(), new Task(++nextTaskId, status));
210  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
211  }
212 
221  synchronized void removeDataSourceAddTask(UUID dataSourceId) {
222  uuidsToAddDataSourceTasks.remove(dataSourceId.hashCode());
223  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
224  }
225 
232  synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
233  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, event.getDataSource().getName());
234  jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
235  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
236  }
237 
245  synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
246  jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
247  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
248  }
249 
255  synchronized Map<Long, Task> getCurrentTasks() {
256  Map<Long, Task> currentTasks = new HashMap<>();
257  uuidsToAddDataSourceTasks.values().stream().forEach((task) -> {
258  currentTasks.put(task.getId(), task);
259  });
260  jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
261  currentTasks.put(task.getId(), task);
262  });
263  return currentTasks;
264  }
265  }
266 
275  private final class RemoteTasksManager implements PropertyChangeListener {
276 
277  private final Map<String, RemoteTasks> hostsToTasks;
278 
285  hostsToTasks = new HashMap<>();
286  }
287 
294  @Override
295  public void propertyChange(PropertyChangeEvent event) {
296  if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
297  updateTasks((CollaborationEvent) event);
298  }
299  }
300 
304  synchronized void shutdown() {
305  finishAllTasks();
306  }
307 
314  synchronized void updateTasks(CollaborationEvent event) {
315  RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
316  if (null != tasksForHost) {
317  tasksForHost.update(event);
318  } else {
319  hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
320  }
321  }
322 
328  synchronized void finishStaleTasks() {
329  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
330  Map.Entry<String, RemoteTasks> entry = it.next();
331  RemoteTasks tasksForHost = entry.getValue();
332  if (tasksForHost.isStale()) {
333  tasksForHost.finishAllTasks();
334  it.remove();
335  }
336  }
337  }
338 
342  synchronized void finishAllTasks() {
343  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
344  Map.Entry<String, RemoteTasks> entry = it.next();
345  RemoteTasks tasksForHost = entry.getValue();
346  tasksForHost.finishAllTasks();
347  it.remove();
348  }
349  }
350 
354  private final class RemoteTasks {
355 
356  private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
357  private Instant lastUpdateTime;
358  private Map<Long, ProgressHandle> taskIdsToProgressBars;
359 
370  lastUpdateTime = Instant.now();
371 
372  taskIdsToProgressBars = new HashMap<>();
373  event.getCurrentTasks().values().stream().forEach((task) -> {
374  ProgressHandle progress = ProgressHandleFactory.createHandle(event.getHostName());
375  progress.start();
376  progress.progress(task.getStatus());
377  taskIdsToProgressBars.put(task.getId(), progress);
378  });
379  }
380 
387  void update(CollaborationEvent event) {
391  lastUpdateTime = Instant.now();
392 
397  Map<Long, Task> remoteTasks = event.getCurrentTasks();
398  remoteTasks.values().stream().forEach((task) -> {
399  ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
400  if (null != progress) {
404  progress.progress(task.getStatus());
405  } else {
409  progress = ProgressHandleFactory.createHandle(event.getHostName());
410  progress.start();
411  progress.progress(task.getStatus());
412  taskIdsToProgressBars.put(task.getId(), progress);
413  }
414  });
415 
420  for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
421  Map.Entry<Long, ProgressHandle> entry = iterator.next();
422  if (!remoteTasks.containsKey(entry.getKey())) {
423  ProgressHandle progress = entry.getValue();
424  progress.finish();
425  iterator.remove();
426  }
427  }
428  }
429 
434  void finishAllTasks() {
435  taskIdsToProgressBars.values().stream().forEach((progress) -> {
436  progress.finish();
437  });
438  taskIdsToProgressBars.clear();
439  }
440 
448  boolean isStale() {
449  return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
450  }
451  }
452 
453  }
454 
462  private final class HeartbeatTask implements Runnable {
463 
467  @Override
468  public void run() {
469  try {
470  eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
471  } catch (Exception ex) {
472  logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
473  }
474  }
475  }
476 
482  private final class StaleTaskDetectionTask implements Runnable {
483 
487  @Override
488  public void run() {
489  try {
490  remoteTasksManager.finishStaleTasks();
491  } catch (Exception ex) {
492  logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
493  }
494  }
495  }
496 
501  private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
502 
503  private static final long serialVersionUID = 1L;
504  private final String hostName;
505  private final Map<Long, Task> currentTasks;
506 
514  CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
515  super(COLLABORATION_MONITOR_EVENT, null, null);
516  this.hostName = hostName;
517  this.currentTasks = currentTasks;
518  }
519 
525  String getHostName() {
526  return hostName;
527  }
528 
535  Map<Long, Task> getCurrentTasks() {
536  return currentTasks;
537  }
538 
539  }
540 
544  private final static class Task implements Serializable {
545 
546  private static final long serialVersionUID = 1L;
547  private final long id;
548  private final String status;
549 
557  Task(long id, String status) {
558  this.id = id;
559  this.status = status;
560  }
561 
568  long getId() {
569  return id;
570  }
571 
577  String getStatus() {
578  return status;
579  }
580  }
581 
585  final static class CollaborationMonitorException extends Exception {
586 
593  CollaborationMonitorException(String message) {
594  super(message);
595  }
596 
604  CollaborationMonitorException(String message, Throwable throwable) {
605  super(message, throwable);
606  }
607  }
608 
609 }

Copyright © 2012-2015 Basis Technology. Generated on: Wed Apr 6 2016
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.