19 package org.sleuthkit.autopsy.casemodule;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.beans.PropertyChangeEvent;
 
   23 import java.beans.PropertyChangeListener;
 
   24 import java.io.Serializable;
 
   25 import java.time.Duration;
 
   26 import java.time.Instant;
 
   27 import java.util.EnumSet;
 
   28 import java.util.HashMap;
 
   29 import java.util.Iterator;
 
   32 import java.util.UUID;
 
   33 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
   34 import java.util.concurrent.TimeUnit;
 
   35 import java.util.logging.Level;
 
   36 import org.netbeans.api.progress.ProgressHandle;
 
   37 import org.openide.util.NbBundle;
 
   /**
    * Declaration, constants, and instance fields of the collaboration monitor.
    * In the code visible in this listing, the monitor opens a remote event
    * channel, publishes CollaborationEvent objects that carry this host's name
    * and its current tasks, and subscribes a RemoteTasksManager to events
    * named COLLABORATION_MONITOR_EVENT.
    */
   56 final class CollaborationMonitor {
 
          // Format string for the remote event channel name; the constructor
          // fills in %s with its eventChannelPrefix argument.
   58     private static final String EVENT_CHANNEL_NAME = 
"%s-Collaboration-Monitor-Events"; 
 
          // Event name used when subscribing the remote tasks manager to the
          // event publisher, and compared against the property name of
          // incoming events.
   59     private static final String COLLABORATION_MONITOR_EVENT = 
"COLLABORATION_MONITOR_EVENT"; 
 
          // Case events for which the local tasks manager is registered as a
          // subscriber in the constructor.
   60     private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
 
   61             Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
 
          // Core pool size passed to the ScheduledThreadPoolExecutor that runs
          // the periodic tasks.
   62     private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
 
          // Name format handed to ThreadFactoryBuilder.setNameFormat for the
          // periodic task threads.
   63     private static final String PERIODIC_TASK_THREAD_NAME = 
"collab-monitor-periodic-tasks-%d"; 
 
          // Used as both the initial delay and the delay between runs of the
          // heartbeat task, in minutes.
   64     private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
 
          // NOTE(review): not referenced in the lines visible in this listing;
          // presumably feeds the staleness threshold for remote tasks - TODO
          // confirm.
   65     private static final long MAX_MISSED_HEARTBEATS = 5;
 
          // Used as both the initial delay and the delay between runs of the
          // stale task detection task, in minutes.
   66     private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
 
          // Timeout, in seconds, of each awaitTermination call made while the
          // periodic tasks executor is being shut down.
   67     private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
 
   68     private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
 
          // Local host name; included in every CollaborationEvent published
          // by this monitor and in the task status messages it builds.
   69     private final String hostName;
 
          // Listener for ingest job events and the case events of interest;
          // also the source of the current local tasks that get published.
   70     private final LocalTasksManager localTasksManager;
 
          // Subscriber for collaboration monitor events arriving through the
          // event publisher.
   71     private final RemoteTasksManager remoteTasksManager;
 
          // Publisher used to open the remote event channel and to publish
          // collaboration events remotely.
   72     private final AutopsyEventPublisher eventPublisher;
 
          // Executor that runs the heartbeat and stale task detection tasks.
   73     private final ScheduledThreadPoolExecutor periodicTasksExecutor;
 
   /**
    * Builds the monitor: records the local host name, opens the remote event
    * channel, registers the remote and local tasks managers as listeners,
    * and schedules the two periodic tasks.
    *
    * @param eventChannelPrefix Substituted into EVENT_CHANNEL_NAME to form the
    *                           name of the remote event channel to open.
    *
    * @throws CollaborationMonitorException Wraps an AutopsyEventException
    *                                       caught in this constructor; the
    *                                       only statement visible ahead of
    *                                       the catch is the call that opens
    *                                       the remote event channel.
    */
   87     CollaborationMonitor(String eventChannelPrefix) 
throws CollaborationMonitorException {
 
   92         hostName = NetworkUtils.getLocalHostName();
 
   98         eventPublisher = 
new AutopsyEventPublisher();
 
                  // NOTE(review): the line opening the try block that this
                  // call belongs to is not visible in this listing.
  100             eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
 
  101         } 
catch (AutopsyEventException ex) {
 
                  // The original exception is kept as the cause.
  102             throw new CollaborationMonitorException(
"Failed to initialize", ex);
 
              // The remote tasks manager receives the collaboration monitor
              // events delivered by the event publisher.
  109         remoteTasksManager = 
new RemoteTasksManager();
 
  110         eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
              // The local tasks manager listens for ingest job events and for
              // the case events of interest.
  115         localTasksManager = 
new LocalTasksManager();
 
  116         IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
 
  117         Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
              // Both periodic tasks use one value for the initial delay and
              // for the delay between runs; scheduleWithFixedDelay measures
              // the latter from the end of one run to the start of the next.
  125         periodicTasksExecutor = 
new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, 
new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
 
  126         periodicTasksExecutor.scheduleWithFixedDelay(
new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
 
  127         periodicTasksExecutor.scheduleWithFixedDelay(
new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
 
  // Fragment of the monitor's shutdown logic. The header of the enclosing
  // method, the line opening the try block, and several closing braces are
  // not visible in this listing.
  134         if (null != periodicTasksExecutor) {
 
                  // Stop the periodic tasks, then block until the executor
                  // has terminated, logging a warning each time the wait
                  // times out.
  135             periodicTasksExecutor.shutdownNow();
 
  137                 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
 
  138                     logger.log(Level.WARNING, 
"Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); 
 
  140             } 
catch (InterruptedException ex) {
 
                      // NOTE(review): the interrupt is logged, but no call
                      // restoring the thread's interrupt status
                      // (Thread.currentThread().interrupt()) appears in the
                      // visible lines - confirm whether that is intended.
  141                 logger.log(Level.SEVERE, 
"Unexpected interrupt while stopping periodic tasks executor", ex); 
 
              // Undo the listener registrations made in the constructor.
  145         Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
  146         IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
 
  148         if (null != eventPublisher) {
 
                  // Stop receiving collaboration events before closing the
                  // remote event channel.
  149             eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
  150             eventPublisher.closeRemoteEventChannel();
 
  153         remoteTasksManager.shutdown();
 
  // Fragments belonging to the local tasks manager. The headers of the
  // enclosing constructor and methods are not visible in this listing.
  //
  // Both task maps start out empty.
  176             eventIdsToAddDataSourceTasks = 
new HashMap<>();
 
  177             jobIdsTodataSourceAnalysisTasks = 
new HashMap<>();
 
                      // Name of the received property change event; the code
                      // that acts on it is not visible in this listing.
  189                 String eventName = 
event.getPropertyName();
 
                  // Build a status message from the resource bundle, passing
                  // the local host name as the message argument.
  211             String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.addingDataSourceStatus.msg", hostName);
 
                  // Store a new task with the next task id (pre-incremented).
                  // NOTE(review): the map key is the hash code of the event
                  // id rather than the id itself, so two different event ids
                  // with the same hash code would share one entry - confirm
                  // that this is acceptable.
  212             eventIdsToAddDataSourceTasks.put(event.
getEventId().hashCode(), 
new Task(++nextTaskId, status));
 
  /**
   * Removes the add data source task stored under the hash code of the given
   * event id, then remotely publishes a CollaborationEvent carrying the local
   * host name and the tasks that remain. The closing brace of this method is
   * not visible in this listing.
   *
   * @param eventId The event id whose hash code is the map key of the task
   *                to remove.
   */
  223         synchronized void removeDataSourceAddTask(UUID eventId) {
 
  224             eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
 
  225             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Records a data source analysis task for the ingest job of the given
   * event and remotely publishes the updated set of local tasks. The visible
   * statements that do this are guarded by a null check on the event's data
   * source. The closing braces of this method are not visible in this
   * listing.
   *
   * @param event The data source analysis started event.
   */
  234         synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
 
  235             Content dataSource = 
event.getDataSource();
 
  236             if (dataSource != null) {
 
                      // The status message takes the local host name and the
                      // data source name as arguments.
  237                 String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
 
                      // The task is keyed by the data source ingest job id
                      // and gets the next task id (pre-incremented).
  238                 jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), 
new Task(++nextTaskId, status));
 
  239                 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Removes the data source analysis task stored under the event's data
   * source ingest job id, then remotely publishes a CollaborationEvent
   * carrying the local host name and the tasks that remain. The closing
   * brace of this method is not visible in this listing.
   *
   * @param event The data source analysis completed event.
   */
  250         synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
 
  251             jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
 
  252             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Collects the add data source tasks and the data source analysis tasks
   * into a newly created map keyed by task id. The ends of the two lambdas
   * and the return statement are not visible in this listing.
   *
   * @return A map of task ids to tasks - presumably the map built here
   *         (TODO confirm against the full source).
   */
  260         synchronized Map<Long, Task> getCurrentTasks() {
 
  261             Map<Long, Task> currentTasks = 
new HashMap<>();
 
                  // Copy every add data source task into the result.
  262             eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
 
  263                 currentTasks.put(task.getId(), task);
 
                  // Copy every data source analysis task into the result.
  265             jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
 
  266                 currentTasks.put(task.getId(), task);
 
  // Fragments belonging to the remote tasks manager. The headers of the
  // enclosing constructor and property change method, and the body of
  // shutdown(), are not visible in this listing.
  //
  // The map of host names to remote tasks starts out empty.
  290             hostsToTasks = 
new HashMap<>();
 
                  // Only events named COLLABORATION_MONITOR_EVENT pass this
                  // check; the guarded code is not visible in this listing.
  301             if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
 
              // Called by the monitor's shutdown logic after the remote event
              // channel has been closed.
  309         synchronized void shutdown() {
 
  /**
   * Applies a collaboration event to the remote tasks tracked for the host
   * named in the event. A host that already has an entry has its entry
   * updated; the visible code also stores a new RemoteTasks object for the
   * host.
   * NOTE(review): the line between the update and the put is not visible in
   * this listing; presumably it is the else branch of the null check - TODO
   * confirm.
   *
   * @param event The collaboration event received for a host.
   */
  319         synchronized void updateTasks(CollaborationEvent event) {
 
  320             RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
 
  321             if (null != tasksForHost) {
 
  322                 tasksForHost.update(event);
 
  324                 hostsToTasks.put(event.getHostName(), 
new RemoteTasks(event));
 
  /**
   * Walks the per-host remote tasks and finishes all tasks of every host
   * whose RemoteTasks object reports itself as stale.
   * NOTE(review): the rest of the loop body is not visible in this listing;
   * the explicit Iterator suggests that entries are also removed through it -
   * TODO confirm.
   */
  333         synchronized void finishStaleTasks() {
 
  334             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  335                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  336                 RemoteTasks tasksForHost = entry.getValue();
 
  337                 if (tasksForHost.isStale()) {
 
  338                     tasksForHost.finishAllTasks();
 
  /**
   * Walks the per-host remote tasks and finishes all tasks of every host,
   * with no staleness check.
   * NOTE(review): the rest of the loop body is not visible in this listing;
   * the explicit Iterator suggests that entries are also removed through it -
   * TODO confirm.
   */
  347         synchronized void finishAllTasks() {
 
  348             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  349                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  350                 RemoteTasks tasksForHost = entry.getValue();
 
  351                 tasksForHost.finishAllTasks();
 
  // Fragment that initializes the remote tasks of one host from a
  // collaboration event (a RemoteTasks object is created from such an event
  // elsewhere in this file). The enclosing header is not visible in this
  // listing.
  //
  // Record the time of this update.
  375                 lastUpdateTime = Instant.now();
 
  377                 taskIdsToProgressBars = 
new HashMap<>();
 
                      // For every task in the event, create a progress handle
                      // labelled with the event's host name, report the task
                      // status on it, and remember it under the task id.
                      // NOTE(review): one line between the creation and the
                      // status report is not visible in this listing.
  378                 event.getCurrentTasks().values().stream().forEach((task) -> {
 
  379                     ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
 
  381                     progress.progress(task.getStatus());
 
  382                     taskIdsToProgressBars.put(task.getId(), progress);
 
  // Fragment that brings the progress bars of one host in line with a newly
  // received collaboration event. The enclosing header, the else line, and
  // the tail of the final loop are not visible in this listing.
  //
  // Record the time of this update.
  396                 lastUpdateTime = Instant.now();
 
  402                 Map<Long, Task> remoteTasks = 
event.getCurrentTasks();
 
                      // First pass: walk the tasks reported in the event.
  403                 remoteTasks.values().stream().forEach((task) -> {
 
  404                     ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
 
  405                     if (null != progress) {
 
                              // Known task: refresh the status shown on its
                              // existing progress handle.
  409                         progress.progress(task.getStatus());
 
                              // Otherwise (branch line not visible): create a
                              // handle labelled with the host name, report
                              // the status, and remember it under the task
                              // id.
  414                         progress = ProgressHandle.createHandle(event.getHostName());
 
  416                         progress.progress(task.getStatus());
 
  417                         taskIdsToProgressBars.put(task.getId(), progress);
 
                      // Second pass: look for progress bars whose task id is
                      // no longer present in the event.
                      // NOTE(review): what is done with such a handle is not
                      // visible in this listing; presumably it is finished
                      // and removed through the iterator - TODO confirm.
  425                 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
 
  426                     Map.Entry<Long, ProgressHandle> entry = iterator.next();
 
  427                     if (!remoteTasks.containsKey(entry.getKey())) {
 
  428                         ProgressHandle progress = entry.getValue();
 
  /**
   * Visits every progress handle tracked for this host and then empties the
   * map of task ids to progress handles.
   * NOTE(review): the body of the lambda applied to each handle is not
   * visible in this listing; presumably it finishes the handle - TODO
   * confirm.
   */
  439             void finishAllTasks() {
 
  440                 taskIdsToProgressBars.values().stream().forEach((progress) -> {
 
  443                 taskIdsToProgressBars.clear();
 
  // Fragment of the heartbeat task (identified by its log message); the
  // enclosing header and the line opening the try block are not visible in
  // this listing.
  //
  // Remotely publish the local host name together with the current local
  // tasks.
  475                 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
 
  476             } 
catch (Exception ex) {
 
                      // Every exception is caught and logged here. Presumably
                      // this keeps the periodic schedule alive, since a
                      // scheduled executor suppresses later runs of a task
                      // that throws - confirm intent.
  477                 logger.log(Level.SEVERE, 
"Unexpected exception in HeartbeatTask", ex); 
 
  // Fragment of the stale task detection task (identified by its log
  // message); the enclosing header and the line opening the try block are
  // not visible in this listing.
  //
  // Ask the remote tasks manager to finish the tasks of stale hosts.
  495                 remoteTasksManager.finishStaleTasks();
 
  496             } 
catch (Exception ex) {
 
                      // Every exception is caught and logged here. Presumably
                      // this keeps the periodic schedule alive, since a
                      // scheduled executor suppresses later runs of a task
                      // that throws - confirm intent.
  497                 logger.log(Level.SEVERE, 
"Unexpected exception in StaleTaskDetectionTask", ex); 
 
  // Fragments that appear to belong to the collaboration event class (its
  // getHostName() and getCurrentTasks() are called elsewhere in this file).
  // The class header, the constructor header, and the getter bodies are not
  // visible in this listing.
  //
  // The superclass constructor receives the collaboration monitor event name
  // and two null arguments.
  520             super(COLLABORATION_MONITOR_EVENT, null, null);
 
              // Callers use the result as the key for per-host task tracking
              // and as the label of progress handles.
  530         String getHostName() {
 
              // Callers treat the result as a map of task ids to tasks.
  540         Map<Long, Task> getCurrentTasks() {
 
   /**
    * Serializable task description with a numeric id and a status string.
    * Tasks are placed in the maps carried by collaboration events. Only the
    * class header, the id field, and the constructor header are visible in
    * this listing.
    */
  549     private final static class Task implements Serializable {
 
              // Task id; callers in this file use getId() as the map key for
              // tasks and progress handles.
  552         private final long id;
 
              // Takes the task id and the status text for the task.
  562         Task(
long id, String status) {
 
   /**
    * Checked exception declared by the CollaborationMonitor constructor,
    * which throws it when opening the remote event channel fails. The bodies
    * and closing braces are only partly visible in this listing.
    */
  590     final static class CollaborationMonitorException 
extends Exception {
 
              // Message-only constructor; its body is not visible in this
              // listing.
  598         CollaborationMonitorException(String message) {
 
              // Passes both the message and the cause through to Exception.
  609         CollaborationMonitorException(String message, Throwable throwable) {
 
  610             super(message, throwable);
 
final Map< String, RemoteTasks > hostsToTasks
ADDING_DATA_SOURCE_FAILED
final Map< Integer, Task > eventIdsToAddDataSourceTasks
static final long serialVersionUID
DATA_SOURCE_ANALYSIS_COMPLETED
void propertyChange(PropertyChangeEvent event)
Map< Long, ProgressHandle > taskIdsToProgressBars
final Map< Long, Task > jobIdsTodataSourceAnalysisTasks
void propertyChange(PropertyChangeEvent event)
static final long serialVersionUID
final long MAX_MINUTES_WITHOUT_UPDATE
DATA_SOURCE_ANALYSIS_STARTED
final Map< Long, Task > currentTasks