19 package org.sleuthkit.autopsy.keywordsearch;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.util.ArrayList;
 
   23 import java.util.Collections;
 
   24 import java.util.HashMap;
 
   25 import java.util.HashSet;
 
   26 import java.util.Iterator;
 
   27 import java.util.List;
 
   29 import java.util.Map.Entry;
 
   31 import java.util.concurrent.CancellationException;
 
   32 import java.util.concurrent.ConcurrentHashMap;
 
   33 import java.util.concurrent.ExecutionException;
 
   34 import java.util.concurrent.Future;
 
   35 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
   36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
   37 import java.util.concurrent.atomic.AtomicLong;
 
   38 import java.util.logging.Level;
 
   39 import javax.swing.SwingUtilities;
 
   40 import javax.swing.SwingWorker;
 
   41 import org.netbeans.api.progress.aggregate.AggregateProgressFactory;
 
   42 import org.netbeans.api.progress.aggregate.AggregateProgressHandle;
 
   43 import org.netbeans.api.progress.aggregate.ProgressContributor;
 
   44 import org.openide.util.Cancellable;
 
   45 import org.openide.util.NbBundle;
 
   46 import org.openide.util.NbBundle.Messages;
 
   58 final class IngestSearchRunner {
 
   60     private static final Logger logger = Logger.
getLogger(IngestSearchRunner.class.getName());
 
   61     private static IngestSearchRunner instance = null;
 
   62     private IngestServices services = IngestServices.getInstance();
 
   63     private Ingester ingester = null;
 
   64     private long currentUpdateIntervalMs;
 
   65     private volatile boolean periodicSearchTaskRunning = 
false;
 
   66     private Future<?> jobProcessingTaskFuture;
 
   67     private final ScheduledThreadPoolExecutor jobProcessingExecutor;
 
   68     private static final int NUM_SEARCH_SCHEDULING_THREADS = 1;
 
   69     private static final String SEARCH_SCHEDULER_THREAD_NAME = 
"periodic-search-scheduler-%d";
 
   72     private Map<Long, SearchJobInfo> jobs = 
new ConcurrentHashMap<>();
 
   74     IngestSearchRunner() {
 
   75         currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
 
   76         ingester = Ingester.getDefault();
 
   77         jobProcessingExecutor = 
new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, 
new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build());
 
   84     public static synchronized IngestSearchRunner getInstance() {
 
   85         if (instance == null) {
 
   86             instance = 
new IngestSearchRunner();
 
   96     public synchronized void startJob(IngestJobContext jobContext, List<String> keywordListNames) {
 
   97         long jobId = jobContext.getJobId();
 
   98         if (jobs.containsKey(jobId) == 
false) {
 
   99             logger.log(Level.INFO, 
"Adding job {0}", jobId); 
 
  100             SearchJobInfo jobData = 
new SearchJobInfo(jobContext, keywordListNames);
 
  101             jobs.put(jobId, jobData);
 
  105         jobs.get(jobId).incrementModuleReferenceCount();
 
  108         if ((jobs.size() > 0) && (periodicSearchTaskRunning == 
false)) {
 
  110             logger.log(Level.INFO, 
"Resetting periodic search time out to default value"); 
 
  111             currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
 
  112             jobProcessingTaskFuture = jobProcessingExecutor.schedule(
new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
 
  113             periodicSearchTaskRunning = 
true;
 
  123     public synchronized void endJob(
long jobId) {
 
  125         boolean readyForFinalSearch = 
false;
 
  126         job = jobs.get(jobId);
 
  132         if (job.decrementModuleReferenceCount() == 0) {
 
  134             readyForFinalSearch = 
true;
 
  137         if (readyForFinalSearch) {
 
  138             logger.log(Level.INFO, 
"Commiting search index before final search for search job {0}", job.getJobId()); 
 
  143             if (jobs.isEmpty()) {
 
  146                 logger.log(Level.INFO, 
"No more search jobs. Stopping periodic search task"); 
 
  147                 periodicSearchTaskRunning = 
false;
 
  148                 jobProcessingTaskFuture.cancel(
true);
 
  159     public synchronized void stopJob(
long jobId) {
 
  160         logger.log(Level.INFO, 
"Stopping search job {0}", jobId); 
 
  164         job = jobs.get(jobId);
 
  170         IngestSearchRunner.Searcher currentSearcher = job.getCurrentSearcher();
 
  171         if ((currentSearcher != null) && (!currentSearcher.isDone())) {
 
  172             logger.log(Level.INFO, 
"Cancelling search job {0}", jobId); 
 
  173             currentSearcher.cancel(
true);
 
  178         if (jobs.isEmpty()) {
 
  181             logger.log(Level.INFO, 
"No more search jobs. Stopping periodic search task"); 
 
  182             periodicSearchTaskRunning = 
false;
 
  183             jobProcessingTaskFuture.cancel(
true);
 
  193     public synchronized void addKeywordListsToAllJobs(List<String> keywordListNames) {
 
  194         for (String listName : keywordListNames) {
 
  195             logger.log(Level.INFO, 
"Adding keyword list {0} to all jobs", listName); 
 
  196             for (SearchJobInfo j : jobs.values()) {
 
  197                 j.addKeywordListName(listName);
 
  205     private void commit() {
 
  210             final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles();
 
  211             KeywordSearch.fireNumIndexedFilesChange(null, numIndexedFiles);
 
  212         } 
catch (NoOpenCoreException | KeywordSearchModuleException ex) {
 
  213             logger.log(Level.SEVERE, 
"Error executing Solr query to check number of indexed files", ex); 
 
  223     private void doFinalSearch(SearchJobInfo job) {
 
  225         logger.log(Level.INFO, 
"Starting final search for search job {0}", job.getJobId());         
 
  226         if (!job.getKeywordListNames().isEmpty()) {
 
  229                 logger.log(Level.INFO, 
"Checking for previous search for search job {0} before executing final search", job.getJobId()); 
 
  230                 job.waitForCurrentWorker();
 
  232                 IngestSearchRunner.Searcher finalSearcher = 
new IngestSearchRunner.Searcher(job, 
true);
 
  233                 job.setCurrentSearcher(finalSearcher); 
 
  234                 logger.log(Level.INFO, 
"Kicking off final search for search job {0}", job.getJobId()); 
 
  235                 finalSearcher.execute(); 
 
  238                 logger.log(Level.INFO, 
"Waiting for final search for search job {0}", job.getJobId()); 
 
  240                 logger.log(Level.INFO, 
"Final search for search job {0} completed", job.getJobId()); 
 
  242             } 
catch (InterruptedException | CancellationException ex) {
 
  243                 logger.log(Level.INFO, 
"Final search for search job {0} interrupted or cancelled", job.getJobId()); 
 
  244             } 
catch (ExecutionException ex) {
 
  245                 logger.log(Level.SEVERE, String.format(
"Final search for search job %d failed", job.getJobId()), ex); 
 
  260             if (jobs.isEmpty() || jobProcessingTaskFuture.isCancelled()) {
 
  261                 logger.log(Level.INFO, 
"Exiting periodic search task"); 
 
  262                 periodicSearchTaskRunning = 
false;
 
  268             logger.log(Level.INFO, 
"Starting periodic searches");
 
  272             for (Iterator<Entry<Long, SearchJobInfo>> iterator = jobs.entrySet().iterator(); iterator.hasNext();) {
 
  275                 if (jobProcessingTaskFuture.isCancelled()) {
 
  276                     logger.log(Level.INFO, 
"Search has been cancelled. Exiting periodic search task."); 
 
  277                     periodicSearchTaskRunning = 
false;
 
  284                     logger.log(Level.INFO, 
"Executing periodic search for search job {0}", job.
getJobId());
 
  293                     } 
catch (InterruptedException | ExecutionException ex) {
 
  294                         logger.log(Level.SEVERE, 
"Error performing keyword search: {0}", ex.getMessage()); 
 
  296                                 NbBundle.getMessage(this.getClass(),
 
  297                                         "SearchRunner.Searcher.done.err.msg"), ex.getMessage()));
 
  299                       catch (java.util.concurrent.CancellationException ex) {
 
  304             logger.log(Level.INFO, 
"All periodic searches cumulatively took {0} secs", stopWatch.
getElapsedTimeSecs()); 
 
  310             jobProcessingTaskFuture = jobProcessingExecutor.schedule(
new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
 
  319             if (lastSerchTimeSec * 1000 < currentUpdateIntervalMs / 4) {
 
  323             currentUpdateIntervalMs = currentUpdateIntervalMs * 2;
 
  324             logger.log(Level.WARNING, 
"Last periodic search took {0} sec. Increasing search interval to {1} sec", 
new Object[]{lastSerchTimeSec, currentUpdateIntervalMs/1000});
 
  353             currentResults = 
new HashMap<>();
 
  354             workerRunning = 
false;
 
  375             if (!keywordListNames.contains(keywordListName)) {
 
  376                 keywordListNames.add(keywordListName);
 
  381             return currentResults.get(k);
 
  385             currentResults.put(k, resultsIDs);
 
  393             workerRunning = flag;
 
  405             moduleReferenceCount.incrementAndGet();
 
  409             return moduleReferenceCount.decrementAndGet();
 
  419                 while (workerRunning) {
 
  420                     logger.log(Level.INFO, 
"Waiting for previous worker to finish"); 
 
  421                     finalSearchLock.wait(); 
 
  422                     logger.log(Level.INFO, 
"Notified previous worker finished"); 
 
  432                 logger.log(Level.INFO, 
"Notifying after finishing search"); 
 
  433                 workerRunning = 
false;
 
  434                 finalSearchLock.notify();
 
  445     private final class Searcher extends SwingWorker<Object, Void> {
 
  462             keywords = 
new ArrayList<>();
 
  463             keywordToList = 
new HashMap<>();
 
  464             keywordLists = 
new ArrayList<>();
 
  474         @Messages(
"SearchRunner.query.exception.msg=Error performing query:")
 
  476             final String displayName = NbBundle.getMessage(this.getClass(), 
"KeywordSearchIngestModule.doInBackGround.displayName")
 
  477                     + (finalRun ? (
" - " + NbBundle.getMessage(this.getClass(), 
"KeywordSearchIngestModule.doInBackGround.finalizeMsg")) : 
"");
 
  478             final String pgDisplayName = displayName + (
" (" + NbBundle.getMessage(this.getClass(), 
"KeywordSearchIngestModule.doInBackGround.pendingMsg") + 
")");
 
  479             progressGroup = AggregateProgressFactory.createSystemHandle(pgDisplayName, null, 
new Cancellable() {
 
  481                 public boolean cancel() {
 
  482                     logger.log(Level.INFO, 
"Cancelling the searcher by user."); 
 
  483                     if (progressGroup != null) {
 
  484                         progressGroup.setDisplayName(displayName + 
" " + NbBundle.getMessage(
this.getClass(), 
"SearchRunner.doInBackGround.cancelMsg"));
 
  486                     return IngestSearchRunner.Searcher.this.cancel(
true);
 
  492             ProgressContributor[] subProgresses = 
new ProgressContributor[keywords.size()];
 
  494             for (Keyword keywordQuery : keywords) {
 
  495                 subProgresses[i] = AggregateProgressFactory.createProgressContributor(keywordQuery.getSearchTerm());
 
  496                 progressGroup.addContributor(subProgresses[i]);
 
  500             progressGroup.start();
 
  505                 progressGroup.setDisplayName(displayName);
 
  507                 int keywordsSearched = 0;
 
  509                 for (Keyword keyword : keywords) {
 
  511                         logger.log(Level.INFO, 
"Cancel detected, bailing before new keyword processed: {0}", keyword.getSearchTerm()); 
 
  515                     final KeywordList keywordList = keywordToList.get(keyword);
 
  519                     if (keywordsSearched > 0) {
 
  520                         subProgresses[keywordsSearched - 1].finish();
 
  523                     KeywordSearchQuery keywordSearchQuery = KeywordSearchUtil.getQueryForKeyword(keyword, keywordList);
 
  528                     final KeywordQueryFilter dataSourceFilter = 
new KeywordQueryFilter(KeywordQueryFilter.FilterType.DATA_SOURCE, job.
getDataSourceId());
 
  529                     keywordSearchQuery.addFilter(dataSourceFilter);
 
  531                     QueryResults queryResults;
 
  535                         queryResults = keywordSearchQuery.performQuery();
 
  537                         logger.log(Level.SEVERE, 
"Error performing query: " + keyword.getSearchTerm(), ex); 
 
  543                     } 
catch (CancellationException e) {
 
  544                         logger.log(Level.INFO, 
"Cancel detected, bailing during keyword query: {0}", keyword.getSearchTerm()); 
 
  552                     if (!newResults.getKeywords().isEmpty()) {
 
  556                         int totalUnits = newResults.getKeywords().size();
 
  557                         subProgresses[keywordsSearched].start(totalUnits);
 
  558                         int unitProgress = 0;
 
  559                         String queryDisplayStr = keyword.getSearchTerm();
 
  560                         if (queryDisplayStr.length() > 50) {
 
  561                             queryDisplayStr = queryDisplayStr.substring(0, 49) + 
"...";
 
  563                         subProgresses[keywordsSearched].progress(keywordList.getName() + 
": " + queryDisplayStr, unitProgress);
 
  566                         newResults.process(null, subProgresses[keywordsSearched], 
this, keywordList.getIngestMessages(), 
true);
 
  571                     subProgresses[keywordsSearched].progress(
"");
 
  578             catch (Exception ex) {
 
  579                 logger.log(Level.WARNING, 
"searcher exception occurred", ex); 
 
  584                     logger.log(Level.INFO, 
"Searcher took {0} secs to run (final = {1})", 
new Object[]{stopWatch.getElapsedTimeSecs(), this.finalRun}); 
 
  598             XmlKeywordSearchList loader = XmlKeywordSearchList.getCurrent();
 
  601             keywordToList.clear();
 
  602             keywordLists.clear();
 
  604             for (String name : keywordListNames) {
 
  606                 keywordLists.add(list);
 
  607                 for (Keyword k : list.getKeywords()) {
 
  609                     keywordToList.put(k, list);
 
  620             SwingUtilities.invokeLater(
new Runnable() {
 
  623                     progressGroup.finish();
 
  647             QueryResults newResults = 
new QueryResults(queryResult.getQuery());
 
  650             for (Keyword keyword : queryResult.getKeywords()) {
 
  653                 List<KeywordHit> queryTermResults = queryResult.getResults(keyword);
 
  657                 Collections.sort(queryTermResults);
 
  661                 List<KeywordHit> newUniqueHits = 
new ArrayList<>();
 
  666                 if (curTermResults == null) {
 
  669                     curTermResults = 
new HashSet<>();
 
  673                 for (KeywordHit hit : queryTermResults) {
 
  674                     if (curTermResults.contains(hit.getSolrObjectId())) {
 
  682                     newUniqueHits.add(hit);
 
  686                     curTermResults.add(hit.getSolrObjectId());
 
  695                 newResults.addResult(keyword, newUniqueHits);
 
SearchJobInfo(IngestJobContext jobContext, List< String > keywordListNames)
long getElapsedTimeSecs()
List< String > keywordListNames
synchronized IngestSearchRunner.Searcher getCurrentSearcher()
Map< Keyword, KeywordList > keywordToList
static IngestMessage createErrorMessage(String source, String subject, String detailsHtml)
synchronized void addKeywordListName(String keywordListName)
AggregateProgressHandle progressGroup
void incrementModuleReferenceCount()
List< KeywordList > keywordLists
synchronized void setCurrentSearcher(IngestSearchRunner.Searcher searchRunner)
synchronized List< String > getKeywordListNames()
Logger getLogger(String moduleDisplayName)
long decrementModuleReferenceCount()
boolean isWorkerRunning()
IngestJobContext getJobContext()
void setWorkerRunning(boolean flag)
void waitForCurrentWorker()
List< String > keywordListNames
synchronized void addKeywordResults(Keyword k, Set< Long > resultsIDs)
AtomicLong moduleReferenceCount
final IngestJobContext jobContext
boolean fileIngestIsCancelled()
QueryResults filterResults(QueryResults queryResult)
final Object finalSearchLock
Map< Keyword, Set< Long > > currentResults
synchronized Set< Long > currentKeywordResults(Keyword k)
static void error(String title, String message)
void recalculateUpdateIntervalTime(long lastSerchTimeSec)
synchronized static Logger getLogger(String name)
IngestSearchRunner.Searcher currentSearcher
volatile boolean workerRunning