Skip to content

Expose merge events and their memory usage estimate #126667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.merge.OnGoingMerge;

/**
* Listener for merge lifecycle events: a merge being queued, completed or aborted.
* Implementations are registered through {@code ThreadPoolMergeExecutorService#registerMergeEventListener}.
*/
public interface MergeEventListener {

/**
* Called when a merge is queued for execution. {@code ThreadPoolMergeExecutorService} invokes this right before the merge task
* is added to its queue, so that for a given merge it is called before {@link #onMergeAborted} or {@link #onMergeCompleted}.
* It is also invoked when a backlogged merge task is re-enqueued, so it may be called more than once for the same merge.
*
* @param merge the merge that is being queued
* @param estimateMergeMemoryBytes estimate of the memory needed to perform a merge
*/
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);

/**
* Called when the merge task for the given merge has finished running.
*
* @param merge the merge that completed
*/
void onMergeCompleted(OnGoingMerge merge);

/**
* Called when the merge task for the given merge has been aborted.
*
* @param merge the merge that was aborted
*/
void onMergeAborted(OnGoingMerge merge);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.MergePolicy;

/**
* Provides an estimate of the memory required by a Lucene merge. The merge scheduler queries it once per merge task, when the
* task is created.
*/
@FunctionalInterface
public interface MergeMemoryEstimateProvider {

/**
* Returns an estimate of the memory needed to perform a merge
*
* @param merge the merge to estimate the memory usage for
* @return the estimated number of bytes of memory needed to perform the merge
*/
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -73,6 +75,8 @@ public class ThreadPoolMergeExecutorService {
private final int concurrentMergesFloorLimitForThrottling;
private final int concurrentMergesCeilLimitForThrottling;

private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();

public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
ThreadPool threadPool,
Settings settings
Expand Down Expand Up @@ -127,13 +131,21 @@ boolean submitMergeTask(MergeTask mergeTask) {
);
}
// then enqueue the merge task proper
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
return true;
}
}

void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
queuedMergeTasks.add(mergeTask);
enqueueMergeTask(mergeTask);
}

/**
 * Notifies every registered {@link MergeEventListener} that the given merge task is being queued and then places the task
 * on the queue of merge tasks.
 */
private void enqueueMergeTask(MergeTask mergeTask) {
    // Listeners are notified BEFORE the task becomes visible in the queue: this guarantees that, for any given merge,
    // onMergeQueued is observed ahead of onMergeAborted / onMergeCompleted. Adding to the queue is not expected to fail.
    for (MergeEventListener listener : mergeEventListeners) {
        listener.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
    }
    final boolean enqueued = queuedMergeTasks.add(mergeTask);
    assert enqueued;
}

public boolean allDone() {
Expand Down Expand Up @@ -201,6 +213,7 @@ private void runMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
}
}

Expand All @@ -213,6 +226,7 @@ private void abortMergeTask(MergeTask mergeTask) {
if (mergeTask.supportsIOThrottling()) {
ioThrottledMergeTasksCount.decrementAndGet();
}
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
}
}

Expand Down Expand Up @@ -278,6 +292,10 @@ public boolean usingMaxTargetIORateBytesPerSec() {
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
}

/**
* Registers a listener that is notified when merge tasks are queued, completed or aborted by this executor service.
* Listeners are appended to a {@link CopyOnWriteArrayList}; no de-duplication or null check is performed here.
*
* @param consumer the listener to register
*/
public void registerMergeEventListener(MergeEventListener consumer) {
mergeEventListeners.add(consumer);
}

// exposed for tests
Set<MergeTask> getRunningMergeTasks() {
return runningMergeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final AtomicLong doneMergeTaskCount = new AtomicLong();
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;

public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
Expand All @@ -81,6 +83,7 @@ public ThreadPoolMergeScheduler(
: Double.POSITIVE_INFINITY
);
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
}

@Override
Expand Down Expand Up @@ -176,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
return new MergeTask(
mergeSource,
merge,
isAutoThrottle && config.isAutoThrottle(),
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
estimateMergeMemoryBytes
);
}

Expand Down Expand Up @@ -312,14 +317,22 @@ class MergeTask implements Runnable {
private final OnGoingMerge onGoingMerge;
private final MergeRateLimiter rateLimiter;
private final boolean supportsIOThrottling;

MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
private final long mergeMemoryEstimateBytes;

/**
* Creates a merge task wrapping the given Lucene merge.
*
* @param mergeSource the merge source this task is associated with
* @param merge the Lucene merge; it is wrapped in an {@link OnGoingMerge} and its merge progress backs the task's rate limiter
* @param supportsIOThrottling whether this merge task supports IO throttling
* @param name the task name, used by {@link #toString()}
* @param mergeMemoryEstimateBytes estimate of the memory needed to perform the merge, exposed by
* {@link #getMergeMemoryEstimateBytes()}
*/
MergeTask(
MergeSource mergeSource,
MergePolicy.OneMerge merge,
boolean supportsIOThrottling,
String name,
long mergeMemoryEstimateBytes
) {
this.name = name;
this.mergeStartTimeNS = new AtomicLong();
this.mergeSource = mergeSource;
this.onGoingMerge = new OnGoingMerge(merge);
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
this.supportsIOThrottling = supportsIOThrottling;
this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
}

Schedule schedule() {
Expand Down Expand Up @@ -449,6 +462,14 @@ long estimatedMergeSize() {
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
}

/**
* Returns the estimate, in bytes, of the memory needed to perform this merge, as supplied when the task was constructed.
*/
public long getMergeMemoryEstimateBytes() {
return mergeMemoryEstimateBytes;
}

/**
* Returns the {@link OnGoingMerge} created for this task's underlying Lucene merge.
*/
public OnGoingMerge getOnGoingMerge() {
return onGoingMerge;
}

@Override
public String toString() {
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -80,10 +81,14 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
.build();
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
var countingListener = new CountingMergeEventListener();
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Semaphore runMergeSemaphore = new Semaphore(0);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
AtomicInteger doneMergesCount = new AtomicInteger(0);
AtomicInteger reEnqueuedBackloggedMergesCount = new AtomicInteger();
AtomicInteger abortedMergesCount = new AtomicInteger();
// submit more merge tasks than there are threads so that some are enqueued
for (int i = 0; i < mergesToSubmit; i++) {
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -95,6 +100,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
if (schedule == BACKLOG) {
// reenqueue backlogged merge task
new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start();
reEnqueuedBackloggedMergesCount.incrementAndGet();
}
return schedule;
}).when(mergeTask).schedule();
Expand All @@ -114,6 +120,7 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
}
runMergeSemaphore.acquireUninterruptibly();
doneMergesCount.incrementAndGet();
abortedMergesCount.incrementAndGet();
return null;
}).when(mergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
Expand All @@ -125,6 +132,12 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
// with the other merge tasks enqueued
assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount));
});
assertBusy(
() -> assertThat(
countingListener.queued.get(),
equalTo(threadPoolExecutor.getActiveCount() + threadPoolExecutor.getQueue().size() + reEnqueuedBackloggedMergesCount.get())
)
);
// shutdown prevents new merge tasks from being enqueued, but existing ones should be allowed to continue
testThreadPool.shutdown();
// assert all executors, except the merge one, are terminated
Expand Down Expand Up @@ -165,6 +178,8 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
assertTrue(threadPoolExecutor.isTerminated());
assertTrue(threadPoolMergeExecutorService.allDone());
});
assertThat(countingListener.aborted.get() + countingListener.completed.get(), equalTo(doneMergesCount.get()));
assertThat(countingListener.aborted.get(), equalTo(abortedMergesCount.get()));
}

public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
Expand Down Expand Up @@ -660,6 +675,27 @@ public void testMergeTasksExecuteInSizeOrder() {
}
}

/**
 * Test-only {@link MergeEventListener} that tallies how many times each callback has been invoked.
 */
private static class CountingMergeEventListener implements MergeEventListener {
    // one counter per callback; read directly by the test assertions
    final AtomicInteger queued = new AtomicInteger(0);
    final AtomicInteger aborted = new AtomicInteger(0);
    final AtomicInteger completed = new AtomicInteger(0);

    @Override
    public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) {
        queued.getAndIncrement();
    }

    @Override
    public void onMergeCompleted(OnGoingMerge merge) {
        completed.getAndIncrement();
    }

    @Override
    public void onMergeAborted(OnGoingMerge merge) {
        aborted.getAndIncrement();
    }
}

static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
.maybeCreateThreadPoolMergeExecutorService(
Expand Down
Loading