Autopsy 4.19.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestTasksScheduler.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.Deque;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.TreeSet;
32 import java.util.concurrent.BlockingDeque;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.logging.Level;
35 import java.util.regex.Matcher;
36 import java.util.regex.Pattern;
37 import javax.annotation.concurrent.GuardedBy;
38 import javax.annotation.concurrent.ThreadSafe;
41 import org.sleuthkit.datamodel.AbstractFile;
42 import org.sleuthkit.datamodel.Blackboard;
43 import org.sleuthkit.datamodel.Content;
44 import org.sleuthkit.datamodel.DataArtifact;
45 import org.sleuthkit.datamodel.DataSource;
46 import org.sleuthkit.datamodel.FileSystem;
47 import org.sleuthkit.datamodel.TskCoreException;
48 import org.sleuthkit.datamodel.TskData;
49 
54 @ThreadSafe
55 final class IngestTasksScheduler {
56 
57  private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
58  private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
59  @GuardedBy("IngestTasksScheduler.this")
60  private static IngestTasksScheduler instance;
61  private final IngestTaskTrackingQueue dataSourceIngestTasksQueue;
62  @GuardedBy("this")
63  private final TreeSet<FileIngestTask> topLevelFileIngestTasksQueue;
64  @GuardedBy("this")
65  private final Deque<FileIngestTask> batchedFileIngestTasksQueue;
66  @GuardedBy("this")
67  private final Queue<FileIngestTask> streamedFileIngestTasksQueue;
68  private final IngestTaskTrackingQueue fileIngestTasksQueue;
69  private final IngestTaskTrackingQueue artifactIngestTasksQueue;
70 
76  synchronized static IngestTasksScheduler getInstance() {
77  if (IngestTasksScheduler.instance == null) {
78  IngestTasksScheduler.instance = new IngestTasksScheduler();
79  }
80  return IngestTasksScheduler.instance;
81  }
82 
88  private IngestTasksScheduler() {
89  dataSourceIngestTasksQueue = new IngestTaskTrackingQueue();
90  topLevelFileIngestTasksQueue = new TreeSet<>(new RootDirectoryTaskComparator());
91  batchedFileIngestTasksQueue = new LinkedList<>();
92  fileIngestTasksQueue = new IngestTaskTrackingQueue();
93  streamedFileIngestTasksQueue = new LinkedList<>();
94  artifactIngestTasksQueue = new IngestTaskTrackingQueue();
95  }
96 
103  BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
104  return dataSourceIngestTasksQueue;
105  }
106 
113  BlockingIngestTaskQueue getFileIngestTaskQueue() {
114  return fileIngestTasksQueue;
115  }
116 
123  BlockingIngestTaskQueue getResultIngestTaskQueue() {
124  return artifactIngestTasksQueue;
125  }
126 
141  synchronized void scheduleIngestTasks(IngestJobPipeline ingestPipeline) {
142  if (!ingestPipeline.isCancelled()) {
143  if (ingestPipeline.hasDataSourceIngestModules()) {
144  scheduleDataSourceIngestTask(ingestPipeline);
145  }
146  if (ingestPipeline.hasFileIngestModules()) {
147  scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
148  }
149  if (ingestPipeline.hasDataArtifactIngestModules()) {
150  scheduleDataArtifactIngestTasks(ingestPipeline);
151  }
152  }
153  }
154 
166  synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestPipeline) {
167  if (!ingestPipeline.isCancelled()) {
168  DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline);
169  try {
170  dataSourceIngestTasksQueue.putLast(task);
171  } catch (InterruptedException ex) {
172  IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (pipelineId={%d)", ingestPipeline.getId()), ex);
173  Thread.currentThread().interrupt();
174  }
175  }
176  }
177 
193  synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
194  if (!ingestPipeline.isCancelled()) {
195  Collection<AbstractFile> candidateFiles;
196  if (files.isEmpty()) {
197  candidateFiles = getTopLevelFiles(ingestPipeline.getDataSource());
198  } else {
199  candidateFiles = files;
200  }
201  for (AbstractFile file : candidateFiles) {
202  FileIngestTask task = new FileIngestTask(ingestPipeline, file);
203  if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
204  topLevelFileIngestTasksQueue.add(task);
205  }
206  }
207  refillFileIngestTasksQueue();
208  }
209  }
210 
223  synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestPipeline, List<Long> fileIds) {
224  if (!ingestPipeline.isCancelled()) {
225  for (long id : fileIds) {
226  /*
227  * Create the file ingest task. Note that we do not do the
228  * shouldEnqueueFileTask() check here in order to delay querying
229  * the case database to construct the AbstractFile object. The
230  * file filter will be applied before the file task makes it to
231  * the task queue consumed by the file ingest threads.
232  */
233  FileIngestTask task = new FileIngestTask(ingestPipeline, id);
234  streamedFileIngestTasksQueue.add(task);
235  }
236  refillFileIngestTasksQueue();
237  }
238  }
239 
255  synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
256  if (!ingestPipeline.isCancelled()) {
257  /*
258  * Put the files directly into the queue for the file ingest
259  * threads, if they pass the file filter for the job. The files are
260  * added to the queue for the ingest threads BEFORE the other queued
261  * tasks because the use case for this method is scheduling new
262  * carved or derived files from a high priority task that is already
263  * in progress.
264  */
265  for (AbstractFile file : files) {
266  FileIngestTask fileTask = new FileIngestTask(ingestPipeline, file);
267  if (shouldEnqueueFileTask(fileTask)) {
268  try {
269  fileIngestTasksQueue.putFirst(fileTask);
270  } catch (InterruptedException ex) {
271  DataSource dataSource = ingestPipeline.getDataSource();
272  logger.log(Level.WARNING, String.format("Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
273  Thread.currentThread().interrupt();
274  return;
275  }
276  }
277  }
278  }
279  }
280 
293  synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline) {
294  if (!ingestPipeline.isCancelled()) {
295  Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
296  try {
297  List<DataArtifact> artifacts = blackboard.getDataArtifacts(ingestPipeline.getDataSource().getId(), null);
298  scheduleDataArtifactIngestTasks(ingestPipeline, artifacts);
299  } catch (TskCoreException ex) {
300  DataSource dataSource = ingestPipeline.getDataSource();
301  logger.log(Level.SEVERE, String.format("Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
302  }
303  }
304  }
305 
321  synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline, List<DataArtifact> artifacts) {
322  if (!ingestPipeline.isCancelled()) {
323  for (DataArtifact artifact : artifacts) {
324  DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact);
325  try {
326  this.artifactIngestTasksQueue.putLast(task);
327  } catch (InterruptedException ex) {
328  DataSource dataSource = ingestPipeline.getDataSource();
329  logger.log(Level.WARNING, String.format("Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
330  Thread.currentThread().interrupt();
331  break;
332  }
333  }
334  }
335  }
336 
343  synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
344  dataSourceIngestTasksQueue.taskCompleted(task);
345  }
346 
353  synchronized void notifyTaskCompleted(FileIngestTask task) {
354  fileIngestTasksQueue.taskCompleted(task);
355  refillFileIngestTasksQueue();
356  }
357 
364  synchronized void notifyTaskCompleted(DataArtifactIngestTask task) {
365  artifactIngestTasksQueue.taskCompleted(task);
366  }
367 
376  synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestPipeline) {
377  long pipelineId = ingestPipeline.getId();
378  return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId)
379  || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId)
380  || hasTasksForJob(batchedFileIngestTasksQueue, pipelineId)
381  || hasTasksForJob(streamedFileIngestTasksQueue, pipelineId)
382  || fileIngestTasksQueue.hasTasksForJob(pipelineId)
383  || artifactIngestTasksQueue.hasTasksForJob(pipelineId));
384  }
385 
405  synchronized void cancelPendingFileTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
406  long jobId = ingestJobPipeline.getId();
407  removeTasksForJob(topLevelFileIngestTasksQueue, jobId);
408  removeTasksForJob(batchedFileIngestTasksQueue, jobId);
409  removeTasksForJob(streamedFileIngestTasksQueue, jobId);
410  }
411 
420  private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
421  List<AbstractFile> topLevelFiles = new ArrayList<>();
422  Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
423  if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
424  // The data source is itself a file to be processed.
425  topLevelFiles.add((AbstractFile) dataSource);
426  } else {
427  for (AbstractFile root : rootObjects) {
428  List<Content> children;
429  try {
430  children = root.getChildren();
431  if (children.isEmpty()) {
432  // Add the root object itself, it could be an unallocated
433  // space file, or a child of a volume or an image.
434  topLevelFiles.add(root);
435  } else {
436  // The root object is a file system root directory, get
437  // the files within it.
438  for (Content child : children) {
439  if (child instanceof AbstractFile) {
440  topLevelFiles.add((AbstractFile) child);
441  }
442  }
443  }
444  } catch (TskCoreException ex) {
445  logger.log(Level.SEVERE, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS
446  }
447  }
448  }
449  return topLevelFiles;
450  }
451 
459  synchronized private void refillFileIngestTasksQueue() {
460  try {
461  takeFromStreamingFileTasksQueue();
462  takeFromBatchTasksQueues();
463  } catch (InterruptedException ex) {
464  IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
465  Thread.currentThread().interrupt();
466  }
467  }
468 
475  synchronized private void takeFromStreamingFileTasksQueue() throws InterruptedException {
476  while (fileIngestTasksQueue.isEmpty()) {
477  int taskCount = 0;
478  while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
479  final FileIngestTask streamingTask = streamedFileIngestTasksQueue.poll();
480  if (streamingTask == null) {
481  return; // No streaming tasks are queued right now
482  }
483  if (shouldEnqueueFileTask(streamingTask)) {
484  fileIngestTasksQueue.putLast(streamingTask);
485  taskCount++;
486  }
487  }
488  }
489  }
490 
516  synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
517 
518  while (fileIngestTasksQueue.isEmpty()) {
519  /*
520  * If the batched file task queue is empty, move the highest
521  * priority top level file task into it.
522  */
523  if (batchedFileIngestTasksQueue.isEmpty()) {
524  final FileIngestTask topLevelTask = topLevelFileIngestTasksQueue.pollFirst();
525  if (topLevelTask != null) {
526  batchedFileIngestTasksQueue.addLast(topLevelTask);
527  }
528  }
529 
530  /*
531  * Try to move the next task from the batched file tasks queue into
532  * the queue for the file ingest threads.
533  */
534  final FileIngestTask nextTask = batchedFileIngestTasksQueue.pollFirst();
535  if (nextTask == null) {
536  return;
537  }
538  if (shouldEnqueueFileTask(nextTask)) {
539  fileIngestTasksQueue.putLast(nextTask);
540  }
541 
542  /*
543  * If the task that was just queued for the file ingest threads has
544  * children, queue tasks for the children as well.
545  */
546  AbstractFile file = null;
547  try {
548  file = nextTask.getFile();
549  for (Content child : file.getChildren()) {
550  if (child instanceof AbstractFile) {
551  AbstractFile childFile = (AbstractFile) child;
552  FileIngestTask childTask = new FileIngestTask(nextTask.getIngestJobPipeline(), childFile);
553  if (childFile.hasChildren()) {
554  batchedFileIngestTasksQueue.add(childTask);
555  } else if (shouldEnqueueFileTask(childTask)) {
556  fileIngestTasksQueue.putLast(childTask);
557  }
558  }
559  }
560  } catch (TskCoreException ex) {
561  if (file != null) {
562  logger.log(Level.SEVERE, String.format("Error getting the children of %s (object ID = %d)", file.getName(), file.getId()), ex); //NON-NLS
563  } else {
564  logger.log(Level.SEVERE, "Error loading file with object ID = {0}", nextTask.getFileId()); //NON-NLS
565  }
566  }
567  }
568  }
569 
580  private static boolean shouldEnqueueFileTask(final FileIngestTask task) {
581  AbstractFile file;
582  try {
583  file = task.getFile();
584  } catch (TskCoreException ex) {
585  logger.log(Level.SEVERE, "Error loading file with ID {0}", task.getFileId());
586  return false;
587  }
588 
589  // Skip the task if the file is actually the pseudo-file for the parent
590  // or current directory.
591  String fileName = file.getName();
592 
593  if (fileName.equals(".") || fileName.equals("..")) {
594  return false;
595  }
596 
597  /*
598  * Ensures that all directories, files which are members of the ingest
599  * file filter, and unallocated blocks (when processUnallocated is
600  * enabled) all continue to be processed. AbstractFiles which do not
601  * meet one of these criteria will be skipped.
602  *
603  * An additional check to see if unallocated space should be processed
604  * is part of the FilesSet.fileIsMemberOf() method.
605  *
606  * This code may need to be updated when
607  * TSK_DB_FILES_TYPE_ENUM.UNUSED_BLOCKS comes into use by Autopsy.
608  */
609  if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
610  return false;
611  }
612 
613  // Skip the task if the file is one of a select group of special, large
614  // NTFS or FAT file system files.
615  if (file instanceof org.sleuthkit.datamodel.File) {
616  final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;
617 
618  // Get the type of the file system, if any, that owns the file.
619  TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
620  try {
621  FileSystem fs = f.getFileSystem();
622  if (fs != null) {
623  fsType = fs.getFsType();
624  }
625  } catch (TskCoreException ex) {
626  logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS
627  }
628 
629  // If the file system is not NTFS or FAT, don't skip the file.
630  if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
631  return true;
632  }
633 
634  // Find out whether the file is in a root directory.
635  boolean isInRootDir = false;
636  try {
637  AbstractFile parent = f.getParentDirectory();
638  if (parent == null) {
639  isInRootDir = true;
640  } else {
641  isInRootDir = parent.isRoot();
642  }
643  } catch (TskCoreException ex) {
644  logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS
645  }
646 
647  // If the file is in the root directory of an NTFS or FAT file
648  // system, check its meta-address and check its name for the '$'
649  // character and a ':' character (not a default attribute).
650  if (isInRootDir && f.getMetaAddr() < 32) {
651  String name = f.getName();
652  if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
653  return false;
654  }
655  }
656  }
657 
658  return true;
659  }
660 
668  private static boolean shouldBeCarved(final FileIngestTask task) {
669  try {
670  AbstractFile file = task.getFile();
671  return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
672  } catch (TskCoreException ex) {
673  return false;
674  }
675  }
676 
685  private static boolean fileAcceptedByFilter(final FileIngestTask task) {
686  try {
687  AbstractFile file = task.getFile();
688  return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
689  } catch (TskCoreException ex) {
690  return false;
691  }
692  }
693 
703  synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
704  for (IngestTask task : tasks) {
705  if (task.getIngestJobPipeline().getId() == pipelineId) {
706  return true;
707  }
708  }
709  return false;
710  }
711 
719  private static void removeTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
720  Iterator<? extends IngestTask> iterator = tasks.iterator();
721  while (iterator.hasNext()) {
722  IngestTask task = iterator.next();
723  if (task.getIngestJobPipeline().getId() == pipelineId) {
724  iterator.remove();
725  }
726  }
727  }
728 
738  private static int countTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
739  int count = 0;
740  for (IngestTask task : tasks) {
741  if (task.getIngestJobPipeline().getId() == pipelineId) {
742  count++;
743  }
744  }
745  return count;
746  }
747 
756  synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
757  return new IngestJobTasksSnapshot(jobId, dataSourceIngestTasksQueue.countQueuedTasksForJob(jobId),
758  countTasksForJob(topLevelFileIngestTasksQueue, jobId),
759  countTasksForJob(batchedFileIngestTasksQueue, jobId),
760  fileIngestTasksQueue.countQueuedTasksForJob(jobId),
761  dataSourceIngestTasksQueue.countRunningTasksForJob(jobId) + fileIngestTasksQueue.countRunningTasksForJob(jobId) + artifactIngestTasksQueue.countRunningTasksForJob(jobId),
762  countTasksForJob(streamedFileIngestTasksQueue, jobId),
763  artifactIngestTasksQueue.countQueuedTasksForJob(jobId));
764  }
765 
770  private static class RootDirectoryTaskComparator implements Comparator<FileIngestTask> {
771 
772  @Override
773  public int compare(FileIngestTask q1, FileIngestTask q2) {
774  // In practice the case where one or both calls to getFile() fails
775  // should never occur since such tasks would not be added to the queue.
776  AbstractFile file1 = null;
777  AbstractFile file2 = null;
778  try {
779  file1 = q1.getFile();
780  } catch (TskCoreException ex) {
781  // Do nothing - the exception has been logged elsewhere
782  }
783 
784  try {
785  file2 = q2.getFile();
786  } catch (TskCoreException ex) {
787  // Do nothing - the exception has been logged elsewhere
788  }
789 
790  if (file1 == null) {
791  if (file2 == null) {
792  return (int) (q2.getFileId() - q1.getFileId());
793  } else {
794  return 1;
795  }
796  } else if (file2 == null) {
797  return -1;
798  }
799 
800  AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(file1);
801  AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(file2);
802  if (p1 == p2) {
803  return (int) (file2.getId() - file1.getId());
804  } else {
805  return p2.ordinal() - p1.ordinal();
806  }
807  }
808 
813  private static class AbstractFilePriority {
814 
816  }
817 
818  enum Priority {
819 
820  LAST, LOW, MEDIUM, HIGH
821  }
822 
823  static final List<Pattern> LAST_PRI_PATHS = new ArrayList<>();
824 
825  static final List<Pattern> LOW_PRI_PATHS = new ArrayList<>();
826 
827  static final List<Pattern> MEDIUM_PRI_PATHS = new ArrayList<>();
828 
829  static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();
830 
831  /*
832  * prioritize root directory folders based on the assumption that we
833  * are looking for user content. Other types of investigations may
834  * want different priorities.
835  */
836  static /*
837  * prioritize root directory folders based on the assumption that we
838  * are looking for user content. Other types of investigations may
839  * want different priorities.
840  */ {
841  // these files have no structure, so they go last
842  //unalloc files are handled as virtual files in getPriority()
843  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
844  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE));
845  LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE));
846  LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE));
847  // orphan files are often corrupt and windows does not typically have
848  // user content, so put them towards the bottom
849  LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
850  LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE));
851  // all other files go into the medium category too
852  MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE));
853  // user content is top priority
854  HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE));
855  HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE));
856  HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE));
857  HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE));
858  }
859 
867  static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) {
868  if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
869  //quickly filter out unstructured content
870  //non-fs virtual files and dirs, such as representing unalloc space
871  return AbstractFilePriority.Priority.LAST;
872  }
873  //determine the fs files priority by name
874  final String path = abstractFile.getName();
875  if (path == null) {
876  return AbstractFilePriority.Priority.MEDIUM;
877  }
878  for (Pattern p : HIGH_PRI_PATHS) {
879  Matcher m = p.matcher(path);
880  if (m.find()) {
881  return AbstractFilePriority.Priority.HIGH;
882  }
883  }
884  for (Pattern p : MEDIUM_PRI_PATHS) {
885  Matcher m = p.matcher(path);
886  if (m.find()) {
887  return AbstractFilePriority.Priority.MEDIUM;
888  }
889  }
890  for (Pattern p : LOW_PRI_PATHS) {
891  Matcher m = p.matcher(path);
892  if (m.find()) {
893  return AbstractFilePriority.Priority.LOW;
894  }
895  }
896  for (Pattern p : LAST_PRI_PATHS) {
897  Matcher m = p.matcher(path);
898  if (m.find()) {
899  return AbstractFilePriority.Priority.LAST;
900  }
901  }
902  //default is medium
903  return AbstractFilePriority.Priority.MEDIUM;
904  }
905  }
906  }
907 
912  @ThreadSafe
913  private class IngestTaskTrackingQueue implements BlockingIngestTaskQueue {
914 
915  private final BlockingDeque<IngestTask> taskQueue = new LinkedBlockingDeque<>();
916  @GuardedBy("this")
917  private final List<IngestTask> queuedTasks = new LinkedList<>();
918  @GuardedBy("this")
919  private final List<IngestTask> tasksInProgress = new LinkedList<>();
920 
931  void putFirst(IngestTask task) throws InterruptedException {
932  synchronized (this) {
933  this.queuedTasks.add(task);
934  }
935  try {
936  this.taskQueue.putFirst(task);
937  } catch (InterruptedException ex) {
938  synchronized (this) {
939  this.queuedTasks.remove(task);
940  }
941  throw ex;
942  }
943  }
944 
955  void putLast(IngestTask task) throws InterruptedException {
956  synchronized (this) {
957  this.queuedTasks.add(task);
958  }
959  try {
960  this.taskQueue.putLast(task);
961  } catch (InterruptedException ex) {
962  synchronized (this) {
963  this.queuedTasks.remove(task);
964  }
965  throw ex;
966  }
967  }
968 
979  @Override
980  public IngestTask getNextTask() throws InterruptedException {
981  IngestTask task = taskQueue.takeFirst();
982  synchronized (this) {
983  this.queuedTasks.remove(task);
984  this.tasksInProgress.add(task);
985  }
986  return task;
987  }
988 
994  boolean isEmpty() {
995  synchronized (this) {
996  return this.queuedTasks.isEmpty();
997  }
998  }
999 
1006  void taskCompleted(IngestTask task) {
1007  synchronized (this) {
1008  this.tasksInProgress.remove(task);
1009  }
1010  }
1011 
1020  boolean hasTasksForJob(long pipelineId) {
1021  synchronized (this) {
1022  return IngestTasksScheduler.hasTasksForJob(queuedTasks, pipelineId) || IngestTasksScheduler.hasTasksForJob(tasksInProgress, pipelineId);
1023  }
1024  }
1025 
1033  int countQueuedTasksForJob(long pipelineId) {
1034  synchronized (this) {
1035  return IngestTasksScheduler.countTasksForJob(queuedTasks, pipelineId);
1036  }
1037  }
1038 
1046  int countRunningTasksForJob(long pipelineId) {
1047  synchronized (this) {
1048  return IngestTasksScheduler.countTasksForJob(tasksInProgress, pipelineId);
1049  }
1050  }
1051 
1052  }
1053 
1057  static final class IngestJobTasksSnapshot implements Serializable {
1058 
1059  private static final long serialVersionUID = 1L;
1060  private final long jobId;
1061  private final long dsQueueSize;
1062  private final long rootQueueSize;
1063  private final long dirQueueSize;
1064  private final long fileQueueSize;
1065  private final long runningListSize;
1066  private final long streamingQueueSize;
1067  private final long artifactsQueueSize;
1068 
1083  IngestJobTasksSnapshot(long jobId, long dsQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize,
1084  long runningListSize, long streamingQueueSize, long artifactsQueueSize) {
1085  this.jobId = jobId;
1086  this.dsQueueSize = dsQueueSize;
1087  this.rootQueueSize = rootQueueSize;
1088  this.dirQueueSize = dirQueueSize;
1089  this.fileQueueSize = fileQueueSize;
1090  this.runningListSize = runningListSize;
1091  this.streamingQueueSize = streamingQueueSize;
1092  this.artifactsQueueSize = artifactsQueueSize;
1093  }
1094 
1101  long getJobId() {
1102  return jobId;
1103  }
1104 
1111  long getRootQueueSize() {
1112  return rootQueueSize;
1113  }
1114 
1121  long getDirectoryTasksQueueSize() {
1122  return dirQueueSize;
1123  }
1124 
1125  long getFileQueueSize() {
1126  return fileQueueSize;
1127  }
1128 
1129  long getStreamingQueueSize() {
1130  return streamingQueueSize;
1131  }
1132 
1133  long getDsQueueSize() {
1134  return dsQueueSize;
1135  }
1136 
1137  long getRunningListSize() {
1138  return runningListSize;
1139  }
1140 
1141  long getArtifactsQueueSize() {
1142  return artifactsQueueSize;
1143  }
1144 
1145  }
1146 
1147 }

Copyright © 2012-2021 Basis Technology. Generated on: Fri Aug 6 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.