Skip to content

Commit 6fffd73

Browse files
committed
Expose merge events and their memory usage estimate
1 parent ca095be commit 6fffd73

File tree

6 files changed

+76
-30
lines changed

6 files changed

+76
-30
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2870,7 +2870,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
28702870
IndexSettings indexSettings,
28712871
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
28722872
) {
2873-
super(shardId, indexSettings, threadPoolMergeExecutorService);
2873+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
28742874
}
28752875

28762876
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
14+
@FunctionalInterface
15+
public interface MergeMemoryEstimateProvider {
16+
17+
/**
18+
* Returns an estimate of the memory needed to perform a merge
19+
*/
20+
long estimateMergeMemoryBytes(MergePolicy.OneMerge merge);
21+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1515
import org.elasticsearch.core.Nullable;
1616
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
17-
import org.elasticsearch.index.merge.OnGoingMerge;
1817
import org.elasticsearch.threadpool.ThreadPool;
1918

2019
import java.util.Comparator;
20+
import java.util.List;
2121
import java.util.Set;
22+
import java.util.concurrent.CopyOnWriteArrayList;
2223
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.PriorityBlockingQueue;
2425
import java.util.concurrent.RejectedExecutionException;
@@ -74,7 +75,7 @@ public class ThreadPoolMergeExecutorService {
7475
private final int concurrentMergesFloorLimitForThrottling;
7576
private final int concurrentMergesCeilLimitForThrottling;
7677

77-
private volatile MergeEventConsumer mergeEventConsumer;
78+
private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
7879

7980
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
8081
ThreadPool threadPool,
@@ -130,13 +131,18 @@ boolean submitMergeTask(MergeTask mergeTask) {
130131
);
131132
}
132133
// then enqueue the merge task proper
133-
queuedMergeTasks.add(mergeTask);
134+
enqueueMergeTask(mergeTask);
134135
return true;
135136
}
136137
}
137138

138139
void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
140+
enqueueMergeTask(mergeTask);
141+
}
142+
143+
private void enqueueMergeTask(MergeTask mergeTask) {
139144
queuedMergeTasks.add(mergeTask);
145+
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getEstimateMergeMemoryBytes()));
140146
}
141147

142148
public boolean allDone() {
@@ -204,6 +210,7 @@ private void runMergeTask(MergeTask mergeTask) {
204210
if (mergeTask.supportsIOThrottling()) {
205211
ioThrottledMergeTasksCount.decrementAndGet();
206212
}
213+
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
207214
}
208215
}
209216

@@ -216,6 +223,7 @@ private void abortMergeTask(MergeTask mergeTask) {
216223
if (mergeTask.supportsIOThrottling()) {
217224
ioThrottledMergeTasksCount.decrementAndGet();
218225
}
226+
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
219227
}
220228
}
221229

@@ -281,15 +289,8 @@ public boolean usingMaxTargetIORateBytesPerSec() {
281289
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
282290
}
283291

284-
public void registerMergeEventConsumer(MergeEventConsumer consumer) {
285-
assert this.mergeEventConsumer == null;
286-
this.mergeEventConsumer = consumer;
287-
}
288-
289-
public interface MergeEventConsumer {
290-
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);
291-
292-
void onMergeCompleted(OnGoingMerge merge);
292+
public void registerMergeEventListener(MergeEventListener consumer) {
293+
mergeEventListeners.add(consumer);
293294
}
294295

295296
// exposed for tests

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6565
private final AtomicLong doneMergeTaskCount = new AtomicLong();
6666
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
6767
private volatile boolean closed = false;
68-
private final MergeMemoryEstimator mergeMemoryEstimator;
68+
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
6969

7070
public ThreadPoolMergeScheduler(
7171
ShardId shardId,
7272
IndexSettings indexSettings,
73-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
73+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
74+
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
7475
) {
7576
this.shardId = shardId;
7677
this.config = indexSettings.getMergeSchedulerConfig();
@@ -82,6 +83,7 @@ public ThreadPoolMergeScheduler(
8283
: Double.POSITIVE_INFINITY
8384
);
8485
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
86+
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
8587
}
8688

8789
@Override
@@ -177,11 +179,13 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
177179
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
178180
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
179181
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
182+
long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
180183
return new MergeTask(
181184
mergeSource,
182185
merge,
183186
isAutoThrottle && config.isAutoThrottle(),
184-
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
187+
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
188+
estimateMergeMemoryBytes
185189
);
186190
}
187191

@@ -313,14 +317,22 @@ class MergeTask implements Runnable {
313317
private final OnGoingMerge onGoingMerge;
314318
private final MergeRateLimiter rateLimiter;
315319
private final boolean supportsIOThrottling;
316-
317-
MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
320+
private final long estimateMergeMemoryBytes;
321+
322+
MergeTask(
323+
MergeSource mergeSource,
324+
MergePolicy.OneMerge merge,
325+
boolean supportsIOThrottling,
326+
String name,
327+
long estimateMergeMemoryBytes
328+
) {
318329
this.name = name;
319330
this.mergeStartTimeNS = new AtomicLong();
320331
this.mergeSource = mergeSource;
321332
this.onGoingMerge = new OnGoingMerge(merge);
322333
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
323334
this.supportsIOThrottling = supportsIOThrottling;
335+
this.estimateMergeMemoryBytes = estimateMergeMemoryBytes;
324336
}
325337

326338
Schedule schedule() {
@@ -451,7 +463,7 @@ long estimatedMergeSize() {
451463
}
452464

453465
public long getEstimateMergeMemoryBytes() {
454-
return mergeMemoryEstimator.estimateMergeMemoryBytes(onGoingMerge.getMerge());
466+
return estimateMergeMemoryBytes;
455467
}
456468

457469
public OnGoingMerge getOnGoingMerge() {

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,8 @@ public void testMergeTasksExecuteInSizeOrder() {
660660
}
661661
}
662662

663+
// todo: testMergeEventListeners
664+
663665
static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
664666
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
665667
.maybeCreateThreadPoolMergeExecutorService(

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public void testMergesExecuteInSizeOrder() throws IOException {
6161
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
6262
new ShardId("index", "_na_", 1),
6363
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
64-
threadPoolMergeExecutorService
64+
threadPoolMergeExecutorService,
65+
merge -> 0
6566
)
6667
) {
6768
List<OneMerge> executedMergesList = new ArrayList<>();
@@ -103,7 +104,8 @@ public void testSimpleMergeTaskBacklogging() {
103104
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
104105
new ShardId("index", "_na_", 1),
105106
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
106-
threadPoolMergeExecutorService
107+
threadPoolMergeExecutorService,
108+
merge -> 0
107109
);
108110
// more merge tasks than merge threads
109111
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
@@ -136,7 +138,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
136138
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
137139
new ShardId("index", "_na_", 1),
138140
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
139-
threadPoolMergeExecutorService
141+
threadPoolMergeExecutorService,
142+
merge -> 0
140143
);
141144
// sort backlogged merges by size
142145
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize));
@@ -347,7 +350,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
347350
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
348351
new ShardId("index", "_na_", 1),
349352
IndexSettingsModule.newIndexSettings("index", settings),
350-
threadPoolMergeExecutorService
353+
threadPoolMergeExecutorService,
354+
merge -> 0
351355
)
352356
) {
353357
MergeSource mergeSource = mock(MergeSource.class);
@@ -420,7 +424,8 @@ public void testMergesRunConcurrently() throws Exception {
420424
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
421425
new ShardId("index", "_na_", 1),
422426
IndexSettingsModule.newIndexSettings("index", settings),
423-
threadPoolMergeExecutorService
427+
threadPoolMergeExecutorService,
428+
merge -> 0
424429
)
425430
) {
426431
// at least 1 extra merge than there are concurrently allowed
@@ -504,7 +509,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
504509
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
505510
new ShardId("index", "_na_", 1),
506511
IndexSettingsModule.newIndexSettings("index", settings),
507-
threadPoolMergeExecutorService
512+
threadPoolMergeExecutorService,
513+
merge -> 0
508514
)
509515
) {
510516
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
@@ -576,7 +582,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
576582
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
577583
new ShardId("index", "_na_", 1),
578584
indexSettings,
579-
threadPoolMergeExecutorService
585+
threadPoolMergeExecutorService,
586+
merge -> 0
580587
)
581588
) {
582589
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -605,7 +612,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
605612
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
606613
new ShardId("index", "_na_", 1),
607614
indexSettings,
608-
threadPoolMergeExecutorService
615+
threadPoolMergeExecutorService,
616+
merge -> 0
609617
)
610618
) {
611619
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -621,7 +629,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
621629
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
622630
new ShardId("index", "_na_", 1),
623631
indexSettings,
624-
threadPoolMergeExecutorService
632+
threadPoolMergeExecutorService,
633+
merge -> 0
625634
)
626635
) {
627636
// merge submitted upon closing
@@ -637,7 +646,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
637646
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
638647
new ShardId("index", "_na_", 1),
639648
indexSettings,
640-
threadPoolMergeExecutorService
649+
threadPoolMergeExecutorService,
650+
merge -> 0
641651
)
642652
) {
643653
// merge submitted upon closing
@@ -668,7 +678,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
668678
IndexSettings indexSettings,
669679
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
670680
) {
671-
super(shardId, indexSettings, threadPoolMergeExecutorService);
681+
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
672682
}
673683

674684
@Override

0 commit comments

Comments
 (0)