Autopsy  4.17.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestTasksScheduler.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2018 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.Deque;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.TreeSet;
32 import java.util.concurrent.BlockingDeque;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.logging.Level;
35 import java.util.regex.Matcher;
36 import java.util.regex.Pattern;
37 import javax.annotation.concurrent.GuardedBy;
38 import javax.annotation.concurrent.ThreadSafe;
40 import org.sleuthkit.datamodel.AbstractFile;
41 import org.sleuthkit.datamodel.Content;
42 import org.sleuthkit.datamodel.FileSystem;
43 import org.sleuthkit.datamodel.TskCoreException;
44 import org.sleuthkit.datamodel.TskData;
45 
50 @ThreadSafe
51 final class IngestTasksScheduler {
52 
53  private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
54  private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
55  @GuardedBy("IngestTasksScheduler.this")
56  private static IngestTasksScheduler instance;
57  private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
58  @GuardedBy("this")
59  private final TreeSet<FileIngestTask> rootFileTaskQueue;
60  @GuardedBy("this")
61  private final Deque<FileIngestTask> pendingFileTaskQueue;
62  @GuardedBy("this")
63  private final Queue<FileIngestTask> streamedTasksQueue;
64  private final IngestTaskTrackingQueue fileIngestThreadsQueue;
65 
71  synchronized static IngestTasksScheduler getInstance() {
72  if (IngestTasksScheduler.instance == null) {
73  IngestTasksScheduler.instance = new IngestTasksScheduler();
74  }
75  return IngestTasksScheduler.instance;
76  }
77 
83  private IngestTasksScheduler() {
84  this.dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
85  this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
86  this.pendingFileTaskQueue = new LinkedList<>();
87  this.fileIngestThreadsQueue = new IngestTaskTrackingQueue();
88  this.streamedTasksQueue = new LinkedList<>();
89  }
90 
97  BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
98  return this.dataSourceIngestThreadQueue;
99  }
100 
107  BlockingIngestTaskQueue getFileIngestTaskQueue() {
108  return this.fileIngestThreadsQueue;
109  }
110 
117  synchronized void scheduleIngestTasks(IngestJobPipeline ingestJobPipeline) {
118  if (!ingestJobPipeline.isCancelled()) {
119  /*
120  * Scheduling of both the data source ingest task and the initial
121  * file ingest tasks for an ingestJobPipeline must be an atomic operation.
122  * Otherwise, the data source task might be completed before the
123  * file tasks are scheduled, resulting in a potential false positive
124  * when another thread checks whether or not all the tasks for the
125  * ingestJobPipeline are completed.
126  */
127  this.scheduleDataSourceIngestTask(ingestJobPipeline);
128  this.scheduleFileIngestTasks(ingestJobPipeline, Collections.emptyList());
129  }
130  }
131 
137  synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestJobPipeline) {
138  if (!ingestJobPipeline.isCancelled()) {
139  DataSourceIngestTask task = new DataSourceIngestTask(ingestJobPipeline);
140  try {
141  this.dataSourceIngestThreadQueue.putLast(task);
142  } catch (InterruptedException ex) {
143  IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", ingestJobPipeline.getId()), ex);
144  Thread.currentThread().interrupt();
145  }
146  }
147  }
148 
157  synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
158  if (!ingestJobPipeline.isCancelled()) {
159  Collection<AbstractFile> candidateFiles;
160  if (files.isEmpty()) {
161  candidateFiles = getTopLevelFiles(ingestJobPipeline.getDataSource());
162  } else {
163  candidateFiles = files;
164  }
165  for (AbstractFile file : candidateFiles) {
166  FileIngestTask task = new FileIngestTask(ingestJobPipeline, file);
167  if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
168  this.rootFileTaskQueue.add(task);
169  }
170  }
171  refillIngestThreadQueue();
172  }
173  }
174 
182  synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestJobPipeline, List<Long> fileIds) {
183  if (!ingestJobPipeline.isCancelled()) {
184  for (long id : fileIds) {
185  // Create the file ingest task. Note that we do not do the shouldEnqueueFileTask()
186  // check here in order to delay loading the AbstractFile object.
187  FileIngestTask task = new FileIngestTask(ingestJobPipeline, id);
188  this.streamedTasksQueue.add(task);
189  }
190  refillIngestThreadQueue();
191  }
192  }
193 
202  synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
203  if (!ingestJobPipeline.isCancelled()) {
204  /*
205  * Put the files directly into the queue for the file ingest
206  * threads, if they pass the file filter for the job. The files are
207  * added to the queue for the ingest threads BEFORE the other queued
208  * tasks because the use case for this method is scheduling new
209  * carved or derived files from a higher priority task that is
210  * already in progress.
211  */
212  for (AbstractFile file : files) {
213  FileIngestTask fileTask = new FileIngestTask(ingestJobPipeline, file);
214  if (shouldEnqueueFileTask(fileTask)) {
215  try {
216  this.fileIngestThreadsQueue.putFirst(fileTask);
217  } catch (InterruptedException ex) {
218  IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", ingestJobPipeline.getId()), ex);
219  Thread.currentThread().interrupt();
220  return;
221  }
222  }
223  }
224  }
225  }
226 
233  synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
234  this.dataSourceIngestThreadQueue.taskCompleted(task);
235  }
236 
243  synchronized void notifyTaskCompleted(FileIngestTask task) {
244  this.fileIngestThreadsQueue.taskCompleted(task);
245  refillIngestThreadQueue();
246  }
247 
256  synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestJobPipeline) {
257  long jobId = ingestJobPipeline.getId();
258 
259  return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
260  || hasTasksForJob(this.rootFileTaskQueue, jobId)
261  || hasTasksForJob(this.pendingFileTaskQueue, jobId)
262  || hasTasksForJob(this.streamedTasksQueue, jobId)
263  || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
264  }
265 
273  synchronized void cancelPendingTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
274  long jobId = ingestJobPipeline.getId();
275  IngestTasksScheduler.removeTasksForJob(rootFileTaskQueue, jobId);
276  IngestTasksScheduler.removeTasksForJob(pendingFileTaskQueue, jobId);
277  IngestTasksScheduler.removeTasksForJob(streamedTasksQueue, jobId);
278  }
279 
289  private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
290  List<AbstractFile> topLevelFiles = new ArrayList<>();
291  Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
292  if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
293  // The data source is itself a file to be processed.
294  topLevelFiles.add((AbstractFile) dataSource);
295  } else {
296  for (AbstractFile root : rootObjects) {
297  List<Content> children;
298  try {
299  children = root.getChildren();
300  if (children.isEmpty()) {
301  // Add the root object itself, it could be an unallocated
302  // space file, or a child of a volume or an image.
303  topLevelFiles.add(root);
304  } else {
305  // The root object is a file system root directory, get
306  // the files within it.
307  for (Content child : children) {
308  if (child instanceof AbstractFile) {
309  topLevelFiles.add((AbstractFile) child);
310  }
311  }
312  }
313  } catch (TskCoreException ex) {
314  logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS
315  }
316  }
317  }
318  return topLevelFiles;
319  }
320 
325  synchronized private void refillIngestThreadQueue() {
326  try {
327  takeFromStreamingTaskQueue();
328  takeFromBatchTasksQueues();
329  } catch (InterruptedException ex) {
330  IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
331  Thread.currentThread().interrupt();
332  }
333  }
334 
339  synchronized private void takeFromStreamingTaskQueue() throws InterruptedException {
340  /*
341  * Schedule files from the streamedTasksQueue
342  */
343  while (fileIngestThreadsQueue.isEmpty()) {
344  /*
345  * We will attempt to schedule as many tasks as there are ingest
346  * queues.
347  */
348  int taskCount = 0;
349  while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
350  final FileIngestTask streamingTask = streamedTasksQueue.poll();
351  if (streamingTask == null) {
352  return; // No streaming tasks are queued right now
353  }
354 
355  if (shouldEnqueueFileTask(streamingTask)) {
356  fileIngestThreadsQueue.putLast(streamingTask);
357  taskCount++;
358  }
359  }
360  }
361  }
362 
393  synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
394 
395  while (this.fileIngestThreadsQueue.isEmpty()) {
396  /*
397  * If the pending file task queue is empty, move the highest
398  * priority root file task, if there is one, into it.
399  */
400  if (this.pendingFileTaskQueue.isEmpty()) {
401  final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
402  if (rootTask != null) {
403  this.pendingFileTaskQueue.addLast(rootTask);
404  }
405  }
406 
407  /*
408  * Try to move the next task from the pending task queue into the
409  * queue for the file ingest threads, if it passes the filter for
410  * the job.
411  */
412  final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
413  if (pendingTask == null) {
414  return;
415  }
416  if (shouldEnqueueFileTask(pendingTask)) {
417  /*
418  * The task is added to the queue for the ingest threads
419  * AFTER the higher priority tasks that preceded it.
420  */
421  this.fileIngestThreadsQueue.putLast(pendingTask);
422  }
423 
424  /*
425  * If the task that was just queued for the file ingest threads has
426  * children, try to queue tasks for the children. Each child task
427  * will go into either the directory queue if it has children of its
428  * own, or into the queue for the file ingest threads, if it passes
429  * the filter for the job.
430  */
431  AbstractFile file = null;
432  try {
433  file = pendingTask.getFile();
434  for (Content child : file.getChildren()) {
435  if (child instanceof AbstractFile) {
436  AbstractFile childFile = (AbstractFile) child;
437  FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJobPipeline(), childFile);
438  if (childFile.hasChildren()) {
439  this.pendingFileTaskQueue.add(childTask);
440  } else if (shouldEnqueueFileTask(childTask)) {
441  this.fileIngestThreadsQueue.putLast(childTask);
442  }
443  }
444  }
445  } catch (TskCoreException ex) {
446  if (file != null) {
447  logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
448  } else {
449  // In practice, the task would have already returned false from the call to shouldEnqueueFileTask()
450  logger.log(Level.SEVERE, "Error loading file with object ID {0}", pendingTask.getFileId());
451  }
452  }
453  }
454  }
455 
465  private static boolean shouldEnqueueFileTask(final FileIngestTask task) {
466  AbstractFile file;
467  try {
468  file = task.getFile();
469  } catch (TskCoreException ex) {
470  logger.log(Level.SEVERE, "Error loading file with ID {0}", task.getFileId());
471  return false;
472  }
473 
474  // Skip the task if the file is actually the pseudo-file for the parent
475  // or current directory.
476  String fileName = file.getName();
477 
478  if (fileName.equals(".") || fileName.equals("..")) {
479  return false;
480  }
481 
482  /*
483  * Ensures that all directories, files which are members of the ingest
484  * file filter, and unallocated blocks (when processUnallocated is
485  * enabled) all continue to be processed. AbstractFiles which do not
486  * meet one of these criteria will be skipped.
487  *
488  * An additional check to see if unallocated space should be processed
489  * is part of the FilesSet.fileIsMemberOf() method.
490  *
491  * This code may need to be updated when
492  * TSK_DB_FILES_TYPE_ENUM.UNUSED_BLOCKS comes into use by Autopsy.
493  */
494  if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
495  return false;
496  }
497 
498  // Skip the task if the file is one of a select group of special, large
499  // NTFS or FAT file system files.
500  if (file instanceof org.sleuthkit.datamodel.File) {
501  final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;
502 
503  // Get the type of the file system, if any, that owns the file.
504  TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
505  try {
506  FileSystem fs = f.getFileSystem();
507  if (fs != null) {
508  fsType = fs.getFsType();
509  }
510  } catch (TskCoreException ex) {
511  logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS
512  }
513 
514  // If the file system is not NTFS or FAT, don't skip the file.
515  if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
516  return true;
517  }
518 
519  // Find out whether the file is in a root directory.
520  boolean isInRootDir = false;
521  try {
522  AbstractFile parent = f.getParentDirectory();
523  if (parent == null) {
524  isInRootDir = true;
525  } else {
526  isInRootDir = parent.isRoot();
527  }
528  } catch (TskCoreException ex) {
529  logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS
530  }
531 
532  // If the file is in the root directory of an NTFS or FAT file
533  // system, check its meta-address and check its name for the '$'
534  // character and a ':' character (not a default attribute).
535  if (isInRootDir && f.getMetaAddr() < 32) {
536  String name = f.getName();
537  if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
538  return false;
539  }
540  }
541  }
542 
543  return true;
544  }
545 
554  private static boolean shouldBeCarved(final FileIngestTask task) {
555  try {
556  AbstractFile file = task.getFile();
557  return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
558  } catch (TskCoreException ex) {
559  return false;
560  }
561  }
562 
571  private static boolean fileAcceptedByFilter(final FileIngestTask task) {
572  try {
573  AbstractFile file = task.getFile();
574  return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
575  } catch (TskCoreException ex) {
576  return false;
577  }
578  }
579 
589  synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long jobId) {
590  for (IngestTask task : tasks) {
591  if (task.getIngestJobPipeline().getId() == jobId) {
592  return true;
593  }
594  }
595  return false;
596  }
597 
605  private static void removeTasksForJob(Collection<? extends IngestTask> tasks, long jobId) {
606  Iterator<? extends IngestTask> iterator = tasks.iterator();
607  while (iterator.hasNext()) {
608  IngestTask task = iterator.next();
609  if (task.getIngestJobPipeline().getId() == jobId) {
610  iterator.remove();
611  }
612  }
613  }
614 
623  private static int countTasksForJob(Collection<? extends IngestTask> queue, long jobId) {
624  int count = 0;
625  for (IngestTask task : queue) {
626  if (task.getIngestJobPipeline().getId() == jobId) {
627  count++;
628  }
629  }
630  return count;
631  }
632 
641  synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
642  return new IngestJobTasksSnapshot(jobId, this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId),
643  countTasksForJob(this.rootFileTaskQueue, jobId),
644  countTasksForJob(this.pendingFileTaskQueue, jobId),
645  this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId),
646  this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + this.fileIngestThreadsQueue.countRunningTasksForJob(jobId),
647  countTasksForJob(this.streamedTasksQueue, jobId));
648  }
649 
654  private static class RootDirectoryTaskComparator implements Comparator<FileIngestTask> {
655 
656  @Override
657  public int compare(FileIngestTask q1, FileIngestTask q2) {
658  // In practice the case where one or both calls to getFile() fails
659  // should never occur since such tasks would not be added to the queue.
660  AbstractFile file1 = null;
661  AbstractFile file2 = null;
662  try {
663  file1 = q1.getFile();
664  } catch (TskCoreException ex) {
665  // Do nothing - the exception has been logged elsewhere
666  }
667 
668  try {
669  file2 = q2.getFile();
670  } catch (TskCoreException ex) {
671  // Do nothing - the exception has been logged elsewhere
672  }
673 
674  if (file1 == null) {
675  if (file2 == null) {
676  return (int) (q2.getFileId() - q1.getFileId());
677  } else {
678  return 1;
679  }
680  } else if (file2 == null) {
681  return -1;
682  }
683 
684  AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(file1);
685  AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(file2);
686  if (p1 == p2) {
687  return (int) (file2.getId() - file1.getId());
688  } else {
689  return p2.ordinal() - p1.ordinal();
690  }
691  }
692 
697  private static class AbstractFilePriority {
698 
700  }
701 
702  enum Priority {
703 
704  LAST, LOW, MEDIUM, HIGH
705  }
706 
707  static final List<Pattern> LAST_PRI_PATHS = new ArrayList<>();
708 
709  static final List<Pattern> LOW_PRI_PATHS = new ArrayList<>();
710 
711  static final List<Pattern> MEDIUM_PRI_PATHS = new ArrayList<>();
712 
713  static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();
714 
715  /*
716  * prioritize root directory folders based on the assumption that we
717  * are looking for user content. Other types of investigations may
718  * want different priorities.
719  */
720  static /*
721  * prioritize root directory folders based on the assumption that we
722  * are looking for user content. Other types of investigations may
723  * want different priorities.
724  */ {
725  // these files have no structure, so they go last
726  //unalloc files are handled as virtual files in getPriority()
727  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
728  //LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE));
729  LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE));
730  LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE));
731  // orphan files are often corrupt and windows does not typically have
732  // user content, so put them towards the bottom
733  LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
734  LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE));
735  // all other files go into the medium category too
736  MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE));
737  // user content is top priority
738  HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE));
739  HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE));
740  HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE));
741  HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE));
742  }
743 
751  static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) {
752  if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
753  //quickly filter out unstructured content
754  //non-fs virtual files and dirs, such as representing unalloc space
755  return AbstractFilePriority.Priority.LAST;
756  }
757  //determine the fs files priority by name
758  final String path = abstractFile.getName();
759  if (path == null) {
760  return AbstractFilePriority.Priority.MEDIUM;
761  }
762  for (Pattern p : HIGH_PRI_PATHS) {
763  Matcher m = p.matcher(path);
764  if (m.find()) {
765  return AbstractFilePriority.Priority.HIGH;
766  }
767  }
768  for (Pattern p : MEDIUM_PRI_PATHS) {
769  Matcher m = p.matcher(path);
770  if (m.find()) {
771  return AbstractFilePriority.Priority.MEDIUM;
772  }
773  }
774  for (Pattern p : LOW_PRI_PATHS) {
775  Matcher m = p.matcher(path);
776  if (m.find()) {
777  return AbstractFilePriority.Priority.LOW;
778  }
779  }
780  for (Pattern p : LAST_PRI_PATHS) {
781  Matcher m = p.matcher(path);
782  if (m.find()) {
783  return AbstractFilePriority.Priority.LAST;
784  }
785  }
786  //default is medium
787  return AbstractFilePriority.Priority.MEDIUM;
788  }
789  }
790  }
791 
796  @ThreadSafe
797  private class IngestTaskTrackingQueue implements BlockingIngestTaskQueue {
798 
799  private final BlockingDeque<IngestTask> taskQueue = new LinkedBlockingDeque<>();
800  @GuardedBy("this")
801  private final List<IngestTask> queuedTasks = new LinkedList<>();
802  @GuardedBy("this")
803  private final List<IngestTask> tasksInProgress = new LinkedList<>();
804 
815  void putFirst(IngestTask task) throws InterruptedException {
816  synchronized (this) {
817  this.queuedTasks.add(task);
818  }
819  try {
820  this.taskQueue.putFirst(task);
821  } catch (InterruptedException ex) {
822  synchronized (this) {
823  this.queuedTasks.remove(task);
824  }
825  throw ex;
826  }
827  }
828 
839  void putLast(IngestTask task) throws InterruptedException {
840  synchronized (this) {
841  this.queuedTasks.add(task);
842  }
843  try {
844  this.taskQueue.putLast(task);
845  } catch (InterruptedException ex) {
846  synchronized (this) {
847  this.queuedTasks.remove(task);
848  }
849  throw ex;
850  }
851  }
852 
863  @Override
864  public IngestTask getNextTask() throws InterruptedException {
865  IngestTask task = taskQueue.takeFirst();
866  synchronized (this) {
867  this.queuedTasks.remove(task);
868  this.tasksInProgress.add(task);
869  }
870  return task;
871  }
872 
878  boolean isEmpty() {
879  synchronized (this) {
880  return this.queuedTasks.isEmpty();
881  }
882  }
883 
890  void taskCompleted(IngestTask task) {
891  synchronized (this) {
892  this.tasksInProgress.remove(task);
893  }
894  }
895 
904  boolean hasTasksForJob(long jobId) {
905  synchronized (this) {
906  return IngestTasksScheduler.hasTasksForJob(this.queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.tasksInProgress, jobId);
907  }
908  }
909 
918  int countQueuedTasksForJob(long jobId) {
919  synchronized (this) {
920  return IngestTasksScheduler.countTasksForJob(this.queuedTasks, jobId);
921  }
922  }
923 
932  int countRunningTasksForJob(long jobId) {
933  synchronized (this) {
934  return IngestTasksScheduler.countTasksForJob(this.tasksInProgress, jobId);
935  }
936  }
937 
938  }
939 
943  static final class IngestJobTasksSnapshot implements Serializable {
944 
945  private static final long serialVersionUID = 1L;
946  private final long jobId;
947  private final long dsQueueSize;
948  private final long rootQueueSize;
949  private final long dirQueueSize;
950  private final long fileQueueSize;
951  private final long runningListSize;
952  private final long streamingQueueSize;
953 
959  IngestJobTasksSnapshot(long jobId, long dsQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize,
960  long runningListSize, long streamingQueueSize) {
961  this.jobId = jobId;
962  this.dsQueueSize = dsQueueSize;
963  this.rootQueueSize = rootQueueSize;
964  this.dirQueueSize = dirQueueSize;
965  this.fileQueueSize = fileQueueSize;
966  this.runningListSize = runningListSize;
967  this.streamingQueueSize = streamingQueueSize;
968  }
969 
976  long getJobId() {
977  return jobId;
978  }
979 
986  long getRootQueueSize() {
987  return rootQueueSize;
988  }
989 
996  long getDirectoryTasksQueueSize() {
997  return dirQueueSize;
998  }
999 
1000  long getFileQueueSize() {
1001  return fileQueueSize;
1002  }
1003 
1004  long getStreamingQueueSize() {
1005  return streamingQueueSize;
1006  }
1007 
1008  long getDsQueueSize() {
1009  return dsQueueSize;
1010  }
1011 
1012  long getRunningListSize() {
1013  return runningListSize;
1014  }
1015 
1016  }
1017 
1018 }

Copyright © 2012-2021 Basis Technology. Generated on: Tue Jan 19 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.