Autopsy  4.19.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.eventbus.Subscribe;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import java.awt.EventQueue;
24 import java.beans.PropertyChangeEvent;
25 import java.beans.PropertyChangeListener;
26 import java.io.Serializable;
27 import java.lang.reflect.InvocationTargetException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Date;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.logging.Level;
45 import java.util.stream.Collectors;
46 import java.util.stream.Stream;
47 import javax.annotation.concurrent.GuardedBy;
48 import javax.annotation.concurrent.Immutable;
49 import javax.annotation.concurrent.ThreadSafe;
50 import javax.swing.JOptionPane;
51 import javax.swing.SwingUtilities;
52 import org.netbeans.api.progress.ProgressHandle;
53 import org.openide.util.Cancellable;
54 import org.openide.util.NbBundle;
55 import org.openide.windows.WindowManager;
71 import org.sleuthkit.datamodel.AbstractFile;
72 import org.sleuthkit.datamodel.Blackboard;
73 import org.sleuthkit.datamodel.BlackboardArtifact;
74 import org.sleuthkit.datamodel.Content;
75 import org.sleuthkit.datamodel.DataSource;
76 import org.sleuthkit.datamodel.SleuthkitCase;
77 import org.sleuthkit.datamodel.TskCoreException;
78 
118 @ThreadSafe
120 
121  private final static Logger logger = Logger.getLogger(IngestManager.class.getName());
122  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
123  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
124  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
125  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
126  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
127  @GuardedBy("IngestManager.class")
128  private static IngestManager instance;
129  private final int numberOfFileIngestThreads;
130  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
131  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
132  @GuardedBy("startIngestJobFutures")
133  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
134  @GuardedBy("ingestJobsById")
135  private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
136  private final ExecutorService dataSourceLevelIngestJobTasksExecutor;
137  private final ExecutorService fileLevelIngestJobTasksExecutor;
138  private final ExecutorService resultIngestTasksExecutor;
139  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
143  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
144  private final Object ingestMessageBoxLock = new Object();
145  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
146  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
147  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
148  private volatile IngestMessageTopComponent ingestMessageBox;
149  private volatile boolean caseIsOpen;
150 
157  public synchronized static IngestManager getInstance() {
158  if (null == instance) {
159  instance = new IngestManager();
160  instance.subscribeToServiceMonitorEvents();
161  instance.subscribeToCaseEvents();
162  }
163  return instance;
164  }
165 
170  private IngestManager() {
171  /*
172  * Submit a single Runnable ingest manager task for processing data
173  * source level ingest job tasks to the data source level ingest job
174  * tasks executor.
175  */
176  dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
177  long threadId = nextIngestManagerTaskId.incrementAndGet();
178  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
179  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
180 
181  /*
182  * Submit a configurable number of Runnable ingest manager tasks for
183  * processing file level ingest job tasks to the file level ingest job
184  * tasks executor.
185  */
187  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
188  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
189  threadId = nextIngestManagerTaskId.incrementAndGet();
190  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
191  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
192  }
193 
194  resultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-results-ingest-%d").build()); //NON-NLS;
195  threadId = nextIngestManagerTaskId.incrementAndGet();
196  resultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getResultIngestTaskQueue()));
197  // RJCTODO
198  // ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
199  // RJCTODO: Where is the shut down code?
200  }
201 
207  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
208  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
209  /*
210  * The application services considered to be key services are
211  * only necessary for multi-user cases.
212  */
213  try {
215  return;
216  }
217  } catch (NoCurrentCaseException noCaseOpenException) {
218  return;
219  }
220 
221  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
222  logger.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
224  EventQueue.invokeLater(new Runnable() {
225  @Override
226  public void run() {
227  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
228  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
229  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
230  JOptionPane.ERROR_MESSAGE);
231  }
232  });
233  }
235  }
236  };
237 
238  /*
239  * The key services for multi-user cases are currently the case database
240  * server and the Solr server. The Solr server is a key service not
241  * because search is essential, but because the coordination service
242  * (ZooKeeper) is running embedded within the Solr server.
243  */
244  Set<String> servicesList = new HashSet<>();
245  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
246  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
247  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
248  }
249 
254  private void subscribeToCaseEvents() {
255  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
256  if (event.getNewValue() != null) {
257  handleCaseOpened();
258  } else {
259  handleCaseClosed();
260  }
261  });
262  }
263 
272  void handleCaseOpened() {
273  caseIsOpen = true;
275  try {
276  Case openedCase = Case.getCurrentCaseThrows();
277  String channelPrefix = openedCase.getName();
278  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
279  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
280  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
281  }
282  openedCase.getSleuthkitCase().registerForEvents(this);
283  } catch (NoCurrentCaseException | AutopsyEventException ex) {
284  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
285  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
286  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
287  }
288  }
289 
297  @Subscribe
298  void handleArtifactsPosted(Blackboard.ArtifactsPostedEvent tskEvent) {
299  for (BlackboardArtifact.Type artifactType : tskEvent.getArtifactTypes()) {
300  ModuleDataEvent legacyEvent = new ModuleDataEvent(tskEvent.getModuleName(), artifactType, tskEvent.getArtifacts(artifactType));
301  AutopsyEvent autopsyEvent = new BlackboardPostEvent(legacyEvent);
302  eventPublishingExecutor.submit(new PublishEventTask(autopsyEvent, moduleEventPublisher));
303  }
304  }
305 
315  void handleCaseClosed() {
316  /*
317  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
318  * to complete when a case is closed.
319  */
320  cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
321  Case.getCurrentCase().getSleuthkitCase().unregisterForEvents(this);
324  caseIsOpen = false;
326  }
327 
339  public IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings) throws TskCoreException {
340  IngestJob job = new IngestJob(dataSource, IngestJob.Mode.STREAMING, settings);
341  IngestJobInputStream stream = new IngestJobInputStream(job);
342  if (stream.getIngestJobStartResult().getJob() != null) {
343  return stream;
344  } else if (stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
345  for (IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
346  logger.log(Level.SEVERE, String.format("%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
347  }
348  throw new TskCoreException("Error starting ingest modules");
349  } else {
350  throw new TskCoreException("Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
351  }
352  }
353 
362  }
363 
370  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
371  if (caseIsOpen) {
372  IngestJob job = new IngestJob(dataSources, settings);
373  if (job.hasIngestPipeline()) {
374  long taskId = nextIngestManagerTaskId.incrementAndGet();
375  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
376  synchronized (startIngestJobFutures) {
377  startIngestJobFutures.put(taskId, task);
378  }
379  }
380  }
381  }
382 
391  public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
392  if (caseIsOpen) {
393  IngestJob job = new IngestJob(dataSource, files, settings);
394  if (job.hasIngestPipeline()) {
395  long taskId = nextIngestManagerTaskId.incrementAndGet();
396  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
397  synchronized (startIngestJobFutures) {
398  startIngestJobFutures.put(taskId, task);
399  }
400  }
401  }
402  }
403 
413  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
414  if (caseIsOpen) {
415  IngestJob job = new IngestJob(dataSources, settings);
416  if (job.hasIngestPipeline()) {
417  return startIngestJob(job);
418  }
419  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
420  }
421  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
422  }
423 
432  @NbBundle.Messages({
433  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
434  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
435  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
436  "IngestManager.startupErr.dlgErrorList=Errors:"
437  })
438  IngestJobStartResult startIngestJob(IngestJob job) {
439 
440  // initialize IngestMessageInbox, if it hasn't been initialized yet. This can't be done in
441  // the constructor because that ends up freezing the UI on startup (JIRA-7345).
442  if (SwingUtilities.isEventDispatchThread()) {
443  initIngestMessageInbox();
444  } else {
445  try {
446  SwingUtilities.invokeAndWait(() -> initIngestMessageInbox());
447  } catch (InterruptedException ex) {
448  // ignore interruptions
449  } catch (InvocationTargetException ex) {
450  logger.log(Level.WARNING, "There was an error starting ingest message inbox", ex);
451  }
452  }
453 
454  List<IngestModuleError> errors = null;
455  Case openCase;
456  try {
457  openCase = Case.getCurrentCaseThrows();
458  } catch (NoCurrentCaseException ex) {
459  return new IngestJobStartResult(null, new IngestManagerException("Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
460  }
461  if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
462  try {
463  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
464  if (RuntimeProperties.runningWithGUI()) {
465  EventQueue.invokeLater(new Runnable() {
466  @Override
467  public void run() {
468  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
469  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
470  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
471  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
472  JOptionPane.ERROR_MESSAGE);
473  }
474  });
475  }
476  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
477  }
478  } catch (ServicesMonitor.ServicesMonitorException ex) {
479  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
480  }
481  }
482 
483  if (!ingestMonitor.isRunning()) {
484  ingestMonitor.start();
485  }
486 
487  synchronized (ingestJobsById) {
488  ingestJobsById.put(job.getId(), job);
489  }
490  IngestManager.logger.log(Level.INFO, "Starting ingest job {0}", job.getId()); //NON-NLS
491  try {
492  errors = job.start();
493  } catch (InterruptedException ex) {
494  return new IngestJobStartResult(null, new IngestManagerException("Interrupted while starting ingest", ex), errors); //NON-NLS
495  }
496  if (errors.isEmpty()) {
497  this.fireIngestJobStarted(job.getId());
498  } else {
499  synchronized (ingestJobsById) {
500  this.ingestJobsById.remove(job.getId());
501  }
502  for (IngestModuleError error : errors) {
503  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
504  }
505  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
506  if (RuntimeProperties.runningWithGUI()) {
507  final StringBuilder message = new StringBuilder(1024);
508  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
509  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
510  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
511  for (IngestModuleError error : errors) {
512  String moduleName = error.getModuleDisplayName();
513  String errorMessage = error.getThrowable().getLocalizedMessage();
514  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
515  }
516  message.append("\n\n");
517  EventQueue.invokeLater(() -> {
518  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
519  });
520  }
521  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
522  }
523 
524  return new IngestJobStartResult(job, null, errors);
525  }
526 
532  void finishIngestJob(IngestJob job
533  ) {
534  long jobId = job.getId();
535  synchronized (ingestJobsById) {
536  ingestJobsById.remove(jobId);
537  }
538  if (!job.isCancelled()) {
539  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
540  fireIngestJobCompleted(jobId);
541  } else {
542  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
543  fireIngestJobCancelled(jobId);
544  }
545  }
546 
553  public boolean isIngestRunning() {
554  synchronized (ingestJobsById) {
555  return !ingestJobsById.isEmpty();
556  }
557  }
558 
565  synchronized (startIngestJobFutures) {
566  startIngestJobFutures.values().forEach((handle) -> {
567  handle.cancel(true);
568  });
569  }
570  synchronized (ingestJobsById) {
571  this.ingestJobsById.values().forEach((job) -> {
572  job.cancel(reason);
573  });
574  }
575  }
576 
582  public void addIngestJobEventListener(final PropertyChangeListener listener) {
583  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
584  }
585 
593  public void addIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
594  eventTypes.forEach((IngestJobEvent event) -> {
595  jobEventPublisher.addSubscriber(event.toString(), listener);
596  });
597  }
598 
604  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
605  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
606  }
607 
614  public void removeIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
615  eventTypes.forEach((IngestJobEvent event) -> {
616  jobEventPublisher.removeSubscriber(event.toString(), listener);
617  });
618  }
619 
625  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
626  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
627  }
628 
636  public void addIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
637  eventTypes.forEach((IngestModuleEvent event) -> {
638  moduleEventPublisher.addSubscriber(event.toString(), listener);
639  });
640  }
641 
647  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
648  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
649  }
650 
657  public void removeIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
658  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
659  }
660 
666  void fireIngestJobStarted(long ingestJobId) {
667  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
668  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
669  }
670 
676  void fireIngestJobCompleted(long ingestJobId) {
677  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
678  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
679  }
680 
686  void fireIngestJobCancelled(long ingestJobId) {
687  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
688  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
689  }
690 
699  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
700  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
701  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
702  }
703 
712  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
713  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
714  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
715  }
716 
725  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
726  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
727  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
728  }
729 
736  void fireFileIngestDone(AbstractFile file) {
737  AutopsyEvent event = new FileAnalyzedEvent(file);
738  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
739  }
740 
748  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
749  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
750  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
751  }
752 
761  void initIngestMessageInbox() {
762  synchronized (this.ingestMessageBoxLock) {
763  ingestMessageBox = IngestMessageTopComponent.findInstance();
764  }
765  }
766 
772  void postIngestMessage(IngestMessage message) {
773  synchronized (this.ingestMessageBoxLock) {
774  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
775  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
776  ingestMessageBox.displayMessage(message);
777  } else {
778  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
779  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
780  ingestMessageBox.displayMessage(message);
781  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
782  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
783  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
784  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
785  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
786  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
787  }
788  }
789  }
790  }
791  }
792 
793  /*
794  * Clears the ingest messages inbox.
795  */
796  private void clearIngestMessageBox() {
797  synchronized (this.ingestMessageBoxLock) {
798  if (null != ingestMessageBox) {
799  ingestMessageBox.clearMessages();
800  }
802  }
803  }
804 
813  void setIngestTaskProgress(DataSourceIngestTask task, String currentModuleName) {
814  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
815  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
816  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
817 
818  /*
819  * Update the total run time for the PREVIOUS ingest module in the
820  * pipeline, which has now finished its processing for the task.
821  */
822  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
823  }
824 
833  void setIngestTaskProgress(FileIngestTask task, String currentModuleName) {
834  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
835  IngestThreadActivitySnapshot newSnap;
836  try {
837  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource(), task.getFile());
838  } catch (TskCoreException ex) {
839  logger.log(Level.SEVERE, "Error getting file from file ingest task", ex);
840  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
841  }
842  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
843 
844  /*
845  * Update the total run time for the PREVIOUS ingest module in the
846  * pipeline, which has now finished its processing for the task.
847  */
848  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
849  }
850 
856  void setIngestTaskProgressCompleted(IngestTask task) {
857  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
858  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
859  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
860 
861  /*
862  * Update the total run time for the LAST ingest module in the pipeline,
863  * which has now finished its processing for the task.
864  */
865  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
866  }
867 
874  void incrementModuleRunTime(String moduleDisplayName, Long duration) {
875  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
876  return;
877  }
878 
879  synchronized (ingestModuleRunTimes) {
880  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
881  long prevTime = 0;
882  if (prevTimeL != null) {
883  prevTime = prevTimeL;
884  }
885  prevTime += duration;
886  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
887  }
888  }
889 
895  @Override
896  public Map<String, Long> getModuleRunTimes() {
897  synchronized (ingestModuleRunTimes) {
898  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
899  return times;
900  }
901  }
902 
909  @Override
910  public List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
911  return new ArrayList<>(ingestThreadActivitySnapshots.values());
912  }
913 
919  @Override
920  public List<Snapshot> getIngestJobSnapshots() {
921  List<Snapshot> snapShots = new ArrayList<>();
922  synchronized (ingestJobsById) {
923  ingestJobsById.values().forEach((job) -> {
924  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
925  });
926  }
927  return snapShots;
928  }
929 
936  long getFreeDiskSpace() {
937  if (ingestMonitor != null) {
938  return ingestMonitor.getFreeSpace();
939  } else {
940  return -1;
941  }
942  }
943 
947  private final class StartIngestJobTask implements Callable<Void> {
948 
949  private final long threadId;
950  private final IngestJob job;
951  private ProgressHandle progress;
952 
953  StartIngestJobTask(long threadId, IngestJob job) {
954  this.threadId = threadId;
955  this.job = job;
956  }
957 
958  @Override
959  public Void call() {
960  try {
961  if (Thread.currentThread().isInterrupted()) {
962  synchronized (ingestJobsById) {
963  ingestJobsById.remove(job.getId());
964  }
965  return null;
966  }
967 
969  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
970  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
971  @Override
972  public boolean cancel() {
973  if (progress != null) {
974  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
975  }
976  synchronized (startIngestJobFutures) {
977  Future<?> handle = startIngestJobFutures.remove(threadId);
978  handle.cancel(true);
979  }
980  return true;
981  }
982  });
983  progress.start();
984  }
985 
986  startIngestJob(job);
987  return null;
988 
989  } finally {
990  if (null != progress) {
991  progress.finish();
992  }
993  synchronized (startIngestJobFutures) {
994  startIngestJobFutures.remove(threadId);
995  }
996  }
997  }
998 
999  }
1000 
1004  private final class ExecuteIngestJobTasksTask implements Runnable {
1005 
1006  private final long threadId;
1007  private final BlockingIngestTaskQueue tasks;
1008 
1009  ExecuteIngestJobTasksTask(long threadId, BlockingIngestTaskQueue tasks) {
1010  this.threadId = threadId;
1011  this.tasks = tasks;
1012  }
1013 
1014  @Override
1015  public void run() {
1016  while (true) {
1017  try {
1018  IngestTask task = tasks.getNextTask(); // Blocks.
1019  task.execute(threadId);
1020  } catch (InterruptedException ex) {
1021  break;
1022  }
1023  if (Thread.currentThread().isInterrupted()) {
1024  break;
1025  }
1026  }
1027  }
1028  }
1029 
1033  private static final class PublishEventTask implements Runnable {
1034 
1035  private final AutopsyEvent event;
1037 
1046  this.event = event;
1047  this.publisher = publisher;
1048  }
1049 
1050  @Override
1051  public void run() {
1052  publisher.publish(event);
1053  }
1054 
1055  }
1056 
1061  @Immutable
1062  public static final class IngestThreadActivitySnapshot implements Serializable {
1063 
1064  private static final long serialVersionUID = 1L;
1065 
1066  private final long threadId;
1067  private final Date startTime;
1068  private final String activity;
1069  private final String dataSourceName;
1070  private final String fileName;
1071  private final long jobId;
1072 
1080  IngestThreadActivitySnapshot(long threadId) {
1081  this.threadId = threadId;
1082  startTime = new Date();
1083  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1084  this.dataSourceName = "";
1085  this.fileName = "";
1086  this.jobId = 0;
1087  }
1088 
1099  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1100  this.threadId = threadId;
1101  this.jobId = jobId;
1102  startTime = new Date();
1103  this.activity = activity;
1104  this.dataSourceName = dataSource.getName();
1105  this.fileName = "";
1106  }
1107 
1120  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1121  this.threadId = threadId;
1122  this.jobId = jobId;
1123  startTime = new Date();
1124  this.activity = activity;
1125  this.dataSourceName = dataSource.getName();
1126  this.fileName = file.getName();
1127  }
1128 
1134  long getIngestJobId() {
1135  return jobId;
1136  }
1137 
1143  long getThreadId() {
1144  return threadId;
1145  }
1146 
1152  Date getStartTime() {
1153  return startTime;
1154  }
1155 
1161  String getActivity() {
1162  return activity;
1163  }
1164 
1172  String getDataSourceName() {
1173  return dataSourceName;
1174  }
1175 
1181  String getFileName() {
1182  return fileName;
1183  }
1184 
1185  }
1186 
1190  public enum IngestJobEvent {
1191 
1228  }
1229 
1233  public enum IngestModuleEvent {
1234 
1256  }
1257 
1261  public final static class IngestManagerException extends Exception {
1262 
1263  private static final long serialVersionUID = 1L;
1264 
1270  private IngestManagerException(String message) {
1271  super(message);
1272  }
1273 
1280  private IngestManagerException(String message, Throwable cause) {
1281  super(message, cause);
1282  }
1283  }
1284 
1293  @Deprecated
1294  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1297  }
1298 
1307  @Deprecated
1308  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1311  }
1312 
1323  @Deprecated
1324  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1325  return beginIngestJob(dataSources, settings).getJob();
1326  }
1327 
1334  @Deprecated
1335  public void cancelAllIngestJobs() {
1337  }
1338 
1339 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
static synchronized IngestManager getInstance()
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:124
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:711
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2021 Basis Technology. Generated on: Thu Sep 30 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.