19 package com.basistech.df.cybertriage.autopsy.malwarescan;
 
   21 import java.util.ArrayList;
 
   22 import java.util.List;
 
   23 import java.util.concurrent.BlockingQueue;
 
   24 import java.util.concurrent.ExecutorService;
 
   25 import java.util.concurrent.Executors;
 
   26 import java.util.concurrent.LinkedBlockingQueue;
 
   27 import java.util.concurrent.TimeUnit;
 
   28 import java.util.function.Consumer;
 
   /**
    * Buffers items of type {@code T} in a bounded queue and hands them, as
    * lists, to a caller-supplied consumer through a single-thread executor.
    *
    * @param <T> the type of item being batched
    */
   35 class BatchProcessor<T> {
 
          // Executes the consumer calls submitted by asyncProcessBatch(). Not final:
          // flushAndReset() shuts this executor down and assigns a fresh one.
   37     private ExecutorService processingExecutorService = Executors.newSingleThreadExecutor();
 
          // Items waiting to be handed to the consumer; created with capacity batchSize.
   39     private final BlockingQueue<T> batchingQueue;
 
          // Capacity of batchingQueue and the size threshold that add() checks.
   40     private final int batchSize;
 
          // Receives each list of items drained from batchingQueue.
   41     private final Consumer<List<T>> itemsConsumer;
 
          // Seconds flushAndReset() waits for the executor to terminate.
   42     private final long secondsTimeout;
 
   /**
    * Creates a processor whose internal queue has capacity {@code batchSize}.
    *
    * @param batchSize      capacity of the internal queue and the size
    *                       threshold checked by add(); passed unchecked to
    *                       LinkedBlockingQueue, which rejects values that are
    *                       not greater than zero with IllegalArgumentException
    * @param secondsTimeout seconds flushAndReset() waits for executor
    *                       termination
    * @param itemsConsumer  receives each drained list of items; stored as-is
    *                       (no null check is performed here)
    */
   44     public BatchProcessor(
int batchSize, 
long secondsTimeout, Consumer<List<T>> itemsConsumer) {
 
          // Bounded queue: its capacity equals the batch threshold.
   45         this.batchingQueue = 
new LinkedBlockingQueue<>(batchSize);
 
   46         this.batchSize = batchSize;
 
   47         this.itemsConsumer = itemsConsumer;
 
   48         this.secondsTimeout = secondsTimeout;
 
   /**
    * Appends {@code item} to the pending queue and then checks whether the
    * queue has reached {@code batchSize}. Synchronized on this instance.
    *
    * @param item the item to enqueue
    * @throws InterruptedException declared by the signature; neither visible
    *                              statement throws it directly
    */
   51     public synchronized void add(T item) 
throws InterruptedException {
 
          // BlockingQueue.add does not block: on a full bounded queue it throws
          // IllegalStateException instead.
   52         batchingQueue.add(item);
 
   53         if (batchingQueue.size() >= batchSize) {
          // NOTE(review): the body of this branch is not visible in the reviewed
          // listing - presumably it calls asyncProcessBatch(); confirm.
 
   /**
    * Shuts down the current executor, waits up to {@code secondsTimeout}
    * seconds for it to terminate, then installs a new single-thread executor
    * so the processor can keep accepting work. Synchronized on this instance.
    *
    * NOTE(review): the statements between the signature and shutdown() are not
    * visible in the reviewed listing - presumably pending items are flushed
    * there first; confirm.
    *
    * @throws InterruptedException if the calling thread is interrupted while
    *                              waiting in awaitTermination
    */
   58     public synchronized void flushAndReset() 
throws InterruptedException {
 
          // Stops accepting new tasks; tasks already submitted still run.
   63         processingExecutorService.shutdown();
 
          // The boolean result is ignored, so a timeout is not distinguished
          // from a clean termination.
   66         processingExecutorService.awaitTermination(secondsTimeout, TimeUnit.SECONDS);
 
          // A shut-down executor rejects new tasks, so replace it.
   69         processingExecutorService = Executors.newSingleThreadExecutor();
 
   /**
    * If the queue is non-empty, moves every queued item into a new list and
    * submits a task that passes that list to {@code itemsConsumer}. The
    * visible code does not wait for the submitted task to finish.
    * Synchronized on this instance.
    *
    * @throws InterruptedException declared by the signature; none of the
    *                              visible statements throws it directly
    */
   72     private synchronized void asyncProcessBatch() 
throws InterruptedException {
 
   73         if (!batchingQueue.isEmpty()) {
 
              // A fresh list per call, so the submitted task works on its own
              // snapshot rather than on the live queue.
   74             final List<T> processingList = 
new ArrayList<>();
 
              // Removes all currently available elements from the queue.
   77             batchingQueue.drainTo(processingList);
 
              // The Future returned by submit is discarded; an exception thrown
              // by the consumer is held in that Future and not surfaced here.
   80             processingExecutorService.submit(() -> itemsConsumer.accept(processingList));