19package org.sleuthkit.autopsy.casemodule;
21import com.google.common.util.concurrent.ThreadFactoryBuilder;
22import java.beans.PropertyChangeEvent;
23import java.beans.PropertyChangeListener;
24import java.io.Serializable;
25import java.time.Duration;
26import java.time.Instant;
27import java.util.Collections;
28import java.util.EnumSet;
29import java.util.HashMap;
30import java.util.Iterator;
34import java.util.concurrent.ScheduledThreadPoolExecutor;
35import java.util.concurrent.TimeUnit;
36import java.util.logging.Level;
37import org.netbeans.api.progress.ProgressHandle;
38import org.openide.util.NbBundle;
39import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceEvent;
40import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceFailedEvent;
41import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent;
42import org.sleuthkit.autopsy.coreutils.Logger;
43import org.sleuthkit.autopsy.coreutils.NetworkUtils;
44import org.sleuthkit.autopsy.events.AutopsyEvent;
45import org.sleuthkit.autopsy.events.AutopsyEventException;
46import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
47import org.sleuthkit.autopsy.ingest.IngestManager;
48import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
49import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
50import org.sleuthkit.datamodel.Content;
57final class CollaborationMonitor {
59 private static final String EVENT_CHANNEL_NAME =
"%s-Collaboration-Monitor-Events";
60 private static final String COLLABORATION_MONITOR_EVENT =
"COLLABORATION_MONITOR_EVENT";
61 private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
62 Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
63 private static final Set<IngestManager.IngestJobEvent> INGEST_JOB_EVENTS_OF_INTEREST = EnumSet.of(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED, IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED);
64 private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
65 private static final String PERIODIC_TASK_THREAD_NAME =
"collab-monitor-periodic-tasks-%d";
66 private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
67 private static final long MAX_MISSED_HEARTBEATS = 5;
68 private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
69 private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
70 private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
71 private final String hostName;
74 private final AutopsyEventPublisher eventPublisher;
75 private final ScheduledThreadPoolExecutor periodicTasksExecutor;
89 CollaborationMonitor(String eventChannelPrefix)
throws CollaborationMonitorException {
94 hostName = NetworkUtils.getLocalHostName();
100 eventPublisher =
new AutopsyEventPublisher();
102 eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
103 }
catch (AutopsyEventException ex) {
104 throw new CollaborationMonitorException(
"Failed to initialize", ex);
112 eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
118 IngestManager.getInstance().addIngestJobEventListener(INGEST_JOB_EVENTS_OF_INTEREST, localTasksManager);
119 Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
127 periodicTasksExecutor =
new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS,
new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
128 periodicTasksExecutor.scheduleWithFixedDelay(
new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
129 periodicTasksExecutor.scheduleWithFixedDelay(
new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
136 if (
null != periodicTasksExecutor) {
137 periodicTasksExecutor.shutdownNow();
139 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
140 logger.log(Level.WARNING,
"Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS);
142 }
catch (InterruptedException ex) {
143 logger.log(Level.SEVERE,
"Unexpected interrupt while stopping periodic tasks executor", ex);
147 Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
148 IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
150 if (
null != eventPublisher) {
151 eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
152 eventPublisher.closeRemoteEventChannel();
155 remoteTasksManager.shutdown();
165 private final class LocalTasksManager
implements PropertyChangeListener {
176 LocalTasksManager() {
191 String eventName =
event.getPropertyName();
213 String status = NbBundle.getMessage(CollaborationMonitor.class,
"CollaborationMonitor.addingDataSourceStatus.msg", hostName);
225 synchronized void removeDataSourceAddTask(UUID eventId) {
227 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
236 synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
237 Content dataSource =
event.getDataSource();
238 if (dataSource !=
null) {
239 String status = NbBundle.getMessage(CollaborationMonitor.class,
"CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
241 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
252 synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
254 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
262 synchronized Map<Long, Task> getCurrentTasks() {
263 Map<Long, Task> currentTasks =
new HashMap<>();
265 currentTasks.put(task.getId(), task);
268 currentTasks.put(task.getId(), task);
282 private final class RemoteTasksManager
implements PropertyChangeListener {
291 RemoteTasksManager() {
303 if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
311 synchronized void shutdown() {
321 synchronized void updateTasks(CollaborationEvent event) {
322 RemoteTasks tasksForHost =
hostsToTasks.get(event.getHostName());
323 if (
null != tasksForHost) {
324 tasksForHost.update(event);
326 hostsToTasks.put(event.getHostName(),
new RemoteTasks(event));
335 synchronized void finishStaleTasks() {
336 for (Iterator<Map.Entry<String, RemoteTasks>> it =
hostsToTasks.entrySet().iterator(); it.hasNext();) {
337 Map.Entry<String, RemoteTasks> entry = it.next();
338 RemoteTasks tasksForHost = entry.getValue();
339 if (tasksForHost.isStale()) {
340 tasksForHost.finishAllTasks();
349 synchronized void finishAllTasks() {
353 tasksForHost.finishAllTasks();
361 private final class RemoteTasks {
380 event.getCurrentTasks().values().stream().forEach((task) -> {
381 ProgressHandle
progress = ProgressHandle.createHandle(event.getHostName());
383 progress.progress(task.getStatus());
404 Map<Long, Task> remoteTasks =
event.getCurrentTasks();
405 remoteTasks.values().stream().forEach((task) -> {
411 progress.progress(task.getStatus());
416 progress = ProgressHandle.createHandle(event.getHostName());
418 progress.progress(task.getStatus());
427 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator =
taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
428 Map.Entry<Long, ProgressHandle> entry = iterator.next();
429 if (!remoteTasks.containsKey(entry.getKey())) {
430 ProgressHandle
progress = entry.getValue();
441 void finishAllTasks() {
477 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
478 }
catch (Exception ex) {
479 logger.log(Level.SEVERE,
"Unexpected exception in HeartbeatTask", ex);
497 remoteTasksManager.finishStaleTasks();
498 }
catch (Exception ex) {
499 logger.log(Level.SEVERE,
"Unexpected exception in StaleTaskDetectionTask", ex);
508 private final static class CollaborationEvent
extends AutopsyEvent implements Serializable {
522 super(COLLABORATION_MONITOR_EVENT,
null,
null);
532 String getHostName() {
542 Map<Long, Task> getCurrentTasks() {
551 private final static class Task
implements Serializable {
554 private final long id;
564 Task(
long id, String
status) {
592 final static class CollaborationMonitorException
extends Exception {
600 CollaborationMonitorException(String message) {
611 CollaborationMonitorException(String message, Throwable throwable) {
612 super(message, throwable);
final Map< Long, Task > currentTasks
static final long serialVersionUID
final Map< Integer, Task > eventIdsToAddDataSourceTasks
final Map< Long, Task > jobIdsTodataSourceAnalysisTasks
void propertyChange(PropertyChangeEvent event)
Map< Long, ProgressHandle > taskIdsToProgressBars
final long MAX_MINUTES_WITHOUT_UPDATE
void propertyChange(PropertyChangeEvent event)
final Map< String, RemoteTasks > hostsToTasks
static final long serialVersionUID
AutopsyEvent(String eventName, Object oldValue, Object newValue)
ADDING_DATA_SOURCE_FAILED
DATA_SOURCE_ANALYSIS_COMPLETED
DATA_SOURCE_ANALYSIS_STARTED