Autopsy  4.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2013-2015 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.logging.Level;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.swing.JOptionPane;
44 import org.netbeans.api.progress.ProgressHandle;
45 import org.openide.util.Cancellable;
46 import org.openide.util.NbBundle;
61 import org.sleuthkit.datamodel.AbstractFile;
62 import org.sleuthkit.datamodel.Content;
63 
68 public class IngestManager {
69 
70  private static final Logger logger = Logger.getLogger(IngestManager.class.getName());
71  private static IngestManager instance;
72  private final Object ingestMessageBoxLock = new Object();
73 
74  /*
75  * The ingest manager maintains a mapping of ingest job ids to running
76  * ingest jobs.
77  */
78  private final Map<Long, IngestJob> jobsById;
79 
80  /*
81  * Each runnable/callable task the ingest manager submits to its thread
82  * pools is given a unique thread/task ID.
83  */
84  private final AtomicLong nextThreadId;
85 
86  /*
87  * Ingest jobs may be queued to be started on a pool thread by start ingest
88  * job tasks. A mapping of task ids to the Future objects for each task is
89  * maintained to allow for task cancellation.
90  */
91  private final Map<Long, Future<Void>> startIngestJobTasks;
92  private final ExecutorService startIngestJobsThreadPool;
93 
94  /*
95  * Ingest jobs use an ingest task scheduler to break themselves down into
96  * data source level and file level tasks. The ingest scheduler puts these
97  * ingest tasks into queues for execution on ingest manager pool threads by
98  * ingest task executers. There is a single data source level ingest thread
99  * and a user configurable number of file level ingest threads.
100  */
101  private final ExecutorService dataSourceIngestThreadPool;
102  private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1;
103  private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16;
104  private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2;
106  private final ExecutorService fileIngestThreadPool;
107 
108  private static final String JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
109  private static final String MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
110  private static final Set<String> jobEventNames = Stream.of(IngestJobEvent.values())
111  .map(IngestJobEvent::toString)
112  .collect(Collectors.toSet());
113  private static final Set<String> moduleEventNames = Stream.of(IngestModuleEvent.values())
114  .map(IngestModuleEvent::toString)
115  .collect(Collectors.toSet());
118  private final ExecutorService eventPublishingExecutor;
119 
120  /*
121  * The ingest manager uses an ingest monitor to determine when system
122  * resources are under pressure. If the monitor detects such a situation, it
123  * calls back to the ingest manager to cancel all ingest jobs in progress.
124  */
126 
127  /*
128  * The ingest manager provides access to a top component that is used by
129  * ingest module to post messages for the user. A count of the posts is used
130  * as a cap to avoid bogging down the application.
131  */
132  private static final int MAX_ERROR_MESSAGE_POSTS = 200;
133  private volatile IngestMessageTopComponent ingestMessageBox;
134  private final AtomicLong ingestErrorMessagePosts;
135 
136  /*
137  * The ingest manager supports reporting of ingest processing progress by
138  * collecting snapshots of the activities of the ingest threads, ingest job
139  * progress, and ingest module run times.
140  */
141  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots;
142  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes;
143 
144  /*
145  * The ingest job creation capability of the ingest manager can be turned on
146  * and off to support an orderly shut down of the application.
147  */
148  private volatile boolean jobCreationIsEnabled;
149 
150  /*
151  * Ingest manager subscribes to service outage notifications. If key
152  * services are down, ingest manager cancels all ingest jobs in progress.
153  */
155 
159  public enum IngestJobEvent {
160 
197  };
198 
202  public enum IngestModuleEvent {
203 
226  };
227 
234  public synchronized static IngestManager getInstance() {
235  if (instance == null) {
241  instance = new IngestManager();
242  instance.subscribeToCaseEvents();
243  }
244  return instance;
245  }
246 
255  private IngestManager() {
256  this.ingestModuleRunTimes = new ConcurrentHashMap<>();
257  this.ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
258  this.ingestErrorMessagePosts = new AtomicLong(0L);
259  this.ingestMonitor = new IngestMonitor();
260  this.eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS
261  this.jobEventPublisher = new AutopsyEventPublisher();
262  this.moduleEventPublisher = new AutopsyEventPublisher();
263  this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS
264  this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS
265  this.nextThreadId = new AtomicLong(0L);
266  this.jobsById = new HashMap<>();
267  this.startIngestJobTasks = new ConcurrentHashMap<>();
268 
269  this.servicesMonitor = ServicesMonitor.getInstance();
271 
273 
274  numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
275  if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
276  numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS;
277  UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads);
278  }
279  fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
280  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
282  }
283  }
284 
290  long threadId = nextThreadId.incrementAndGet();
291  dataSourceIngestThreadPool.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
292  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
293  }
294 
299  private void startFileIngestThread() {
300  long threadId = nextThreadId.incrementAndGet();
301  fileIngestThreadPool.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
302  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
303  }
304 
308  private void subscribeToCaseEvents() {
309  Case.addEventSubscriber(Case.Events.CURRENT_CASE.toString(), new PropertyChangeListener() {
310  @Override
311  public void propertyChange(PropertyChangeEvent event) {
312  if (event.getNewValue() != null) {
313  handleCaseOpened();
314  } else {
315  handleCaseClosed();
316  }
317  }
318  });
319  }
320 
326  PropertyChangeListener propChangeListener = new PropertyChangeListener() {
327  @Override
328  public void propertyChange(PropertyChangeEvent evt) {
329  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
330 
331  // check whether a multi-user case is currently being processed
332  try {
334  return;
335  }
336  } catch (IllegalStateException ignore) {
337  // thorown by Case.getCurrentCase() when no case is open
338  return;
339  }
340 
341  // one of the services we subscribed to went down
342  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
343  logger.log(Level.SEVERE, "Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
344 
345  // display notification if running interactively
347  EventQueue.invokeLater(new Runnable() {
348  @Override
349  public void run() {
350  JOptionPane.showMessageDialog(null,
351  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
352  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
353  JOptionPane.ERROR_MESSAGE);
354  }
355  });
356  }
357 
358  // cancel ingest if running
360  }
361  }
362  };
363 
364  // subscribe to services of interest
365  Set<String> servicesList = new HashSet<>();
366  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
367  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
368  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
369  }
370 
371  synchronized void handleCaseOpened() {
372  this.jobCreationIsEnabled = true;
374  try {
381  Case openedCase = Case.getCurrentCase();
382  String channelPrefix = openedCase.getTextIndexName();
383  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
384  jobEventPublisher.openRemoteEventChannel(String.format(JOB_EVENT_CHANNEL_NAME, channelPrefix));
385  moduleEventPublisher.openRemoteEventChannel(String.format(MODULE_EVENT_CHANNEL_NAME, channelPrefix));
386  }
387  } catch (IllegalStateException | AutopsyEventException ex) {
388  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
389  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
390  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
391  }
392  }
393 
394  synchronized void handleCaseClosed() {
395  jobEventPublisher.closeRemoteEventChannel();
396  moduleEventPublisher.closeRemoteEventChannel();
397  this.jobCreationIsEnabled = false;
399  }
400 
408  @Deprecated
409  public synchronized void setRunInteractively(boolean runInteractively) {
410  RuntimeProperties.setCoreComponentsActive(runInteractively);
411  }
412 
418  void initIngestMessageInbox() {
419  synchronized (this.ingestMessageBoxLock) {
420  ingestMessageBox = IngestMessageTopComponent.findInstance();
421  }
422  }
423 
429  void postIngestMessage(IngestMessage message) {
430  synchronized (this.ingestMessageBoxLock) {
431  if (ingestMessageBox != null && RuntimeProperties.coreComponentsAreActive()) {
432  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
433  ingestMessageBox.displayMessage(message);
434  } else {
435  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
436  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
437  ingestMessageBox.displayMessage(message);
438  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
439  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
440  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
441  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
442  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
443  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
444  }
445  }
446  }
447  }
448  }
449 
450  private void clearIngestMessageBox() {
451  synchronized (this.ingestMessageBoxLock) {
452  if (ingestMessageBox != null) {
453  ingestMessageBox.clearMessages();
454  }
455  ingestErrorMessagePosts.set(0);
456  }
457  }
458 
467  }
468 
476  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
477  if (jobCreationIsEnabled) {
478  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.coreComponentsAreActive());
479  if (job.hasIngestPipeline()) {
480  long taskId = nextThreadId.incrementAndGet();
481  Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobTask(taskId, job));
482  startIngestJobTasks.put(taskId, task);
483  }
484  }
485  }
486 
498  public synchronized IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
499  if (this.jobCreationIsEnabled) {
500  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.coreComponentsAreActive());
501  if (job.hasIngestPipeline()) {
502  return this.startIngestJob(job); // Start job
503  }
504  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled."), null);
505  }
506  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null);
507  }
508 
519  @Deprecated
520  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
521  return beginIngestJob(dataSources, settings).getJob();
522  }
523 
532  @NbBundle.Messages({
533  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
534  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
535  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
536  "IngestManager.startupErr.dlgErrorList=Errors:"
537  })
539  List<IngestModuleError> errors = null;
540  if (this.jobCreationIsEnabled) {
541  // multi-user cases must have multi-user database service running
543  try {
544  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
545  // display notification if running interactively
547  EventQueue.invokeLater(new Runnable() {
548  @Override
549  public void run() {
550  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
551  JOptionPane.showMessageDialog(null,
552  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
553  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
554  JOptionPane.ERROR_MESSAGE);
555  }
556  });
557  }
558  // abort ingest
559  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList());
560  }
562  return new IngestJobStartResult(null, new IngestManagerException("Database server is down.", ex), Collections.<IngestModuleError>emptyList());
563  }
564  }
565 
566  if (!ingestMonitor.isRunning()) {
567  ingestMonitor.start();
568  }
569 
570  synchronized (jobsById) {
571  jobsById.put(job.getId(), job);
572  }
573  errors = job.start();
574  if (errors.isEmpty()) {
575  this.fireIngestJobStarted(job.getId());
576  IngestManager.logger.log(Level.INFO, "Ingest job {0} started", job.getId()); //NON-NLS
577  } else {
578  synchronized (jobsById) {
579  this.jobsById.remove(job.getId());
580  }
581  for (IngestModuleError error : errors) {
582  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
583  }
584  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
586  final StringBuilder message = new StringBuilder();
587  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n");
588  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n");
589  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n");
590  for (IngestModuleError error : errors) {
591  String moduleName = error.getModuleDisplayName();
592  String errorMessage = error.getThrowable().getLocalizedMessage();
593  message.append(moduleName).append(": ").append(errorMessage).append("\n");
594  }
595  message.append("\n\n");
596  EventQueue.invokeLater(() -> {
597  JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
598  });
599  }
600  // abort ingest
601  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors);
602  }
603  }
604  return new IngestJobStartResult(job, null, errors);
605  }
606 
607  synchronized void finishIngestJob(IngestJob job) {
608  long jobId = job.getId();
609  synchronized (jobsById) {
610  jobsById.remove(jobId);
611  }
612  if (!job.isCancelled()) {
613  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
614  fireIngestJobCompleted(jobId);
615  } else {
616  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
617  fireIngestJobCancelled(jobId);
618  }
619  }
620 
626  public boolean isIngestRunning() {
627  synchronized (jobsById) {
628  return !jobsById.isEmpty();
629  }
630  }
631 
638  @Deprecated
639  public void cancelAllIngestJobs() {
641  }
642 
649  /*
650  * Cancel the start job tasks.
651  */
652  for (Future<Void> handle : startIngestJobTasks.values()) {
653  handle.cancel(true);
654  }
655 
656  /*
657  * Cancel the jobs in progress.
658  */
659  synchronized (jobsById) {
660  for (IngestJob job : this.jobsById.values()) {
661  job.cancel(reason);
662  }
663  }
664  }
665 
671  public void addIngestJobEventListener(final PropertyChangeListener listener) {
672  jobEventPublisher.addSubscriber(jobEventNames, listener);
673  }
674 
680  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
681  jobEventPublisher.removeSubscriber(jobEventNames, listener);
682  }
683 
689  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
690  moduleEventPublisher.addSubscriber(moduleEventNames, listener);
691  }
692 
698  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
699  moduleEventPublisher.removeSubscriber(moduleEventNames, listener);
700  }
701 
710  @Deprecated
711  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
712  instance.jobEventPublisher.addSubscriber(jobEventNames, listener);
713  instance.moduleEventPublisher.addSubscriber(moduleEventNames, listener);
714  }
715 
724  @Deprecated
725  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
726  instance.jobEventPublisher.removeSubscriber(jobEventNames, listener);
727  instance.moduleEventPublisher.removeSubscriber(moduleEventNames, listener);
728  }
729 
735  void fireIngestJobStarted(long ingestJobId) {
736  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
737  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
738  }
739 
745  void fireIngestJobCompleted(long ingestJobId) {
746  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
747  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
748  }
749 
755  void fireIngestJobCancelled(long ingestJobId) {
756  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
757  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
758  }
759 
767  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
768  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
769  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
770  }
771 
779  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
780  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
781  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
782  }
783 
791  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
792  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
793  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
794  }
795 
801  void fireFileIngestDone(AbstractFile file) {
802  AutopsyEvent event = new FileAnalyzedEvent(file);
803  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
804  }
805 
811  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
812  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
813  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
814  }
815 
823  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
824  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
825  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
826  }
827 
834  void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
835  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
836  }
837 
844  void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
845  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
846  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
847  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
848 
849  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
850  }
851 
857  void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
858  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
859  }
860 
866  void setIngestTaskProgressCompleted(FileIngestTask task) {
867  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
868  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
869  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
870  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
871  }
872 
879  private void incrementModuleRunTime(String moduleName, Long duration) {
880  if (moduleName.equals("IDLE")) { //NON-NLS
881  return;
882  }
883 
884  synchronized (ingestModuleRunTimes) {
885  Long prevTimeL = ingestModuleRunTimes.get(moduleName);
886  long prevTime = 0;
887  if (prevTimeL != null) {
888  prevTime = prevTimeL;
889  }
890  prevTime += duration;
891  ingestModuleRunTimes.put(moduleName, prevTime);
892  }
893  }
894 
900  Map<String, Long> getModuleRunTimes() {
901  synchronized (ingestModuleRunTimes) {
902  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
903  return times;
904  }
905  }
906 
912  List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
913  return new ArrayList<>(ingestThreadActivitySnapshots.values());
914  }
915 
921  List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
922  List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
923  synchronized (jobsById) {
924  for (IngestJob job : jobsById.values()) {
925  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
926  }
927  }
928  return snapShots;
929  }
930 
937  long getFreeDiskSpace() {
938  if (ingestMonitor != null) {
939  return ingestMonitor.getFreeSpace();
940  } else {
941  return -1;
942  }
943  }
944 
948  private final class StartIngestJobTask implements Callable<Void> {
949 
950  private final long threadId;
951  private final IngestJob job;
952  private ProgressHandle progress;
953 
954  StartIngestJobTask(long threadId, IngestJob job) {
955  this.threadId = threadId;
956  this.job = job;
957  }
958 
959  @Override
960  public Void call() {
961  try {
962  if (Thread.currentThread().isInterrupted()) {
963  synchronized (jobsById) {
964  jobsById.remove(job.getId());
965  }
966  return null;
967  }
968 
970  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
971  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
972  @Override
973  public boolean cancel() {
974  if (progress != null) {
975  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
976  }
977  Future<?> handle = startIngestJobTasks.remove(threadId);
978  handle.cancel(true);
979  return true;
980  }
981  });
982  progress.start();
983  }
984 
985  startIngestJob(job);
986  return null;
987 
988  } finally {
989  if (null != progress) {
990  progress.finish();
991  }
992  startIngestJobTasks.remove(threadId);
993  }
994  }
995 
996  }
997 
1001  private final class ExecuteIngestJobsTask implements Runnable {
1002 
1003  private final long threadId;
1004  private final IngestTaskQueue tasks;
1005 
1006  ExecuteIngestJobsTask(long threadId, IngestTaskQueue tasks) {
1007  this.threadId = threadId;
1008  this.tasks = tasks;
1009  }
1010 
1011  @Override
1012  public void run() {
1013  while (true) {
1014  try {
1015  IngestTask task = tasks.getNextTask(); // Blocks.
1016  task.execute(threadId);
1017  } catch (InterruptedException ex) {
1018  break;
1019  }
1020  if (Thread.currentThread().isInterrupted()) {
1021  break;
1022  }
1023  }
1024  }
1025  }
1026 
1030  private static final class PublishEventTask implements Runnable {
1031 
1032  private final AutopsyEvent event;
1034 
1043  this.event = event;
1044  this.publisher = publisher;
1045  }
1046 
1047  @Override
1048  public void run() {
1049  publisher.publish(event);
1050  }
1051 
1052  }
1053 
1054  static final class IngestThreadActivitySnapshot {
1055 
1056  private final long threadId;
1057  private final Date startTime;
1058  private final String activity;
1059  private final String dataSourceName;
1060  private final String fileName;
1061  private final long jobId;
1062 
1063  // nothing is running on the thread
1064  IngestThreadActivitySnapshot(long threadId) {
1065  this.threadId = threadId;
1066  startTime = new Date();
1067  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1068  this.dataSourceName = "";
1069  this.fileName = "";
1070  this.jobId = 0;
1071  }
1072 
1073  // data souce thread
1074  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1075  this.threadId = threadId;
1076  this.jobId = jobId;
1077  startTime = new Date();
1078  this.activity = activity;
1079  this.dataSourceName = dataSource.getName();
1080  this.fileName = "";
1081  }
1082 
1083  // file ingest thread
1084  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1085  this.threadId = threadId;
1086  this.jobId = jobId;
1087  startTime = new Date();
1088  this.activity = activity;
1089  this.dataSourceName = dataSource.getName();
1090  this.fileName = file.getName();
1091  }
1092 
1093  long getJobId() {
1094  return jobId;
1095  }
1096 
1097  long getThreadId() {
1098  return threadId;
1099  }
1100 
1101  Date getStartTime() {
1102  return startTime;
1103  }
1104 
1105  String getActivity() {
1106  return activity;
1107  }
1108 
1109  String getDataSourceName() {
1110  return dataSourceName;
1111  }
1112 
1113  String getFileName() {
1114  return fileName;
1115  }
1116 
1117  }
1118 
1122  public final static class IngestManagerException extends Exception {
1123 
1124  private static final long serialVersionUID = 1L;
1125 
1131  private IngestManagerException(String message) {
1132  super(message);
1133  }
1134 
1141  private IngestManagerException(String message, Throwable cause) {
1142  super(message, cause);
1143  }
1144  }
1145 
1146 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static synchronized IngestManager getInstance()
IngestJobStartResult startIngestJob(IngestJob job)
final Map< Long, Future< Void > > startIngestJobTasks
static void addPropertyChangeListener(final PropertyChangeListener listener)
static synchronized void setCoreComponentsActive(boolean coreComponentsActive)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
void removeIngestJobEventListener(final PropertyChangeListener listener)
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Map< Long, IngestJob > jobsById
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
static synchronized boolean coreComponentsAreActive()
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final Set< String > jobEventNames
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:161
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
synchronized void setRunInteractively(boolean runInteractively)
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
Definition: Case.java:330

Copyright © 2012-2016 Basis Technology. Generated on: Mon Jan 2 2017
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.