Autopsy 4.22.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestPipeline.java
Go to the documentation of this file.
1/*
2 * Autopsy Forensic Browser
3 *
4 * Copyright 2021 Basis Technology Corp.
5 * Contact: carrier <at> sleuthkit <dot> org
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19package org.sleuthkit.autopsy.ingest;
20
21import static java.lang.Thread.sleep;
22import java.time.DayOfWeek;
23import java.time.LocalDateTime;
24import java.time.temporal.ChronoUnit;
25import java.util.ArrayList;
26import java.util.Date;
27import java.util.List;
28import java.util.Optional;
29import java.util.concurrent.TimeUnit;
30import java.util.logging.Level;
31import org.openide.util.NbBundle;
32import org.sleuthkit.autopsy.coreutils.Logger;
33import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
34
43abstract class IngestPipeline<T extends IngestTask> {
44
45 /*
46 * NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use
47 * by one thread at a time. There are a few status fields that are volatile
48 * to ensure visibility to threads making ingest progress snapshots, but
49 * methods such as startUp(), performTask() and shutDown() are not
50 * synchronized.
51 */
/* Logger shared by all pipeline instances. */
52 private static final Logger logger = Logger.getLogger(IngestPipeline.class.getName());
/*
 * Executor of the ingest job this pipeline belongs to. It is handed to the
 * modules when they start up and process tasks, and it is polled for job
 * cancellation.
 */
53 private final IngestJobExecutor ingestJobExecutor;
/* Templates from which the pipeline's modules are created in startUp(). */
54 private final List<IngestModuleTemplate> moduleTemplates;
/* The modules of this pipeline; empty until createIngestModules() has run. */
55 private final List<PipelineModule<T>> modules;
/* Set when the modules are started up; null before that. Volatile for visibility to other threads. */
56 private volatile Date startTime;
/* True between module start up and shutDown(). Volatile for visibility to other threads. */
57 private volatile boolean running;
/* The module most recently handed a task by performTask(); reset to null when performTask() finishes. */
58 private volatile PipelineModule<T> currentModule;
59
/**
 * Constructs a pipeline of ingest modules that performs ingest tasks for
 * an ingest job. The pipeline starts out without any modules; they are
 * created from the given templates when startUp() is called.
 *
 * @param ingestJobExecutor The executor of the ingest job that owns this
 *                          pipeline.
 * @param moduleTemplates   The templates used to create the ingest modules
 *                          of this pipeline.
 */
IngestPipeline(IngestJobExecutor ingestJobExecutor, List<IngestModuleTemplate> moduleTemplates) {
    // Module creation is deferred to startUp(), so the module list begins empty.
    this.modules = new ArrayList<>();
    this.moduleTemplates = moduleTemplates;
    this.ingestJobExecutor = ingestJobExecutor;
}
75
82 boolean isEmpty() {
83 return modules.isEmpty();
84 }
85
93 boolean isRunning() {
94 return running;
95 }
96
103 List<IngestModuleError> startUp() {
104 List<IngestModuleError> errors = new ArrayList<>();
105 if (!running) {
106 /*
107 * The creation of ingest modules from the ingest module templates
108 * has been deliberately deferred to the startUp() method so that
109 * any and all errors in module construction or start up can be
110 * reported to the client code.
111 */
112 createIngestModules();
113 errors.addAll(startUpIngestModules());
114 } else {
115 errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline already started"))); //NON-NLS
116 }
117 return errors;
118 }
119
124 private void createIngestModules() {
125 if (modules.isEmpty()) {
126 for (IngestModuleTemplate template : moduleTemplates) {
127 Optional<PipelineModule<T>> module = acceptModuleTemplate(template);
128 if (module.isPresent()) {
129 modules.add(module.get());
130 }
131 }
132 }
133 }
134
/**
 * Determines whether or not an ingest module template can be used to
 * create a module for this pipeline. It is called once per template while
 * the pipeline's modules are being created.
 *
 * @param template The ingest module template.
 *
 * @return An Optional holding the pipeline module to add to this pipeline,
 *         or an empty Optional if the template should not contribute a
 *         module.
 */
146 abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate template);
147
153 private List<IngestModuleError> startUpIngestModules() {
154 List<IngestModuleError> errors = new ArrayList<>();
155 startTime = new Date();
156 running = true;
157 for (PipelineModule<T> module : modules) {
158 try {
159 module.startUp(new IngestJobContext(ingestJobExecutor));
160 } catch (Throwable ex) {
161 /*
162 * A catch-all exception firewall. Start up errors for all of
163 * the ingest modules, whether checked exceptions or runtime
164 * exceptions, are reported to allow correction of all of the
165 * error conditions in one go.
166 */
167 errors.add(new IngestModuleError(module.getDisplayName(), ex));
168 }
169 }
170 return errors;
171 }
172
179 Date getStartTime() {
180 Date reportedStartTime = null;
181 if (startTime != null) {
182 reportedStartTime = new Date(startTime.getTime());
183 }
184 return reportedStartTime;
185 }
186
195 List<IngestModuleError> performTask(T task) {
196 List<IngestModuleError> errors = new ArrayList<>();
197 if (running) {
198 if (!ingestJobExecutor.isCancelled()) {
199 pauseIfScheduled();
200 if (ingestJobExecutor.isCancelled()) {
201 return errors;
202 }
203 try {
204 prepareForTask(task);
205 } catch (IngestPipelineException ex) {
206 errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
207 return errors;
208 }
209 for (PipelineModule<T> module : modules) {
210 pauseIfScheduled();
211 if (ingestJobExecutor.isCancelled()) {
212 break;
213 }
214 try {
215 currentModule = module;
216 currentModule.setProcessingStartTime();
217 module.process(ingestJobExecutor, task);
218 } catch (Throwable ex) { // Catch-all exception firewall
219 /*
220 * Note that an exception from a module does not stop
221 * processing of the task by the other modules in the
222 * pipeline.
223 */
224 errors.add(new IngestModuleError(module.getDisplayName(), ex));
225 }
226 if (ingestJobExecutor.isCancelled()) {
227 break;
228 }
229 }
230 }
231 try {
232 cleanUpAfterTask(task);
233 } catch (IngestPipelineException ex) {
234 errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
235 }
236 } else {
237 errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline not started or shut down"))); //NON-NLS
238 }
239 currentModule = null;
240 return errors;
241 }
242
249 private void pauseIfScheduled() {
250 if (ScheduledIngestPauseSettings.getPauseEnabled() == true) {
251 /*
252 * Calculate the date/time for the scheduled pause start by
253 * "normalizing" the day of week to the current week and then
254 * adjusting the hour and minute to match the scheduled hour and
255 * minute.
256 */
257 LocalDateTime pauseStart = LocalDateTime.now();
258 DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
259 while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
260 pauseStart = pauseStart.minusDays(1);
261 }
262 pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
263 pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
264 pauseStart = pauseStart.withSecond(0);
265
266 /*
267 * Calculate the pause end date/time.
268 */
269 LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
270
271 /*
272 * Check whether the current date/time is in the pause interval. If
273 * it is, register the ingest thread this code is running in so it
274 * can be interrupted if the job is canceled, and sleep until
275 * whatever time remains in the pause interval has expired.
276 */
277 LocalDateTime timeNow = LocalDateTime.now();
278 if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
279 ingestJobExecutor.registerPausedIngestThread(Thread.currentThread());
280 try {
281 long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
282 logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
283 sleep(timeRemainingMillis);
284 logger.log(Level.INFO, String.format("%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
285 } catch (InterruptedException notLogged) {
286 logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
287 } finally {
288 ingestJobExecutor.unregisterPausedIngestThread(Thread.currentThread());
289 }
290 }
291 }
292 }
293
/**
 * Does any task-type-specific preparation required before the ingest
 * modules in this pipeline process a task. performTask() calls this once
 * per task, before the first module is run; if it throws, the error is
 * reported and no module processes the task.
 *
 * @param task The task.
 *
 * @throws IngestPipelineException Thrown if the preparation fails.
 */
303 abstract void prepareForTask(T task) throws IngestPipelineException;
304
310 PipelineModule<T> getCurrentlyRunningModule() {
311 return currentModule;
312 }
313
319 List<IngestModuleError> shutDown() {
320 List<IngestModuleError> errors = new ArrayList<>();
321 if (running == true) {
322 for (PipelineModule<T> module : modules) {
323 try {
324 module.shutDown();
325 } catch (Throwable ex) { // Catch-all exception firewall
326 errors.add(new IngestModuleError(module.getDisplayName(), ex));
327 String msg = ex.getMessage();
328 if (msg == null) {
329 /*
330 * Jython run-time errors don't seem to have a message,
331 * but have details in the string returned by
332 * toString().
333 */
334 msg = ex.toString();
335 }
336 MessageNotifyUtil.Notify.error(NbBundle.getMessage(this.getClass(), "FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
337 }
338 }
339 }
340 running = false;
341 return errors;
342
343 }
344
/**
 * Does any task-type-specific clean up required after the ingest modules
 * in this pipeline have been given a task. performTask() calls this after
 * the module loop, and also when the job was already cancelled on entry.
 * It is not called when prepareForTask() failed or when cancellation was
 * detected right after the initial scheduled-pause check. A thrown
 * exception is reported as a task error.
 *
 * @param task The task.
 *
 * @throws IngestPipelineException Thrown if the clean up fails.
 */
354 abstract void cleanUpAfterTask(T task) throws IngestPipelineException;
355
360 static abstract class PipelineModule<T extends IngestTask> implements IngestModule {
361
362 private final IngestModule module;
363 private final String displayName;
364 private volatile Date processingStartTime;
365
374 PipelineModule(IngestModule module, String displayName) {
375 this.module = module;
376 this.displayName = displayName;
377 processingStartTime = new Date();
378 }
379
385 String getClassName() {
386 return module.getClass().getCanonicalName();
387 }
388
394 String getDisplayName() {
395 return displayName;
396 }
397
402 void setProcessingStartTime() {
403 processingStartTime = new Date();
404 }
405
412 Date getProcessingStartTime() {
413 return new Date(processingStartTime.getTime());
414 }
415
416 @Override
417 public void startUp(IngestJobContext context) throws IngestModuleException {
418 module.startUp(context);
419 }
420
432 abstract void process(IngestJobExecutor ingestJobExecutor, T task) throws IngestModuleException;
433
434 @Override
435 public void shutDown() {
436 module.shutDown();
437 }
438
439 }
440
444 static class IngestPipelineException extends Exception {
445
446 private static final long serialVersionUID = 1L;
447
453 IngestPipelineException(String message) {
454 super(message);
455 }
456
463 IngestPipelineException(String message, Throwable cause) {
464 super(message, cause);
465 }
466
467 }
468
469}

Copyright © 2012-2024 Sleuth Kit Labs. Generated on:
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.