Autopsy 4.22.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1/*
2 * Autopsy Forensic Browser
3 *
4 * Copyright 2015-2021 Basis Technology Corp.
5 * Contact: carrier <at> sleuthkit <dot> org
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19package org.sleuthkit.autopsy.casemodule;
20
21import com.google.common.util.concurrent.ThreadFactoryBuilder;
22import java.beans.PropertyChangeEvent;
23import java.beans.PropertyChangeListener;
24import java.io.Serializable;
25import java.time.Duration;
26import java.time.Instant;
27import java.util.Collections;
28import java.util.EnumSet;
29import java.util.HashMap;
30import java.util.Iterator;
31import java.util.Map;
32import java.util.Set;
33import java.util.UUID;
34import java.util.concurrent.ScheduledThreadPoolExecutor;
35import java.util.concurrent.TimeUnit;
36import java.util.logging.Level;
37import org.netbeans.api.progress.ProgressHandle;
38import org.openide.util.NbBundle;
39import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceEvent;
40import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceFailedEvent;
41import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent;
42import org.sleuthkit.autopsy.coreutils.Logger;
43import org.sleuthkit.autopsy.coreutils.NetworkUtils;
44import org.sleuthkit.autopsy.events.AutopsyEvent;
45import org.sleuthkit.autopsy.events.AutopsyEventException;
46import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
47import org.sleuthkit.autopsy.ingest.IngestManager;
48import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
49import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
50import org.sleuthkit.datamodel.Content;
51
57final class CollaborationMonitor {
58
// Format for the remote event channel name; the constructor substitutes the
// event channel prefix it is given for the %s placeholder.
59 private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
// Event name used both to construct CollaborationEvents and to subscribe the
// remote tasks manager to them.
60 private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
// Case events the local tasks manager subscribes to.
61 private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
62 Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
// Ingest job events the local tasks manager subscribes to.
63 private static final Set<IngestManager.IngestJobEvent> INGEST_JOB_EVENTS_OF_INTEREST = EnumSet.of(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED, IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED);
// Core pool size of the executor that runs the heartbeat and stale task
// detection tasks.
64 private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
// Thread name format handed to the thread factory of the periodic tasks
// executor.
65 private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
// Initial delay of, and delay between, heartbeat task runs, in minutes.
66 private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
// Multiplied by the heartbeat interval to get the number of minutes without
// an update after which a remote host's tasks are treated as stale.
67 private static final long MAX_MISSED_HEARTBEATS = 5;
// Initial delay of, and delay between, stale task detection runs, in minutes.
68 private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
// Length, in seconds, of each wait for the periodic tasks executor to
// terminate during shutdown; the wait is repeated until termination.
69 private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
70 private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
// Local host name, included in every published CollaborationEvent and in the
// task status messages.
71 private final String hostName;
// Tracks tasks started on this node and publishes them remotely.
72 private final LocalTasksManager localTasksManager;
// Tracks tasks reported by other nodes and shows them as progress bars.
73 private final RemoteTasksManager remoteTasksManager;
// Publisher used to send and receive collaboration events.
74 private final AutopsyEventPublisher eventPublisher;
// Runs the periodic heartbeat and stale task detection tasks.
75 private final ScheduledThreadPoolExecutor periodicTasksExecutor;
76
89 CollaborationMonitor(String eventChannelPrefix) throws CollaborationMonitorException {
94 hostName = NetworkUtils.getLocalHostName();
95
100 eventPublisher = new AutopsyEventPublisher();
101 try {
102 eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
103 } catch (AutopsyEventException ex) {
104 throw new CollaborationMonitorException("Failed to initialize", ex);
105 }
106
111 remoteTasksManager = new RemoteTasksManager();
112 eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
113
117 localTasksManager = new LocalTasksManager();
118 IngestManager.getInstance().addIngestJobEventListener(INGEST_JOB_EVENTS_OF_INTEREST, localTasksManager);
119 Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
120
127 periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
128 periodicTasksExecutor.scheduleWithFixedDelay(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
129 periodicTasksExecutor.scheduleWithFixedDelay(new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
130 }
131
135 void shutdown() {
136 if (null != periodicTasksExecutor) {
137 periodicTasksExecutor.shutdownNow();
138 try {
139 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
140 logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
141 }
142 } catch (InterruptedException ex) {
143 logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
144 }
145 }
146
147 Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
148 IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
149
150 if (null != eventPublisher) {
151 eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
152 eventPublisher.closeRemoteEventChannel();
153 }
154
155 remoteTasksManager.shutdown();
156 }
157
165 private final class LocalTasksManager implements PropertyChangeListener {
166
167 private long nextTaskId;
168 private final Map<Integer, Task> eventIdsToAddDataSourceTasks;
169 private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
170
176 LocalTasksManager() {
177 nextTaskId = 0;
178 eventIdsToAddDataSourceTasks = new HashMap<>();
179 jobIdsTodataSourceAnalysisTasks = new HashMap<>();
180 }
181
188 @Override
189 public void propertyChange(PropertyChangeEvent event) {
190 if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
191 String eventName = event.getPropertyName();
192 if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
193 addDataSourceAddTask((AddingDataSourceEvent) event);
194 } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
195 removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getAddingDataSourceEventId());
196 } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
197 removeDataSourceAddTask(((DataSourceAddedEvent) event).getAddingDataSourceEventId());
198 } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
199 addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
200 } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
201 removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
202 }
203 }
204 }
205
212 synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
213 String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
214 eventIdsToAddDataSourceTasks.put(event.getEventId().hashCode(), new Task(++nextTaskId, status));
215 eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
216 }
217
225 synchronized void removeDataSourceAddTask(UUID eventId) {
226 eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
227 eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
228 }
229
236 synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
237 Content dataSource = event.getDataSource();
238 if (dataSource != null) {
239 String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
240 jobIdsTodataSourceAnalysisTasks.put(event.getIngestJobId(), new Task(++nextTaskId, status));
241 eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
242 }
243 }
244
252 synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
253 jobIdsTodataSourceAnalysisTasks.remove(event.getIngestJobId());
254 eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
255 }
256
262 synchronized Map<Long, Task> getCurrentTasks() {
263 Map<Long, Task> currentTasks = new HashMap<>();
264 eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
265 currentTasks.put(task.getId(), task);
266 });
267 jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
268 currentTasks.put(task.getId(), task);
269 });
270 return currentTasks;
271 }
272 }
273
282 private final class RemoteTasksManager implements PropertyChangeListener {
283
284 private final Map<String, RemoteTasks> hostsToTasks;
285
291 RemoteTasksManager() {
292 hostsToTasks = new HashMap<>();
293 }
294
301 @Override
302 public void propertyChange(PropertyChangeEvent event) {
303 if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
304 updateTasks((CollaborationEvent) event);
305 }
306 }
307
311 synchronized void shutdown() {
312 finishAllTasks();
313 }
314
321 synchronized void updateTasks(CollaborationEvent event) {
322 RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
323 if (null != tasksForHost) {
324 tasksForHost.update(event);
325 } else {
326 hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
327 }
328 }
329
335 synchronized void finishStaleTasks() {
336 for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
337 Map.Entry<String, RemoteTasks> entry = it.next();
338 RemoteTasks tasksForHost = entry.getValue();
339 if (tasksForHost.isStale()) {
340 tasksForHost.finishAllTasks();
341 it.remove();
342 }
343 }
344 }
345
349 synchronized void finishAllTasks() {
350 for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
351 Map.Entry<String, RemoteTasks> entry = it.next();
352 RemoteTasks tasksForHost = entry.getValue();
353 tasksForHost.finishAllTasks();
354 it.remove();
355 }
356 }
357
361 private final class RemoteTasks {
362
363 private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
364 private Instant lastUpdateTime;
365 private Map<Long, ProgressHandle> taskIdsToProgressBars;
366
373 RemoteTasks(CollaborationEvent event) {
377 lastUpdateTime = Instant.now();
378
379 taskIdsToProgressBars = new HashMap<>();
380 event.getCurrentTasks().values().stream().forEach((task) -> {
381 ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
382 progress.start();
383 progress.progress(task.getStatus());
384 taskIdsToProgressBars.put(task.getId(), progress);
385 });
386 }
387
394 void update(CollaborationEvent event) {
398 lastUpdateTime = Instant.now();
399
404 Map<Long, Task> remoteTasks = event.getCurrentTasks();
405 remoteTasks.values().stream().forEach((task) -> {
406 ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
407 if (null != progress) {
411 progress.progress(task.getStatus());
412 } else {
416 progress = ProgressHandle.createHandle(event.getHostName());
417 progress.start();
418 progress.progress(task.getStatus());
419 taskIdsToProgressBars.put(task.getId(), progress);
420 }
421 });
422
427 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
428 Map.Entry<Long, ProgressHandle> entry = iterator.next();
429 if (!remoteTasks.containsKey(entry.getKey())) {
430 ProgressHandle progress = entry.getValue();
431 progress.finish();
432 iterator.remove();
433 }
434 }
435 }
436
441 void finishAllTasks() {
442 taskIdsToProgressBars.values().stream().forEach((progress) -> {
443 progress.finish();
444 });
445 taskIdsToProgressBars.clear();
446 }
447
455 boolean isStale() {
456 return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
457 }
458 }
459
460 }
461
469 private final class HeartbeatTask implements Runnable {
470
474 @Override
475 public void run() {
476 try {
477 eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
478 } catch (Exception ex) {
479 logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
480 }
481 }
482 }
483
489 private final class StaleTaskDetectionTask implements Runnable {
490
494 @Override
495 public void run() {
496 try {
497 remoteTasksManager.finishStaleTasks();
498 } catch (Exception ex) {
499 logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
500 }
501 }
502 }
503
508 private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
509
510 private static final long serialVersionUID = 1L;
511 private final String hostName;
512 private final Map<Long, Task> currentTasks;
513
521 CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
522 super(COLLABORATION_MONITOR_EVENT, null, null);
523 this.hostName = hostName;
524 this.currentTasks = currentTasks;
525 }
526
532 String getHostName() {
533 return hostName;
534 }
535
542 Map<Long, Task> getCurrentTasks() {
543 return Collections.unmodifiableMap(currentTasks);
544 }
545
546 }
547
551 private final static class Task implements Serializable {
552
553 private static final long serialVersionUID = 1L;
554 private final long id;
555 private final String status;
556
564 Task(long id, String status) {
565 this.id = id;
566 this.status = status;
567 }
568
575 long getId() {
576 return id;
577 }
578
584 String getStatus() {
585 return status;
586 }
587 }
588
592 final static class CollaborationMonitorException extends Exception {
593
600 CollaborationMonitorException(String message) {
601 super(message);
602 }
603
611 CollaborationMonitorException(String message, Throwable throwable) {
612 super(message, throwable);
613 }
614 }
615
616}
AutopsyEvent(String eventName, Object oldValue, Object newValue)

Copyright © 2012-2024 Sleuth Kit Labs. Generated on:
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.