Skip to content

replace ForkJoinPool with fixed thread pool #4552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -47,9 +48,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -1902,54 +1905,58 @@ private void indexParallel(String dir, IndexDownArgs args) {
IndexerParallelizer parallelizer = RuntimeEnvironment.getInstance().getIndexerParallelizer();
ObjectPool<Ctags> ctagsPool = parallelizer.getCtagsPool();

Map<Boolean, List<IndexFileWork>> bySuccess = null;
Map<Boolean, List<IndexFileWork>> bySuccess = new HashMap<>();
try (Progress progress = new Progress(LOGGER, String.format("indexing '%s'", dir), worksCount)) {
bySuccess = parallelizer.getForkJoinPool().submit(() ->
args.works.parallelStream().collect(
Collectors.groupingByConcurrent((x) -> {
int tries = 0;
Ctags pctags = null;
boolean ret;
while (true) {
try {
if (alreadyClosedCounter.get() > 0) {
ret = false;
} else {
pctags = ctagsPool.get();
addFile(x.file, x.path, pctags);
successCounter.incrementAndGet();
ret = true;
Set<Callable<IndexFileWork>> callables = args.works.stream().
<Callable<IndexFileWork>>map(x -> () -> {
int tries = 0;
Ctags pctags = null;
while (true) {
try {
if (alreadyClosedCounter.get() > 0) {
x.ret = false;
} else {
pctags = ctagsPool.get();
addFile(x.file, x.path, pctags);
successCounter.incrementAndGet();
x.ret = true;
}
} catch (AlreadyClosedException e) {
alreadyClosedCounter.incrementAndGet();
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.SEVERE, errmsg, e);
x.exception = e;
x.ret = false;
} catch (InterruptedException e) {
// Allow one retry if interrupted
if (++tries <= 1) {
continue;
}
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
x.exception = e;
x.ret = false;
} catch (RuntimeException | IOException e) {
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.WARNING, errmsg, e);
x.exception = e;
x.ret = false;
} finally {
if (pctags != null) {
pctags.reset();
ctagsPool.release(pctags);
}
}
} catch (AlreadyClosedException e) {
alreadyClosedCounter.incrementAndGet();
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.SEVERE, errmsg, e);
x.exception = e;
ret = false;
} catch (InterruptedException e) {
// Allow one retry if interrupted
if (++tries <= 1) {
continue;
}
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
x.exception = e;
ret = false;
} catch (RuntimeException | IOException e) {
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
LOGGER.log(Level.WARNING, errmsg, e);
x.exception = e;
ret = false;
} finally {
if (pctags != null) {
pctags.reset();
ctagsPool.release(pctags);
}
}

progress.increment();
return ret;
}
}))).get();
progress.increment();
return x;
}
}).
collect(Collectors.toSet());
List<Future<IndexFileWork>> futures = parallelizer.getIndexWorkExecutor().invokeAll(callables);
for (var future : futures) {
IndexFileWork work = future.get();
bySuccess.computeIfAbsent(work.ret, key -> new ArrayList<>()).add(work);
}
} catch (InterruptedException | ExecutionException e) {
interrupted = true;
int successCount = successCounter.intValue();
@@ -1961,14 +1968,9 @@ private void indexParallel(String dir, IndexDownArgs args) {

args.curCount = currentCounter.intValue();

// Start with failureCount=worksCount, and then subtract successes.
int failureCount = worksCount;
if (bySuccess != null) {
List<IndexFileWork> successes = bySuccess.getOrDefault(Boolean.TRUE, null);
if (successes != null) {
failureCount -= successes.size();
}
}
int failureCount = worksCount - Optional.ofNullable(bySuccess.get(Boolean.TRUE))
.map(List::size)
.orElse(0);
if (failureCount > 0) {
double pctFailed = 100.0 * failureCount / worksCount;
String exmsg = String.format("%d failures (%.1f%%) while parallel-indexing", failureCount, pctFailed);
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ class IndexFileWork {
final File file;
final String path;
Exception exception;
boolean ret;

IndexFileWork(File file, String path) {
this.file = file;
Original file line number Diff line number Diff line change
@@ -25,8 +25,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.opengrok.indexer.analysis.Ctags;
@@ -38,27 +36,25 @@
import org.opengrok.indexer.util.ObjectFactory;
import org.opengrok.indexer.util.ObjectPool;

import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;

/**
* Represents a container for executors that enable parallelism for indexing
* across projects and repositories and also within any {@link IndexDatabase}
* instance -- with global limits for all execution.
* <p>A fixed-thread pool is used for parallelism across repositories, and a
* work-stealing {@link ForkJoinPool} is used for parallelism within any
* {@link #lzIndexWorkExecutor} is used for parallelism within any
* {@link IndexDatabase}. Threads in the former pool are customers of the
* latter, and the bulk of work is done in the latter pool. The work-stealing
* {@link ForkJoinPool} makes use of a corresponding fixed pool of {@link Ctags}
* instances.
* <p>Additionally there are pools for executing for history, for renamings in
* latter, and the bulk of work is done in the latter pool.
* The {@link #lzIndexWorkExecutor} makes use of a corresponding fixed pool
* of {@link Ctags} instances.
* <p>Additionally there are pools for executing for history, for renames in
* history, and for watching the {@link Ctags} instances for timing purposes.
*/
public class IndexerParallelizer implements AutoCloseable {

private final RuntimeEnvironment env;
private final int indexingParallelism;

private LazilyInstantiate<ForkJoinPool> lzForkJoinPool;
private LazilyInstantiate<ExecutorService> lzIndexWorkExecutor;
private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool;
private LazilyInstantiate<ExecutorService> lzFixedExecutor;
private LazilyInstantiate<ExecutorService> lzHistoryExecutor;
@@ -82,7 +78,7 @@ public IndexerParallelizer(RuntimeEnvironment env) {
*/
this.indexingParallelism = env.getIndexingParallelism();

createLazyForkJoinPool();
createIndexWorkExecutor();
createLazyCtagsPool();
createLazyFixedExecutor();
createLazyHistoryExecutor();
@@ -99,10 +95,10 @@ public ExecutorService getFixedExecutor() {
}

/**
* @return the forkJoinPool
* @return the executor used for individual file processing in the 2nd stage of indexing
*/
public ForkJoinPool getForkJoinPool() {
return lzForkJoinPool.get();
public ExecutorService getIndexWorkExecutor() {
return lzIndexWorkExecutor.get();
}

/**
@@ -166,7 +162,7 @@ public void close() {
* call this method satisfactorily too.
*/
public void bounce() {
bounceForkJoinPool();
bounceIndexWorkExecutor();
bounceFixedExecutor();
bounceCtagsPool();
bounceHistoryExecutor();
@@ -175,11 +171,11 @@ public void bounce() {
bounceXrefWatcherExecutor();
}

private void bounceForkJoinPool() {
if (lzForkJoinPool.isActive()) {
ForkJoinPool formerForkJoinPool = lzForkJoinPool.get();
createLazyForkJoinPool();
formerForkJoinPool.shutdown();
private void bounceIndexWorkExecutor() {
if (lzIndexWorkExecutor.isActive()) {
ExecutorService formerIndexWorkExecutor = lzIndexWorkExecutor.get();
createIndexWorkExecutor();
formerIndexWorkExecutor.shutdown();
}
}

@@ -231,13 +227,10 @@ private void bounceXrefWatcherExecutor() {
}
}

private void createLazyForkJoinPool() {
lzForkJoinPool = LazilyInstantiate.using(() ->
new ForkJoinPool(indexingParallelism, forkJoinPool -> {
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool);
thread.setName(OpenGrokThreadFactory.PREFIX + "ForkJoinPool-" + thread.getId());
return thread;
}, null, false));
private void createIndexWorkExecutor() {
lzIndexWorkExecutor = LazilyInstantiate.using(() ->
Executors.newFixedThreadPool(indexingParallelism,
new OpenGrokThreadFactory("index-worker")));
}

private void createLazyCtagsPool() {
@@ -261,7 +254,7 @@ private void createLazyXrefWatcherExecutor() {
private void createLazyFixedExecutor() {
lzFixedExecutor = LazilyInstantiate.using(() ->
Executors.newFixedThreadPool(indexingParallelism,
new OpenGrokThreadFactory("index-worker")));
new OpenGrokThreadFactory("index-db")));
}

private void createLazyHistoryExecutor() {