Autopsy 4.22.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
BatchProcessor.java
Go to the documentation of this file.
1/*
2 * Autopsy Forensic Browser
3 *
4 * Copyright 2023 Basis Technology Corp.
5 * Contact: carrier <at> sleuthkit <dot> org
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19package com.basistech.df.cybertriage.autopsy.malwarescan;
20
21import java.util.ArrayList;
22import java.util.List;
23import java.util.concurrent.BlockingQueue;
24import java.util.concurrent.ExecutorService;
25import java.util.concurrent.Executors;
26import java.util.concurrent.LinkedBlockingQueue;
27import java.util.concurrent.TimeUnit;
28import java.util.function.Consumer;
29
/**
 * Accumulates items and hands them, one batch at a time, to a consumer that
 * runs on a single background thread.
 *
 * A batch is dispatched as soon as {@code batchSize} items have been added, or
 * earlier when {@link #flushAndReset()} is called. All public methods are
 * {@code synchronized} on this instance. Batches are executed on a
 * single-thread executor, so the consumer is invoked for one batch at a time,
 * in submission order, for as long as the same executor is in use.
 *
 * @param <T> The type of the items being batched.
 */
class BatchProcessor<T> {

    // Not final: replaced with a fresh executor on every flushAndReset().
    private ExecutorService processingExecutorService = Executors.newSingleThreadExecutor();

    private final BlockingQueue<T> batchingQueue;
    private final int batchSize;
    private final Consumer<List<T>> itemsConsumer;
    private final long secondsTimeout;

    /**
     * Creates a batch processor.
     *
     * @param batchSize      The number of queued items that triggers a batch.
     *                       Also used as the queue capacity, so it must be
     *                       positive ({@link LinkedBlockingQueue} rejects
     *                       non-positive capacities).
     * @param secondsTimeout The maximum time, in seconds, that
     *                       {@link #flushAndReset()} waits for submitted
     *                       batches to finish.
     * @param itemsConsumer  Receives each batch on the background thread.
     */
    public BatchProcessor(int batchSize, long secondsTimeout, Consumer<List<T>> itemsConsumer) {
        this.batchingQueue = new LinkedBlockingQueue<>(batchSize);
        this.batchSize = batchSize;
        this.itemsConsumer = itemsConsumer;
        this.secondsTimeout = secondsTimeout;
    }

    /**
     * Queues an item and, once {@code batchSize} items are waiting, submits
     * them to the consumer as one batch.
     *
     * @param item The item to queue.
     *
     * @throws InterruptedException Declared for callers; see
     *                              {@link #flushAndReset()} for the blocking
     *                              operation of this class.
     */
    public synchronized void add(T item) throws InterruptedException {
        // The queue is drained whenever it reaches capacity (under this same
        // lock), so add() never finds the bounded queue already full.
        batchingQueue.add(item);
        if (batchingQueue.size() >= batchSize) {
            asyncProcessBatch();
        }
    }

    /**
     * Submits any partially filled batch, waits up to the configured timeout
     * for all submitted batches to complete, and then makes this processor
     * ready for reuse with a fresh executor.
     *
     * The fresh executor is installed even when the wait is interrupted, so
     * the processor never remains bound to an executor that has been shut
     * down (which would reject every subsequent batch). If the wait times
     * out, batches already submitted keep running on the old executor.
     *
     * @throws InterruptedException If the calling thread is interrupted while
     *                              waiting for outstanding batches.
     */
    public synchronized void flushAndReset() throws InterruptedException {
        // get any remaining
        asyncProcessBatch();

        // don't accept any new additions on the executor being drained
        final ExecutorService drainingExecutor = processingExecutorService;
        drainingExecutor.shutdown();

        try {
            // await termination
            drainingExecutor.awaitTermination(secondsTimeout, TimeUnit.SECONDS);
        } finally {
            // Always swap in a new (not shut down) executor, including when
            // awaitTermination is interrupted; otherwise later submissions
            // would hit the shut-down executor and be rejected.
            processingExecutorService = Executors.newSingleThreadExecutor();
        }
    }

    /**
     * Moves everything currently queued into a new list and submits that list
     * to the background executor. Does nothing when the queue is empty.
     *
     * @throws InterruptedException Declared for callers; nothing in this
     *                              method blocks.
     */
    private synchronized void asyncProcessBatch() throws InterruptedException {
        if (!batchingQueue.isEmpty()) {
            final List<T> processingList = new ArrayList<>(batchingQueue.size());

            // transfer batching queue to processing queue
            batchingQueue.drainTo(processingList);

            // submit to be processed
            processingExecutorService.submit(() -> itemsConsumer.accept(processingList));
        }
    }

}

Copyright © 2012-2024 Sleuth Kit Labs. Generated on:
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.