19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.logging.Level;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.annotation.concurrent.GuardedBy;
44 import javax.annotation.concurrent.Immutable;
45 import javax.annotation.concurrent.ThreadSafe;
46 import javax.swing.JOptionPane;
47 import org.netbeans.api.progress.ProgressHandle;
48 import org.openide.util.Cancellable;
49 import org.openide.util.NbBundle;
115 @GuardedBy(
"IngestManager.class")
119 private final ExecutorService
startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build());
124 private final ExecutorService
eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build());
143 if (null == instance) {
145 instance.subscribeToServiceMonitorEvents();
146 instance.subscribeToCaseEvents();
184 PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
194 }
catch (IllegalStateException noCaseOpenException) {
199 LOGGER.log(Level.SEVERE,
"Service {0} is down, cancelling all running ingest jobs", serviceDisplayName);
201 EventQueue.invokeLater(
new Runnable() {
204 JOptionPane.showMessageDialog(null,
205 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
206 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
207 JOptionPane.ERROR_MESSAGE);
221 Set<String> servicesList =
new HashSet<>();
233 if (event.getNewValue() != null) {
248 void handleCaseOpened() {
253 String channelPrefix = openedCase.
getName();
258 }
catch (IllegalStateException | AutopsyEventException ex) {
259 LOGGER.log(Level.SEVERE,
"Failed to open remote events channel", ex);
260 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
261 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
273 void handleCaseClosed() {
304 if (job.hasIngestPipeline()) {
324 if (job.hasIngestPipeline()) {
341 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
342 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
343 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
344 "IngestManager.startupErr.dlgErrorList=Errors:"
347 List<IngestModuleError> errors = null;
353 EventQueue.invokeLater(
new Runnable() {
357 JOptionPane.showMessageDialog(null,
358 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
359 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
360 JOptionPane.ERROR_MESSAGE);
376 errors = job.start();
377 if (errors.isEmpty()) {
378 this.fireIngestJobStarted(job.
getId());
383 LOGGER.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
387 final StringBuilder message =
new StringBuilder(1024);
388 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
389 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
390 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
392 String moduleName = error.getModuleDisplayName();
393 String errorMessage = error.getThrowable().getLocalizedMessage();
394 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
396 message.append(
"\n\n");
397 EventQueue.invokeLater(() -> {
398 JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
414 long jobId = job.
getId();
418 fireIngestJobCompleted(jobId);
420 IngestManager.LOGGER.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
421 fireIngestJobCancelled(jobId);
490 void fireIngestJobStarted(
long ingestJobId) {
500 void fireIngestJobCompleted(
long ingestJobId) {
501 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
510 void fireIngestJobCancelled(
long ingestJobId) {
511 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
523 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
524 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
536 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
537 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
549 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
550 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
560 void fireFileIngestDone(AbstractFile file) {
561 AutopsyEvent
event =
new FileAnalyzedEvent(file);
572 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
573 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
584 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
585 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
594 void initIngestMessageInbox() {
605 void postIngestMessage(IngestMessage message) {
608 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
612 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
614 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
615 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
616 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
617 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
618 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
649 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
650 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
664 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
666 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
668 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
678 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
689 void setIngestTaskProgressCompleted(FileIngestTask task) {
691 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
693 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
703 if (moduleDisplayName.equals(
"IDLE")) {
710 if (prevTimeL != null) {
711 prevTime = prevTimeL;
713 prevTime += duration;
723 Map<String, Long> getModuleRunTimes() {
736 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
745 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
746 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
748 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
759 long getFreeDiskSpace() {
784 if (Thread.currentThread().isInterrupted()) {
790 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
791 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
793 public boolean cancel() {
794 if (progress != null) {
795 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
809 if (null != progress) {
824 private final IngestTaskQueue
tasks;
835 IngestTask task = tasks.getNextTask();
836 task.execute(threadId);
837 }
catch (InterruptedException ex) {
840 if (Thread.currentThread().isInterrupted()) {
879 static final class IngestThreadActivitySnapshot {
881 private final long threadId;
882 private final Date startTime;
883 private final String activity;
884 private final String dataSourceName;
885 private final String fileName;
886 private final long jobId;
895 IngestThreadActivitySnapshot(
long threadId) {
896 this.threadId = threadId;
897 startTime =
new Date();
898 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
899 this.dataSourceName =
"";
914 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
915 this.threadId = threadId;
917 startTime =
new Date();
918 this.activity = activity;
919 this.dataSourceName = dataSource.getName();
935 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
936 this.threadId = threadId;
938 startTime =
new Date();
939 this.activity = activity;
940 this.dataSourceName = dataSource.getName();
941 this.fileName = file.getName();
949 long getIngestJobId() {
967 Date getStartTime() {
976 String getActivity() {
987 String getDataSourceName() {
988 return dataSourceName;
996 String getFileName() {
1097 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
final Map< Long, Future< Void > > startIngestJobFutures
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final String INGEST_MODULE_EVENT_CHANNEL_NAME
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static boolean runningWithGUI
void cancelAllIngestJobs()
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final Logger LOGGER
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
final AutopsyEventPublisher publisher
synchronized void closeRemoteEventChannel()
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
static final String INGEST_JOB_EVENT_CHANNEL_NAME
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
static int numberOfFileIngestThreads()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
IngestManagerException(String message)
final IngestTaskQueue tasks
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
volatile boolean caseIsOpen
final AtomicLong nextIngestManagerTaskId
int getNumberOfFileIngestThreads()
void addIngestModuleEventListener(final PropertyChangeListener listener)
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
void incrementModuleRunTime(String moduleDisplayName, Long duration)
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final IngestMonitor ingestMonitor
final ExecutorService fileLevelIngestJobTasksExecutor
static final long serialVersionUID
final Map< Long, IngestJob > ingestJobsById
void subscribeToCaseEvents()
final ExecutorService startIngestJobsExecutor
final int numberOfFileIngestThreads
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
final AutopsyEventPublisher jobEventPublisher