Autopsy  4.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2015 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.casemodule;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.UUID;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import org.netbeans.api.progress.ProgressHandle;
38 import org.openide.util.NbBundle;
50 
56 final class CollaborationMonitor {
57 
58  private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
59  private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
60  private static final Set<String> CASE_EVENTS_OF_INTEREST = new HashSet<>(Arrays.asList(new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()}));
61  private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
62  private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
63  private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
64  private static final long MAX_MISSED_HEARTBEATS = 5;
65  private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
66  private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
67  private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
68  private final String hostName;
69  private final LocalTasksManager localTasksManager;
70  private final RemoteTasksManager remoteTasksManager;
71  private final AutopsyEventPublisher eventPublisher;
72  private final ScheduledThreadPoolExecutor periodicTasksExecutor;
73 
81  CollaborationMonitor() throws CollaborationMonitorException {
86  hostName = NetworkUtils.getLocalHostName();
87 
92  eventPublisher = new AutopsyEventPublisher();
93  try {
94  Case openedCase = Case.getCurrentCase();
95  String channelPrefix = openedCase.getTextIndexName();
96  eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, channelPrefix));
97  } catch (AutopsyEventException ex) {
98  throw new CollaborationMonitorException("Failed to initialize", ex);
99  }
100 
105  remoteTasksManager = new RemoteTasksManager();
106  eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
107 
111  localTasksManager = new LocalTasksManager();
112  IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
113  Case.addEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
114 
121  periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
122  periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
123  periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
124  }
125 
129  void shutdown() {
130  if (null != periodicTasksExecutor) {
131  periodicTasksExecutor.shutdownNow();
132  try {
133  while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
134  logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
135  }
136  } catch (InterruptedException ex) {
137  logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
138  }
139  }
140 
141  Case.removeEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
142  IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
143 
144  if (null != eventPublisher) {
145  eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
146  eventPublisher.closeRemoteEventChannel();
147  }
148 
149  remoteTasksManager.shutdown();
150  }
151 
159  private final class LocalTasksManager implements PropertyChangeListener {
160 
161  private long nextTaskId;
162  private final Map<Integer, Task> eventIdsToAddDataSourceTasks;
163  private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
164 
171  nextTaskId = 0;
172  eventIdsToAddDataSourceTasks = new HashMap<>();
173  jobIdsTodataSourceAnalysisTasks = new HashMap<>();
174  }
175 
182  @Override
183  public void propertyChange(PropertyChangeEvent event) {
184  if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
185  String eventName = event.getPropertyName();
186  if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
187  addDataSourceAddTask((AddingDataSourceEvent) event);
188  } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
189  removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getAddingDataSourceEventId());
190  } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
191  removeDataSourceAddTask(((DataSourceAddedEvent) event).getAddingDataSourceEventId());
192  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
193  addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
194  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
195  removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
196  }
197  }
198  }
199 
206  synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
207  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
208  eventIdsToAddDataSourceTasks.put(event.getEventId().hashCode(), new Task(++nextTaskId, status));
209  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
210  }
211 
219  synchronized void removeDataSourceAddTask(UUID eventId) {
220  eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
221  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
222  }
223 
230  synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
231  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, event.getDataSource().getName());
232  jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
233  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
234  }
235 
243  synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
244  jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
245  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
246  }
247 
253  synchronized Map<Long, Task> getCurrentTasks() {
254  Map<Long, Task> currentTasks = new HashMap<>();
255  eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
256  currentTasks.put(task.getId(), task);
257  });
258  jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
259  currentTasks.put(task.getId(), task);
260  });
261  return currentTasks;
262  }
263  }
264 
273  private final class RemoteTasksManager implements PropertyChangeListener {
274 
275  private final Map<String, RemoteTasks> hostsToTasks;
276 
283  hostsToTasks = new HashMap<>();
284  }
285 
292  @Override
293  public void propertyChange(PropertyChangeEvent event) {
294  if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
295  updateTasks((CollaborationEvent) event);
296  }
297  }
298 
302  synchronized void shutdown() {
303  finishAllTasks();
304  }
305 
312  synchronized void updateTasks(CollaborationEvent event) {
313  RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
314  if (null != tasksForHost) {
315  tasksForHost.update(event);
316  } else {
317  hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
318  }
319  }
320 
326  synchronized void finishStaleTasks() {
327  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
328  Map.Entry<String, RemoteTasks> entry = it.next();
329  RemoteTasks tasksForHost = entry.getValue();
330  if (tasksForHost.isStale()) {
331  tasksForHost.finishAllTasks();
332  it.remove();
333  }
334  }
335  }
336 
340  synchronized void finishAllTasks() {
341  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
342  Map.Entry<String, RemoteTasks> entry = it.next();
343  RemoteTasks tasksForHost = entry.getValue();
344  tasksForHost.finishAllTasks();
345  it.remove();
346  }
347  }
348 
352  private final class RemoteTasks {
353 
354  private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
355  private Instant lastUpdateTime;
356  private Map<Long, ProgressHandle> taskIdsToProgressBars;
357 
368  lastUpdateTime = Instant.now();
369 
370  taskIdsToProgressBars = new HashMap<>();
371  event.getCurrentTasks().values().stream().forEach((task) -> {
372  ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
373  progress.start();
374  progress.progress(task.getStatus());
375  taskIdsToProgressBars.put(task.getId(), progress);
376  });
377  }
378 
385  void update(CollaborationEvent event) {
389  lastUpdateTime = Instant.now();
390 
395  Map<Long, Task> remoteTasks = event.getCurrentTasks();
396  remoteTasks.values().stream().forEach((task) -> {
397  ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
398  if (null != progress) {
402  progress.progress(task.getStatus());
403  } else {
407  progress = ProgressHandle.createHandle(event.getHostName());
408  progress.start();
409  progress.progress(task.getStatus());
410  taskIdsToProgressBars.put(task.getId(), progress);
411  }
412  });
413 
418  for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
419  Map.Entry<Long, ProgressHandle> entry = iterator.next();
420  if (!remoteTasks.containsKey(entry.getKey())) {
421  ProgressHandle progress = entry.getValue();
422  progress.finish();
423  iterator.remove();
424  }
425  }
426  }
427 
432  void finishAllTasks() {
433  taskIdsToProgressBars.values().stream().forEach((progress) -> {
434  progress.finish();
435  });
436  taskIdsToProgressBars.clear();
437  }
438 
446  boolean isStale() {
447  return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
448  }
449  }
450 
451  }
452 
460  private final class HeartbeatTask implements Runnable {
461 
465  @Override
466  public void run() {
467  try {
468  eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
469  } catch (Exception ex) {
470  logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
471  }
472  }
473  }
474 
480  private final class StaleTaskDetectionTask implements Runnable {
481 
485  @Override
486  public void run() {
487  try {
488  remoteTasksManager.finishStaleTasks();
489  } catch (Exception ex) {
490  logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
491  }
492  }
493  }
494 
499  private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
500 
501  private static final long serialVersionUID = 1L;
502  private final String hostName;
503  private final Map<Long, Task> currentTasks;
504 
512  CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
513  super(COLLABORATION_MONITOR_EVENT, null, null);
514  this.hostName = hostName;
515  this.currentTasks = currentTasks;
516  }
517 
523  String getHostName() {
524  return hostName;
525  }
526 
533  Map<Long, Task> getCurrentTasks() {
534  return currentTasks;
535  }
536 
537  }
538 
542  private final static class Task implements Serializable {
543 
544  private static final long serialVersionUID = 1L;
545  private final long id;
546  private final String status;
547 
555  Task(long id, String status) {
556  this.id = id;
557  this.status = status;
558  }
559 
566  long getId() {
567  return id;
568  }
569 
575  String getStatus() {
576  return status;
577  }
578  }
579 
583  final static class CollaborationMonitorException extends Exception {
584 
591  CollaborationMonitorException(String message) {
592  super(message);
593  }
594 
602  CollaborationMonitorException(String message, Throwable throwable) {
603  super(message, throwable);
604  }
605  }
606 
607 }

Copyright © 2012-2016 Basis Technology. Generated on: Tue Oct 25 2016
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.