Autopsy  4.17.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2019 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.casemodule;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.Collections;
28 import java.util.EnumSet;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.UUID;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import org.netbeans.api.progress.ProgressHandle;
38 import org.openide.util.NbBundle;
50 import org.sleuthkit.datamodel.Content;
51 
57 final class CollaborationMonitor {
58 
59  private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
60  private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
61  private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
62  Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
63  private static final Set<IngestManager.IngestJobEvent> INGEST_JOB_EVENTS_OF_INTEREST = EnumSet.of(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED, IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED);
64  private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
65  private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
66  private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
67  private static final long MAX_MISSED_HEARTBEATS = 5;
68  private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
69  private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
70  private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
71  private final String hostName;
72  private final LocalTasksManager localTasksManager;
73  private final RemoteTasksManager remoteTasksManager;
74  private final AutopsyEventPublisher eventPublisher;
75  private final ScheduledThreadPoolExecutor periodicTasksExecutor;
76 
89  CollaborationMonitor(String eventChannelPrefix) throws CollaborationMonitorException {
94  hostName = NetworkUtils.getLocalHostName();
95 
100  eventPublisher = new AutopsyEventPublisher();
101  try {
102  eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
103  } catch (AutopsyEventException ex) {
104  throw new CollaborationMonitorException("Failed to initialize", ex);
105  }
106 
111  remoteTasksManager = new RemoteTasksManager();
112  eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
113 
117  localTasksManager = new LocalTasksManager();
118  IngestManager.getInstance().addIngestJobEventListener(INGEST_JOB_EVENTS_OF_INTEREST, localTasksManager);
119  Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
120 
127  periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
128  periodicTasksExecutor.scheduleWithFixedDelay(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
129  periodicTasksExecutor.scheduleWithFixedDelay(new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
130  }
131 
135  void shutdown() {
136  if (null != periodicTasksExecutor) {
137  periodicTasksExecutor.shutdownNow();
138  try {
139  while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
140  logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
141  }
142  } catch (InterruptedException ex) {
143  logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
144  }
145  }
146 
147  Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
148  IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
149 
150  if (null != eventPublisher) {
151  eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
152  eventPublisher.closeRemoteEventChannel();
153  }
154 
155  remoteTasksManager.shutdown();
156  }
157 
165  private final class LocalTasksManager implements PropertyChangeListener {
166 
167  private long nextTaskId;
168  private final Map<Integer, Task> eventIdsToAddDataSourceTasks;
169  private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
170 
177  nextTaskId = 0;
178  eventIdsToAddDataSourceTasks = new HashMap<>();
179  jobIdsTodataSourceAnalysisTasks = new HashMap<>();
180  }
181 
188  @Override
189  public void propertyChange(PropertyChangeEvent event) {
190  if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
191  String eventName = event.getPropertyName();
192  if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
193  addDataSourceAddTask((AddingDataSourceEvent) event);
194  } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
195  removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getAddingDataSourceEventId());
196  } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
197  removeDataSourceAddTask(((DataSourceAddedEvent) event).getAddingDataSourceEventId());
198  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
199  addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
200  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
201  removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
202  }
203  }
204  }
205 
212  synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
213  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
214  eventIdsToAddDataSourceTasks.put(event.getEventId().hashCode(), new Task(++nextTaskId, status));
215  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
216  }
217 
225  synchronized void removeDataSourceAddTask(UUID eventId) {
226  eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
227  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
228  }
229 
236  synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
237  Content dataSource = event.getDataSource();
238  if (dataSource != null) {
239  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
240  jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
241  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
242  }
243  }
244 
252  synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
253  jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
254  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
255  }
256 
262  synchronized Map<Long, Task> getCurrentTasks() {
263  Map<Long, Task> currentTasks = new HashMap<>();
264  eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
265  currentTasks.put(task.getId(), task);
266  });
267  jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
268  currentTasks.put(task.getId(), task);
269  });
270  return currentTasks;
271  }
272  }
273 
282  private final class RemoteTasksManager implements PropertyChangeListener {
283 
284  private final Map<String, RemoteTasks> hostsToTasks;
285 
292  hostsToTasks = new HashMap<>();
293  }
294 
301  @Override
302  public void propertyChange(PropertyChangeEvent event) {
303  if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
304  updateTasks((CollaborationEvent) event);
305  }
306  }
307 
311  synchronized void shutdown() {
312  finishAllTasks();
313  }
314 
321  synchronized void updateTasks(CollaborationEvent event) {
322  RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
323  if (null != tasksForHost) {
324  tasksForHost.update(event);
325  } else {
326  hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
327  }
328  }
329 
335  synchronized void finishStaleTasks() {
336  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
337  Map.Entry<String, RemoteTasks> entry = it.next();
338  RemoteTasks tasksForHost = entry.getValue();
339  if (tasksForHost.isStale()) {
340  tasksForHost.finishAllTasks();
341  it.remove();
342  }
343  }
344  }
345 
349  synchronized void finishAllTasks() {
350  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
351  Map.Entry<String, RemoteTasks> entry = it.next();
352  RemoteTasks tasksForHost = entry.getValue();
353  tasksForHost.finishAllTasks();
354  it.remove();
355  }
356  }
357 
361  private final class RemoteTasks {
362 
363  private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
364  private Instant lastUpdateTime;
365  private Map<Long, ProgressHandle> taskIdsToProgressBars;
366 
377  lastUpdateTime = Instant.now();
378 
379  taskIdsToProgressBars = new HashMap<>();
380  event.getCurrentTasks().values().stream().forEach((task) -> {
381  ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
382  progress.start();
383  progress.progress(task.getStatus());
384  taskIdsToProgressBars.put(task.getId(), progress);
385  });
386  }
387 
394  void update(CollaborationEvent event) {
398  lastUpdateTime = Instant.now();
399 
404  Map<Long, Task> remoteTasks = event.getCurrentTasks();
405  remoteTasks.values().stream().forEach((task) -> {
406  ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
407  if (null != progress) {
411  progress.progress(task.getStatus());
412  } else {
416  progress = ProgressHandle.createHandle(event.getHostName());
417  progress.start();
418  progress.progress(task.getStatus());
419  taskIdsToProgressBars.put(task.getId(), progress);
420  }
421  });
422 
427  for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
428  Map.Entry<Long, ProgressHandle> entry = iterator.next();
429  if (!remoteTasks.containsKey(entry.getKey())) {
430  ProgressHandle progress = entry.getValue();
431  progress.finish();
432  iterator.remove();
433  }
434  }
435  }
436 
441  void finishAllTasks() {
442  taskIdsToProgressBars.values().stream().forEach((progress) -> {
443  progress.finish();
444  });
445  taskIdsToProgressBars.clear();
446  }
447 
455  boolean isStale() {
456  return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
457  }
458  }
459 
460  }
461 
469  private final class HeartbeatTask implements Runnable {
470 
474  @Override
475  public void run() {
476  try {
477  eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
478  } catch (Exception ex) {
479  logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
480  }
481  }
482  }
483 
489  private final class StaleTaskDetectionTask implements Runnable {
490 
494  @Override
495  public void run() {
496  try {
497  remoteTasksManager.finishStaleTasks();
498  } catch (Exception ex) {
499  logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
500  }
501  }
502  }
503 
508  private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
509 
510  private static final long serialVersionUID = 1L;
511  private final String hostName;
512  private final Map<Long, Task> currentTasks;
513 
521  CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
522  super(COLLABORATION_MONITOR_EVENT, null, null);
523  this.hostName = hostName;
524  this.currentTasks = currentTasks;
525  }
526 
532  String getHostName() {
533  return hostName;
534  }
535 
542  Map<Long, Task> getCurrentTasks() {
543  return Collections.unmodifiableMap(currentTasks);
544  }
545 
546  }
547 
551  private final static class Task implements Serializable {
552 
553  private static final long serialVersionUID = 1L;
554  private final long id;
555  private final String status;
556 
564  Task(long id, String status) {
565  this.id = id;
566  this.status = status;
567  }
568 
575  long getId() {
576  return id;
577  }
578 
584  String getStatus() {
585  return status;
586  }
587  }
588 
592  final static class CollaborationMonitorException extends Exception {
593 
600  CollaborationMonitorException(String message) {
601  super(message);
602  }
603 
611  CollaborationMonitorException(String message, Throwable throwable) {
612  super(message, throwable);
613  }
614  }
615 
616 }

Copyright © 2012-2021 Basis Technology. Generated on: Tue Jan 19 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.