Autopsy 4.6.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestTasksScheduler.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2018 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.Deque;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.TreeSet;
30 import java.util.concurrent.BlockingDeque;
31 import java.util.concurrent.LinkedBlockingDeque;
32 import java.util.logging.Level;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import javax.annotation.concurrent.GuardedBy;
36 import javax.annotation.concurrent.ThreadSafe;
38 import org.sleuthkit.datamodel.AbstractFile;
39 import org.sleuthkit.datamodel.Content;
40 import org.sleuthkit.datamodel.FileSystem;
41 import org.sleuthkit.datamodel.TskCoreException;
42 import org.sleuthkit.datamodel.TskData;
43 
48 @ThreadSafe
49 final class IngestTasksScheduler {
50 
51  private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
52  private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
53  @GuardedBy("IngestTasksScheduler.this")
54  private static IngestTasksScheduler instance;
55  private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
56  @GuardedBy("this")
57  private final TreeSet<FileIngestTask> rootFileTaskQueue;
58  @GuardedBy("this")
59  private final Deque<FileIngestTask> pendingFileTaskQueue;
60  private final IngestTaskTrackingQueue fileIngestThreadsQueue;
61 
67  synchronized static IngestTasksScheduler getInstance() {
68  if (IngestTasksScheduler.instance == null) {
69  IngestTasksScheduler.instance = new IngestTasksScheduler();
70  }
71  return IngestTasksScheduler.instance;
72  }
73 
79  private IngestTasksScheduler() {
80  this.dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
81  this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
82  this.pendingFileTaskQueue = new LinkedList<>();
83  this.fileIngestThreadsQueue = new IngestTaskTrackingQueue();
84  }
85 
92  BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
93  return this.dataSourceIngestThreadQueue;
94  }
95 
102  BlockingIngestTaskQueue getFileIngestTaskQueue() {
103  return this.fileIngestThreadsQueue;
104  }
105 
112  synchronized void scheduleIngestTasks(DataSourceIngestJob job) {
113  if (!job.isCancelled()) {
114  /*
115  * Scheduling of both the data source ingest task and the initial
116  * file ingest tasks for a job must be an atomic operation.
117  * Otherwise, the data source task might be completed before the
118  * file tasks are scheduled, resulting in a potential false positive
119  * when another thread checks whether or not all the tasks for the
120  * job are completed.
121  */
122  this.scheduleDataSourceIngestTask(job);
123  this.scheduleFileIngestTasks(job, Collections.emptyList());
124  }
125  }
126 
132  synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) {
133  if (!job.isCancelled()) {
134  DataSourceIngestTask task = new DataSourceIngestTask(job);
135  try {
136  this.dataSourceIngestThreadQueue.putLast(task);
137  } catch (InterruptedException ex) {
138  IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", job.getId()), ex);
139  Thread.currentThread().interrupt();
140  }
141  }
142  }
143 
152  synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
153  if (!job.isCancelled()) {
154  Collection<AbstractFile> candidateFiles;
155  if (files.isEmpty()) {
156  candidateFiles = getTopLevelFiles(job.getDataSource());
157  } else {
158  candidateFiles = files;
159  }
160  for (AbstractFile file : candidateFiles) {
161  FileIngestTask task = new FileIngestTask(job, file);
162  if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
163  this.rootFileTaskQueue.add(task);
164  }
165  }
166  shuffleFileTaskQueues();
167  }
168  }
169 
178  synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
179  if (!job.isCancelled()) {
180  /*
181  * Put the files directly into the queue for the file ingest
182  * threads, if they pass the file filter for the job. The files are
183  * added to the queue for the ingest threads BEFORE the other queued
184  * tasks because the use case for this method is scheduling new
185  * carved or derived files from a higher priority task that is
186  * already in progress.
187  */
188  for (AbstractFile file : files) {
189  FileIngestTask fileTask = new FileIngestTask(job, file);
190  if (shouldEnqueueFileTask(fileTask)) {
191  try {
192  this.fileIngestThreadsQueue.putFirst(fileTask);
193  } catch (InterruptedException ex) {
194  IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex);
195  Thread.currentThread().interrupt();
196  return;
197  }
198  }
199  }
200  }
201  }
202 
209  synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
210  this.dataSourceIngestThreadQueue.taskCompleted(task);
211  }
212 
219  synchronized void notifyTaskCompleted(FileIngestTask task) {
220  this.fileIngestThreadsQueue.taskCompleted(task);
221  shuffleFileTaskQueues();
222  }
223 
232  synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) {
233  long jobId = job.getId();
234  return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
235  || hasTasksForJob(this.rootFileTaskQueue, jobId)
236  || hasTasksForJob(this.pendingFileTaskQueue, jobId)
237  || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
238  }
239 
247  synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) {
248  long jobId = job.getId();
249  IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId);
250  IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId);
251  }
252 
262  private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
263  List<AbstractFile> topLevelFiles = new ArrayList<>();
264  Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
265  if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
266  // The data source is itself a file to be processed.
267  topLevelFiles.add((AbstractFile) dataSource);
268  } else {
269  for (AbstractFile root : rootObjects) {
270  List<Content> children;
271  try {
272  children = root.getChildren();
273  if (children.isEmpty()) {
274  // Add the root object itself, it could be an unallocated
275  // space file, or a child of a volume or an image.
276  topLevelFiles.add(root);
277  } else {
278  // The root object is a file system root directory, get
279  // the files within it.
280  for (Content child : children) {
281  if (child instanceof AbstractFile) {
282  topLevelFiles.add((AbstractFile) child);
283  }
284  }
285  }
286  } catch (TskCoreException ex) {
287  logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS
288  }
289  }
290  }
291  return topLevelFiles;
292  }
293 
324  synchronized private void shuffleFileTaskQueues() {
325  while (this.fileIngestThreadsQueue.isEmpty()) {
326  /*
327  * If the pending file task queue is empty, move the highest
328  * priority root file task, if there is one, into it.
329  */
330  if (this.pendingFileTaskQueue.isEmpty()) {
331  final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
332  if (rootTask != null) {
333  this.pendingFileTaskQueue.addLast(rootTask);
334  }
335  }
336 
337  /*
338  * Try to move the next task from the pending task queue into the
339  * queue for the file ingest threads, if it passes the filter for
340  * the job.
341  */
342  final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
343  if (pendingTask == null) {
344  return;
345  }
346  if (shouldEnqueueFileTask(pendingTask)) {
347  try {
348  /*
349  * The task is added to the queue for the ingest threads
350  * AFTER the higher priority tasks that preceded it.
351  */
352  this.fileIngestThreadsQueue.putLast(pendingTask);
353  } catch (InterruptedException ex) {
354  IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
355  Thread.currentThread().interrupt();
356  return;
357  }
358  }
359 
360  /*
361  * If the task that was just queued for the file ingest threads has
362  * children, try to queue tasks for the children. Each child task
363  * will go into either the directory queue if it has children of its
364  * own, or into the queue for the file ingest threads, if it passes
365  * the filter for the job.
366  */
367  final AbstractFile file = pendingTask.getFile();
368  try {
369  for (Content child : file.getChildren()) {
370  if (child instanceof AbstractFile) {
371  AbstractFile childFile = (AbstractFile) child;
372  FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJob(), childFile);
373  if (childFile.hasChildren()) {
374  this.pendingFileTaskQueue.add(childTask);
375  } else if (shouldEnqueueFileTask(childTask)) {
376  try {
377  this.fileIngestThreadsQueue.putLast(childTask);
378  } catch (InterruptedException ex) {
379  IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
380  Thread.currentThread().interrupt();
381  return;
382  }
383  }
384  }
385  }
386  } catch (TskCoreException ex) {
387  logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
388  }
389  }
390  }
391 
401  private static boolean shouldEnqueueFileTask(final FileIngestTask task) {
402  final AbstractFile file = task.getFile();
403 
404  // Skip the task if the file is actually the pseudo-file for the parent
405  // or current directory.
406  String fileName = file.getName();
407 
408  if (fileName.equals(".") || fileName.equals("..")) {
409  return false;
410  }
411 
412  /*
413  * Ensures that all directories, files which are members of the ingest
414  * file filter, and unallocated blocks (when processUnallocated is
415  * enabled) all continue to be processed. AbstractFiles which do not
416  * meet one of these criteria will be skipped.
417  *
418  * An additional check to see if unallocated space should be processed
419  * is part of the FilesSet.fileIsMemberOf() method.
420  *
421  * This code may need to be updated when
422  * TSK_DB_FILES_TYPE_ENUM.UNUSED_BLOCKS comes into use by Autopsy.
423  */
424  if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
425  return false;
426  }
427 
428  // Skip the task if the file is one of a select group of special, large
429  // NTFS or FAT file system files.
430  if (file instanceof org.sleuthkit.datamodel.File) {
431  final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;
432 
433  // Get the type of the file system, if any, that owns the file.
434  TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
435  try {
436  FileSystem fs = f.getFileSystem();
437  if (fs != null) {
438  fsType = fs.getFsType();
439  }
440  } catch (TskCoreException ex) {
441  logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS
442  }
443 
444  // If the file system is not NTFS or FAT, don't skip the file.
445  if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
446  return true;
447  }
448 
449  // Find out whether the file is in a root directory.
450  boolean isInRootDir = false;
451  try {
452  AbstractFile parent = f.getParentDirectory();
453  isInRootDir = parent.isRoot();
454  } catch (TskCoreException ex) {
455  logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS
456  }
457 
458  // If the file is in the root directory of an NTFS or FAT file
459  // system, check its meta-address and check its name for the '$'
460  // character and a ':' character (not a default attribute).
461  if (isInRootDir && f.getMetaAddr() < 32) {
462  String name = f.getName();
463  if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
464  return false;
465  }
466  }
467  }
468 
469  return true;
470  }
471 
480  private static boolean shouldBeCarved(final FileIngestTask task) {
481  return task.getIngestJob().shouldProcessUnallocatedSpace() && task.getFile().getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
482  }
483 
492  private static boolean fileAcceptedByFilter(final FileIngestTask task) {
493  return !(task.getIngestJob().getFileIngestFilter().fileIsMemberOf(task.getFile()) == null);
494  }
495 
505  synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long jobId) {
506  for (IngestTask task : tasks) {
507  if (task.getIngestJob().getId() == jobId) {
508  return true;
509  }
510  }
511  return false;
512  }
513 
521  private static void removeTasksForJob(Collection<? extends IngestTask> tasks, long jobId) {
522  Iterator<? extends IngestTask> iterator = tasks.iterator();
523  while (iterator.hasNext()) {
524  IngestTask task = iterator.next();
525  if (task.getIngestJob().getId() == jobId) {
526  iterator.remove();
527  }
528  }
529  }
530 
539  private static int countTasksForJob(Collection<? extends IngestTask> queue, long jobId) {
540  int count = 0;
541  for (IngestTask task : queue) {
542  if (task.getIngestJob().getId() == jobId) {
543  count++;
544  }
545  }
546  return count;
547  }
548 
557  synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
558  return new IngestJobTasksSnapshot(jobId);
559  }
560 
565  private static class RootDirectoryTaskComparator implements Comparator<FileIngestTask> {
566 
567  @Override
568  public int compare(FileIngestTask q1, FileIngestTask q2) {
569  AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile());
570  AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile());
571  if (p1 == p2) {
572  return (int) (q2.getFile().getId() - q1.getFile().getId());
573  } else {
574  return p2.ordinal() - p1.ordinal();
575  }
576  }
577 
582  private static class AbstractFilePriority {
583 
585  }
586 
587  enum Priority {
588 
589  LAST, LOW, MEDIUM, HIGH
590  }
591 
592  static final List<Pattern> LAST_PRI_PATHS = new ArrayList<>();
593 
594  static final List<Pattern> LOW_PRI_PATHS = new ArrayList<>();
595 
596  static final List<Pattern> MEDIUM_PRI_PATHS = new ArrayList<>();
597 
598  static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();
599 
600  /*
601  * prioritize root directory folders based on the assumption that we
602  * are looking for user content. Other types of investigations may
603  * want different priorities.
604  */
605  static /*
606  * prioritize root directory folders based on the assumption that we
607  * are looking for user content. Other types of investigations may
608  * want different priorities.
609  */ {
610  // these files have no structure, so they go last
611  //unalloc files are handled as virtual files in getPriority()
612  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
613  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE));
614  LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE));
615  LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE));
616  // orphan files are often corrupt and windows does not typically have
617  // user content, so put them towards the bottom
618  LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
619  LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE));
620  // all other files go into the medium category too
621  MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE));
622  // user content is top priority
623  HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE));
624  HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE));
625  HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE));
626  HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE));
627  }
628 
636  static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) {
637  if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
638  //quickly filter out unstructured content
639  //non-fs virtual files and dirs, such as representing unalloc space
640  return AbstractFilePriority.Priority.LAST;
641  }
642  //determine the fs files priority by name
643  final String path = abstractFile.getName();
644  if (path == null) {
645  return AbstractFilePriority.Priority.MEDIUM;
646  }
647  for (Pattern p : HIGH_PRI_PATHS) {
648  Matcher m = p.matcher(path);
649  if (m.find()) {
650  return AbstractFilePriority.Priority.HIGH;
651  }
652  }
653  for (Pattern p : MEDIUM_PRI_PATHS) {
654  Matcher m = p.matcher(path);
655  if (m.find()) {
656  return AbstractFilePriority.Priority.MEDIUM;
657  }
658  }
659  for (Pattern p : LOW_PRI_PATHS) {
660  Matcher m = p.matcher(path);
661  if (m.find()) {
662  return AbstractFilePriority.Priority.LOW;
663  }
664  }
665  for (Pattern p : LAST_PRI_PATHS) {
666  Matcher m = p.matcher(path);
667  if (m.find()) {
668  return AbstractFilePriority.Priority.LAST;
669  }
670  }
671  //default is medium
672  return AbstractFilePriority.Priority.MEDIUM;
673  }
674  }
675  }
676 
681  @ThreadSafe
682  private class IngestTaskTrackingQueue implements BlockingIngestTaskQueue {
683 
684  private final BlockingDeque<IngestTask> taskQueue = new LinkedBlockingDeque<>();
685  @GuardedBy("this")
686  private final List<IngestTask> queuedTasks = new LinkedList<>();
687  @GuardedBy("this")
688  private final List<IngestTask> tasksInProgress = new LinkedList<>();
689 
700  void putFirst(IngestTask task) throws InterruptedException {
701  synchronized (this) {
702  this.queuedTasks.add(task);
703  }
704  try {
705  this.taskQueue.putFirst(task);
706  } catch (InterruptedException ex) {
707  synchronized (this) {
708  this.queuedTasks.remove(task);
709  }
710  throw ex;
711  }
712  }
713 
724  void putLast(IngestTask task) throws InterruptedException {
725  synchronized (this) {
726  this.queuedTasks.add(task);
727  }
728  try {
729  this.taskQueue.putLast(task);
730  } catch (InterruptedException ex) {
731  synchronized (this) {
732  this.queuedTasks.remove(task);
733  }
734  throw ex;
735  }
736  }
737 
748  @Override
749  public IngestTask getNextTask() throws InterruptedException {
750  IngestTask task = taskQueue.takeFirst();
751  synchronized (this) {
752  this.queuedTasks.remove(task);
753  this.tasksInProgress.add(task);
754  }
755  return task;
756  }
757 
763  boolean isEmpty() {
764  synchronized (this) {
765  return this.queuedTasks.isEmpty();
766  }
767  }
768 
775  void taskCompleted(IngestTask task) {
776  synchronized (this) {
777  this.tasksInProgress.remove(task);
778  }
779  }
780 
789  boolean hasTasksForJob(long jobId) {
790  synchronized (this) {
791  return IngestTasksScheduler.hasTasksForJob(this.queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.tasksInProgress, jobId);
792  }
793  }
794 
803  int countQueuedTasksForJob(long jobId) {
804  synchronized (this) {
805  return IngestTasksScheduler.countTasksForJob(this.queuedTasks, jobId);
806  }
807  }
808 
817  int countRunningTasksForJob(long jobId) {
818  synchronized (this) {
819  return IngestTasksScheduler.countTasksForJob(this.tasksInProgress, jobId);
820  }
821  }
822 
823  }
824 
828  class IngestJobTasksSnapshot {
829 
830  private final long jobId;
831  private final long dsQueueSize;
832  private final long rootQueueSize;
833  private final long dirQueueSize;
834  private final long fileQueueSize;
835  private final long runningListSize;
836 
842  IngestJobTasksSnapshot(long jobId) {
843  this.jobId = jobId;
844  this.dsQueueSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId);
845  this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId);
846  this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTaskQueue, jobId);
847  this.fileQueueSize = IngestTasksScheduler.this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId);;
848  this.runningListSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileIngestThreadsQueue.countRunningTasksForJob(jobId);
849  }
850 
857  long getJobId() {
858  return jobId;
859  }
860 
867  long getRootQueueSize() {
868  return rootQueueSize;
869  }
870 
877  long getDirectoryTasksQueueSize() {
878  return dirQueueSize;
879  }
880 
881  long getFileQueueSize() {
882  return fileQueueSize;
883  }
884 
885  long getDsQueueSize() {
886  return dsQueueSize;
887  }
888 
889  long getRunningListSize() {
890  return runningListSize;
891  }
892 
893  }
894 
895 }

Copyright © 2012-2016 Basis Technology. Generated on: Mon May 7 2018
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.