diff --git a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java index f52f2656a98..669b35f21ed 100644 --- a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java +++ b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -47,9 +48,11 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -1902,54 +1905,58 @@ private void indexParallel(String dir, IndexDownArgs args) { IndexerParallelizer parallelizer = RuntimeEnvironment.getInstance().getIndexerParallelizer(); ObjectPool<Ctags> ctagsPool = parallelizer.getCtagsPool(); - Map<Boolean, List<IndexFileWork>> bySuccess = null; + Map<Boolean, List<IndexFileWork>> bySuccess = new HashMap<>(); try (Progress progress = new Progress(LOGGER, String.format("indexing '%s'", dir), worksCount)) { - bySuccess = parallelizer.getForkJoinPool().submit(() -> - args.works.parallelStream().collect( - Collectors.groupingByConcurrent((x) -> { - int tries = 0; - Ctags pctags = null; - boolean ret; - while (true) { - try { - if (alreadyClosedCounter.get() > 0) { - ret = false; - } else { - pctags = ctagsPool.get(); - addFile(x.file, x.path, pctags); - successCounter.incrementAndGet(); - ret = true; + Set<Callable<IndexFileWork>> callables = args.works.stream(). 
+ <Callable<IndexFileWork>>map(x -> () -> { + int tries = 0; + Ctags pctags = null; + while (true) { + try { + if (alreadyClosedCounter.get() > 0) { + x.ret = false; + } else { + pctags = ctagsPool.get(); + addFile(x.file, x.path, pctags); + successCounter.incrementAndGet(); + x.ret = true; + } + } catch (AlreadyClosedException e) { + alreadyClosedCounter.incrementAndGet(); + String errmsg = String.format("ERROR addFile(): '%s'", x.file); + LOGGER.log(Level.SEVERE, errmsg, e); + x.exception = e; + x.ret = false; + } catch (InterruptedException e) { + // Allow one retry if interrupted + if (++tries <= 1) { + continue; + } + LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file); + x.exception = e; + x.ret = false; + } catch (RuntimeException | IOException e) { + String errmsg = String.format("ERROR addFile(): '%s'", x.file); + LOGGER.log(Level.WARNING, errmsg, e); + x.exception = e; + x.ret = false; + } finally { + if (pctags != null) { + pctags.reset(); + ctagsPool.release(pctags); + } + } - } catch (AlreadyClosedException e) { - alreadyClosedCounter.incrementAndGet(); - String errmsg = String.format("ERROR addFile(): '%s'", x.file); - LOGGER.log(Level.SEVERE, errmsg, e); - x.exception = e; - ret = false; - } catch (InterruptedException e) { - // Allow one retry if interrupted - if (++tries <= 1) { - continue; - } - LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file); - x.exception = e; - ret = false; - } catch (RuntimeException | IOException e) { - String errmsg = String.format("ERROR addFile(): '%s'", x.file); - LOGGER.log(Level.WARNING, errmsg, e); - x.exception = e; - ret = false; - } finally { - if (pctags != null) { - pctags.reset(); - ctagsPool.release(pctags); - } - } - progress.increment(); - return ret; - } - }))).get(); + progress.increment(); + return x; + } + }). 
+ collect(Collectors.toSet()); + List<Future<IndexFileWork>> futures = parallelizer.getIndexWorkExecutor().invokeAll(callables); + for (var future : futures) { + IndexFileWork work = future.get(); + bySuccess.computeIfAbsent(work.ret, key -> new ArrayList<>()).add(work); + } } catch (InterruptedException | ExecutionException e) { interrupted = true; int successCount = successCounter.intValue(); @@ -1961,14 +1968,9 @@ private void indexParallel(String dir, IndexDownArgs args) { args.curCount = currentCounter.intValue(); - // Start with failureCount=worksCount, and then subtract successes. - int failureCount = worksCount; - if (bySuccess != null) { - List<IndexFileWork> successes = bySuccess.getOrDefault(Boolean.TRUE, null); - if (successes != null) { - failureCount -= successes.size(); - } - } + int failureCount = worksCount - Optional.ofNullable(bySuccess.get(Boolean.TRUE)) + .map(List::size) + .orElse(0); if (failureCount > 0) { double pctFailed = 100.0 * failureCount / worksCount; String exmsg = String.format("%d failures (%.1f%%) while parallel-indexing", failureCount, pctFailed); diff --git a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDownArgs.java b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDownArgs.java index 7a8178440b8..1ea907fba75 100644 --- a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDownArgs.java +++ b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDownArgs.java @@ -35,6 +35,7 @@ class IndexFileWork { final File file; final String path; Exception exception; + boolean ret; IndexFileWork(File file, String path) { this.file = file; diff --git a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java index 4576dbe0eef..637cb316820 100644 --- a/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java +++ 
b/opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java @@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.ScheduledThreadPoolExecutor; import org.opengrok.indexer.analysis.Ctags; @@ -38,19 +36,17 @@ import org.opengrok.indexer.util.ObjectFactory; import org.opengrok.indexer.util.ObjectPool; -import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; - /** * Represents a container for executors that enable parallelism for indexing * across projects and repositories and also within any {@link IndexDatabase} * instance -- with global limits for all execution. * <p>A fixed-thread pool is used for parallelism across repositories, and a - * work-stealing {@link ForkJoinPool} is used for parallelism within any + * {@link #lzIndexWorkExecutor} is used for parallelism within any * {@link IndexDatabase}. Threads in the former pool are customers of the - * latter, and the bulk of work is done in the latter pool. The work-stealing - * {@link ForkJoinPool} makes use of a corresponding fixed pool of {@link Ctags} - * instances. - * <p>Additionally there are pools for executing for history, for renamings in + * latter, and the bulk of work is done in the latter pool. + * The {@link #lzIndexWorkExecutor} makes use of a corresponding fixed pool + * of {@link Ctags} instances. + * <p>
Additionally there are pools for executing for history, for renames in + * history, and for watching the {@link Ctags} instances for timing purposes. */ public class IndexerParallelizer implements AutoCloseable { @@ -58,7 +54,7 @@ public class IndexerParallelizer implements AutoCloseable { private final RuntimeEnvironment env; private final int indexingParallelism; - private LazilyInstantiate<ForkJoinPool> lzForkJoinPool; + private LazilyInstantiate<ExecutorService> lzIndexWorkExecutor; private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool; private LazilyInstantiate<ExecutorService> lzFixedExecutor; private LazilyInstantiate<ExecutorService> lzHistoryExecutor; @@ -82,7 +78,7 @@ public IndexerParallelizer(RuntimeEnvironment env) { */ this.indexingParallelism = env.getIndexingParallelism(); - createLazyForkJoinPool(); + createIndexWorkExecutor(); createLazyCtagsPool(); createLazyFixedExecutor(); createLazyHistoryExecutor(); @@ -99,10 +95,10 @@ public ExecutorService getFixedExecutor() { } /** - * @return the forkJoinPool + * @return the executor used for individual file processing in the 2nd stage of indexing */ - public ForkJoinPool getForkJoinPool() { - return lzForkJoinPool.get(); + public ExecutorService getIndexWorkExecutor() { + return lzIndexWorkExecutor.get(); } /** @@ -166,7 +162,7 @@ public void close() { * call this method satisfactorily too. 
*/ public void bounce() { - bounceForkJoinPool(); + bounceIndexWorkExecutor(); bounceFixedExecutor(); bounceCtagsPool(); bounceHistoryExecutor(); @@ -175,11 +171,11 @@ public void bounce() { bounceXrefWatcherExecutor(); } - private void bounceForkJoinPool() { - if (lzForkJoinPool.isActive()) { - ForkJoinPool formerForkJoinPool = lzForkJoinPool.get(); - createLazyForkJoinPool(); - formerForkJoinPool.shutdown(); + private void bounceIndexWorkExecutor() { + if (lzIndexWorkExecutor.isActive()) { + ExecutorService formerIndexWorkExecutor = lzIndexWorkExecutor.get(); + createIndexWorkExecutor(); + formerIndexWorkExecutor.shutdown(); } } @@ -231,13 +227,10 @@ private void bounceXrefWatcherExecutor() { } } - private void createLazyForkJoinPool() { - lzForkJoinPool = LazilyInstantiate.using(() -> - new ForkJoinPool(indexingParallelism, forkJoinPool -> { - ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool); - thread.setName(OpenGrokThreadFactory.PREFIX + "ForkJoinPool-" + thread.getId()); - return thread; - }, null, false)); + private void createIndexWorkExecutor() { + lzIndexWorkExecutor = LazilyInstantiate.using(() -> + Executors.newFixedThreadPool(indexingParallelism, + new OpenGrokThreadFactory("index-worker"))); } private void createLazyCtagsPool() { @@ -261,7 +254,7 @@ private void createLazyXrefWatcherExecutor() { private void createLazyFixedExecutor() { lzFixedExecutor = LazilyInstantiate.using(() -> Executors.newFixedThreadPool(indexingParallelism, - new OpenGrokThreadFactory("index-worker"))); + new OpenGrokThreadFactory("index-db"))); } private void createLazyHistoryExecutor() {