19 package org.sleuthkit.autopsy.ingest;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.Deque;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.TreeSet;
30 import java.util.concurrent.BlockingDeque;
31 import java.util.concurrent.LinkedBlockingDeque;
32 import java.util.logging.Level;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import javax.annotation.concurrent.GuardedBy;
36 import javax.annotation.concurrent.ThreadSafe;
49 final class IngestTasksScheduler {
// Bit mask built by OR-ing the values of the FAT12, FAT16, FAT32 and NTFS
// file system type constants; a file system type value is tested against it
// with a bitwise AND elsewhere in this class.
51 private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
// Logger named after this class.
52 private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
// The lazily created singleton returned by getInstance().
// NOTE(review): the annotation names the instance lock, but this field is
// static and getInstance() is a static synchronized method, which holds the
// class monitor rather than an instance monitor - confirm the intended guard.
53 @GuardedBy(
"IngestTasksScheduler.this")
54 private static IngestTasksScheduler instance;
// Queue that receives the data source level ingest tasks.
55 private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
// Root level file tasks, kept sorted by a RootDirectoryTaskComparator.
57 private final TreeSet<FileIngestTask> rootFileTaskQueue;
// File tasks waiting to be moved to the file ingest threads queue; fed from
// the root queue and with tasks for files that have children.
59 private final Deque<FileIngestTask> pendingFileTaskQueue;
// Queue that receives the file level ingest tasks.
60 private final IngestTaskTrackingQueue fileIngestThreadsQueue;
67 synchronized static IngestTasksScheduler getInstance() {
68 if (IngestTasksScheduler.instance == null) {
69 IngestTasksScheduler.instance =
new IngestTasksScheduler();
71 return IngestTasksScheduler.instance;
/**
 * Constructs the ingest tasks scheduler with four empty task queues. The
 * root level file task queue is sorted by a RootDirectoryTaskComparator.
 */
private IngestTasksScheduler() {
    dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
    rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
    pendingFileTaskQueue = new LinkedList<>();
    fileIngestThreadsQueue = new IngestTaskTrackingQueue();
}
92 BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
93 return this.dataSourceIngestThreadQueue;
102 BlockingIngestTaskQueue getFileIngestTaskQueue() {
103 return this.fileIngestThreadsQueue;
112 synchronized void scheduleIngestTasks(DataSourceIngestJob job) {
113 if (!job.isCancelled()) {
122 this.scheduleDataSourceIngestTask(job);
123 this.scheduleFileIngestTasks(job, Collections.emptyList());
132 synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) {
133 if (!job.isCancelled()) {
134 DataSourceIngestTask task =
new DataSourceIngestTask(job);
136 this.dataSourceIngestThreadQueue.putLast(task);
137 }
catch (InterruptedException ex) {
138 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", job.getId()), ex);
139 Thread.currentThread().interrupt();
152 synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
153 if (!job.isCancelled()) {
154 Collection<AbstractFile> candidateFiles;
155 if (files.isEmpty()) {
156 candidateFiles = getTopLevelFiles(job.getDataSource());
158 candidateFiles = files;
160 for (AbstractFile file : candidateFiles) {
161 FileIngestTask task =
new FileIngestTask(job, file);
162 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
163 this.rootFileTaskQueue.add(task);
166 shuffleFileTaskQueues();
178 synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
179 if (!job.isCancelled()) {
188 for (AbstractFile file : files) {
189 FileIngestTask fileTask =
new FileIngestTask(job, file);
190 if (shouldEnqueueFileTask(fileTask)) {
192 this.fileIngestThreadsQueue.putFirst(fileTask);
193 }
catch (InterruptedException ex) {
194 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex);
195 Thread.currentThread().interrupt();
209 synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
210 this.dataSourceIngestThreadQueue.taskCompleted(task);
219 synchronized void notifyTaskCompleted(FileIngestTask task) {
220 this.fileIngestThreadsQueue.taskCompleted(task);
221 shuffleFileTaskQueues();
232 synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) {
233 long jobId = job.getId();
234 return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
235 || hasTasksForJob(this.rootFileTaskQueue, jobId)
236 || hasTasksForJob(this.pendingFileTaskQueue, jobId)
237 || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
247 synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) {
248 long jobId = job.getId();
249 IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId);
250 IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId);
262 private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
263 List<AbstractFile> topLevelFiles =
new ArrayList<>();
264 Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
265 if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
267 topLevelFiles.add((AbstractFile) dataSource);
269 for (AbstractFile root : rootObjects) {
270 List<Content> children;
272 children = root.getChildren();
273 if (children.isEmpty()) {
276 topLevelFiles.add(root);
280 for (Content child : children) {
281 if (child instanceof AbstractFile) {
282 topLevelFiles.add((AbstractFile) child);
286 }
catch (TskCoreException ex) {
287 logger.log(Level.WARNING,
"Could not get children of root to enqueue: " + root.getId() +
": " + root.getName(), ex);
291 return topLevelFiles;
// Moves file tasks from the root and pending queues toward the file ingest
// threads queue. The loop below runs for as long as that queue reports empty.
// NOTE(review): several lines of this method (try keywords, closing braces and
// any return statements) are not visible in this listing.
324 synchronized private void shuffleFileTaskQueues() {
325 while (this.fileIngestThreadsQueue.isEmpty()) {
// When the pending queue is empty, take the first task of the sorted root
// queue, if there is one, and append it to the pending queue.
330 if (this.pendingFileTaskQueue.isEmpty()) {
331 final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
332 if (rootTask != null) {
333 this.pendingFileTaskQueue.addLast(rootTask);
// Take the next pending task; pollFirst() yields null when the queue is empty.
342 final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
// NOTE(review): the statement executed for a null task is not visible here;
// presumably the method returns because there is nothing left to move.
343 if (pendingTask == null) {
// A task that passes the enqueue check is appended to the file ingest
// threads queue.
346 if (shouldEnqueueFileTask(pendingTask)) {
352 this.fileIngestThreadsQueue.putLast(pendingTask);
353 }
catch (InterruptedException ex) {
354 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
// Restore the interrupt status of the current thread.
355 Thread.currentThread().interrupt();
// Create tasks for the AbstractFile children of the pending task's file,
// whether or not the pending task itself was enqueued above.
367 final AbstractFile file = pendingTask.getFile();
369 for (Content child : file.getChildren()) {
370 if (child instanceof AbstractFile) {
371 AbstractFile childFile = (AbstractFile) child;
372 FileIngestTask childTask =
new FileIngestTask(pendingTask.getIngestJob(), childFile);
// Children that have children of their own go to the pending queue (without
// an enqueue check at this point); other children go to the file ingest
// threads queue if they pass the enqueue check.
373 if (childFile.hasChildren()) {
374 this.pendingFileTaskQueue.add(childTask);
375 }
else if (shouldEnqueueFileTask(childTask)) {
377 this.fileIngestThreadsQueue.putLast(childTask);
378 }
catch (InterruptedException ex) {
379 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
380 Thread.currentThread().interrupt();
386 }
// A failure to list the children is logged at SEVERE level.
catch (TskCoreException ex) {
387 logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex);
// Decides whether a file ingest task should be enqueued for processing.
// NOTE(review): the return statements, the declaration of the variable f, and
// several braces are not visible in this listing; comments on branch outcomes
// below are therefore hedged.
401 private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
402 final AbstractFile file = task.getFile();
406 String fileName = file.getName();
// Detect the "." and ".." directory entries (presumably rejected).
408 if (fileName.equals(
".") || fileName.equals(
"..")) {
// Condition is true only for a non-directory that is neither to be carved
// nor accepted by the job's file filter (presumably rejected).
424 if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
// Default file system type, used if no type can be obtained below.
434 TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
// NOTE(review): f is declared on a line that is not visible here; presumably
// it is the same file viewed as a file system file - confirm.
436 FileSystem fs = f.getFileSystem();
438 fsType = fs.getFsType();
440 }
catch (TskCoreException ex) {
441 logger.log(Level.SEVERE,
"Error querying file system for " + f, ex);
// True when the file system type shares no bit with the FAT/NTFS mask, i.e.
// the file system is none of FAT12, FAT16, FAT32 or NTFS.
445 if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
450 boolean isInRootDir =
false;
// NOTE(review): if getParentDirectory() can return null, the isRoot() call
// below throws a NullPointerException that the catch does not handle - confirm.
452 AbstractFile parent = f.getParentDirectory();
453 isInRootDir = parent.isRoot();
454 }
catch (TskCoreException ex) {
// NOTE(review): the message has no space between "for" and the file name.
455 logger.log(Level.WARNING,
"Error querying parent directory for" + f.getName(), ex);
// Files in a root directory with a meta address below 32 whose name starts
// with '$' and contains ':' are singled out here (presumably rejected).
461 if (isInRootDir && f.getMetaAddr() < 32) {
462 String name = f.getName();
463 if (name.length() > 0 && name.charAt(0) ==
'$' && name.contains(
":")) {
480 private static boolean shouldBeCarved(
final FileIngestTask task) {
481 return task.getIngestJob().shouldProcessUnallocatedSpace() && task.getFile().getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
492 private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
493 return !(task.getIngestJob().getFileIngestFilter().fileIsMemberOf(task.getFile()) == null);
505 synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
506 for (IngestTask task : tasks) {
507 if (task.getIngestJob().getId() == jobId) {
521 private static void removeTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
522 Iterator<? extends IngestTask> iterator = tasks.iterator();
523 while (iterator.hasNext()) {
524 IngestTask task = iterator.next();
525 if (task.getIngestJob().getId() == jobId) {
539 private static int countTasksForJob(Collection<? extends IngestTask> queue,
long jobId) {
541 for (IngestTask task : queue) {
542 if (task.getIngestJob().getId() == jobId) {
557 synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
558 return new IngestJobTasksSnapshot(jobId);
// Orders two file ingest tasks for the sorted root level task queue.
// NOTE(review): the declarations of p1 and p2 and the condition that selects
// between the two return statements are not visible in this listing.
568 public int compare(FileIngestTask q1, FileIngestTask q2) {
// Orders by file id, larger id first.
// NOTE(review): the id difference is narrowed with an (int) cast; if the ids
// are long values, a large difference can be truncated or change sign, which
// breaks the ordering - Long.compare(q2 id, q1 id) would avoid that. Confirm.
572 return (
int) (q2.getFile().getId() - q1.getFile().getId());
// Orders by ordinal, larger ordinal first.
574 return p2.ordinal() - p1.ordinal();
// Priority levels, declared in the order LAST, LOW, MEDIUM, HIGH, so HIGH has
// the largest ordinal.
// NOTE(review): the enclosing type declarations and the method header for the
// matching code below are not visible in this listing.
589 LAST, LOW, MEDIUM, HIGH
// One list of name patterns per priority level, filled in below.
592 static final List<Pattern> LAST_PRI_PATHS =
new ArrayList<>();
594 static final List<Pattern> LOW_PRI_PATHS =
new ArrayList<>();
596 static final List<Pattern> MEDIUM_PRI_PATHS =
new ArrayList<>();
598 static final List<Pattern> HIGH_PRI_PATHS =
new ArrayList<>();
// All patterns are case-insensitive and anchored at the start of the input.
614 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
615 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
618 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
619 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
621 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
623 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
624 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
625 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
626 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
// Files whose type is not FS are handled separately; the outcome of this
// branch is not visible here.
637 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
// Despite the variable name, the value matched is the file's name as returned
// by getName().
643 final String path = abstractFile.getName();
// The pattern lists are consulted in the order HIGH, MEDIUM, LOW, LAST; what
// is done with each matcher is not visible in this listing.
647 for (Pattern p : HIGH_PRI_PATHS) {
648 Matcher m = p.matcher(path);
653 for (Pattern p : MEDIUM_PRI_PATHS) {
654 Matcher m = p.matcher(path);
659 for (Pattern p : LOW_PRI_PATHS) {
660 Matcher m = p.matcher(path);
665 for (Pattern p : LAST_PRI_PATHS) {
666 Matcher m = p.matcher(path);
684 private final BlockingDeque<IngestTask>
taskQueue =
new LinkedBlockingDeque<>();
// Inserts a task at the head of the underlying blocking deque. An
// InterruptedException can propagate to the caller.
700 void putFirst(IngestTask task) throws InterruptedException {
// A block synchronized on this queue runs before the insertion; its
// statements are not visible in this listing.
701 synchronized (
this) {
705 this.taskQueue.putFirst(task);
706 }
catch (InterruptedException ex) {
// On interruption another block synchronized on this queue runs; its
// statements and what follows it are not visible in this listing.
707 synchronized (
this) {
// Inserts a task at the tail of the underlying blocking deque. An
// InterruptedException can propagate to the caller.
724 void putLast(IngestTask task)
throws InterruptedException {
// A block synchronized on this queue runs before the insertion; its
// statements are not visible in this listing.
725 synchronized (
this) {
729 this.taskQueue.putLast(task);
730 }
catch (InterruptedException ex) {
// On interruption another block synchronized on this queue runs; its
// statements and what follows it are not visible in this listing.
731 synchronized (
this) {
750 IngestTask task = taskQueue.takeFirst();
751 synchronized (
this) {
764 synchronized (
this) {
// Handles a notification that a task has been completed. The work is done
// inside a block synchronized on this queue; its statements are not visible
// in this listing.
775 void taskCompleted(IngestTask task) {
776 synchronized (
this) {
789 boolean hasTasksForJob(
long jobId) {
790 synchronized (
this) {
791 return IngestTasksScheduler.hasTasksForJob(this.
queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.
tasksInProgress, jobId);
803 int countQueuedTasksForJob(
long jobId) {
804 synchronized (
this) {
805 return IngestTasksScheduler.countTasksForJob(this.
queuedTasks, jobId);
817 int countRunningTasksForJob(
long jobId) {
818 synchronized (
this) {
819 return IngestTasksScheduler.countTasksForJob(this.
tasksInProgress, jobId);
// Holds per-job task counts read from the enclosing scheduler's queues at
// construction time. This is an inner (non-static) class, so it reads the
// queues of the scheduler instance that created it.
828 class IngestJobTasksSnapshot {
830 private final long jobId;
831 private final long dsQueueSize;
832 private final long rootQueueSize;
833 private final long dirQueueSize;
834 private final long fileQueueSize;
835 private final long runningListSize;
// Captures the counts for the given job.
// NOTE(review): the assignment of the jobId field is not visible in this listing.
842 IngestJobTasksSnapshot(
long jobId) {
// Queued data source level tasks for the job.
844 this.dsQueueSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId);
// Tasks for the job in the root level file task queue.
845 this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId);
// Tasks for the job in the pending file task queue.
846 this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTaskQueue, jobId);
// Queued file level tasks for the job.
// NOTE(review): the statement below ends with a redundant second semicolon.
847 this.fileQueueSize = IngestTasksScheduler.this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId);;
// In-progress tasks for the job, summed over both ingest thread queues.
848 this.runningListSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileIngestThreadsQueue.countRunningTasksForJob(jobId);
// Returns the captured root level file task count.
867 long getRootQueueSize() {
868 return rootQueueSize;
// NOTE(review): the bodies of getDirectoryTasksQueueSize() and
// getDsQueueSize() are not visible in this listing.
877 long getDirectoryTasksQueueSize() {
// Returns the captured queued file level task count.
881 long getFileQueueSize() {
882 return fileQueueSize;
885 long getDsQueueSize() {
// Returns the captured in-progress task count.
889 long getRunningListSize() {
890 return runningListSize;
final List< IngestTask > queuedTasks
final BlockingDeque< IngestTask > taskQueue
final List< IngestTask > tasksInProgress
int compare(FileIngestTask q1, FileIngestTask q2)