Skip to content

Commit 73b0a60

Browse files
authored
Revert "Dispatch ingest work to coordination thread pool (#129820)" (#129949)
This reverts commit 53dae7a.
1 parent d16271b commit 73b0a60

File tree

8 files changed

+50
-84
lines changed

8 files changed

+50
-84
lines changed

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
410410

411411
private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
412412
assertThat(blockingLatch.getCount(), greaterThan(0L));
413-
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
413+
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
414414
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
415415
expectThrows(EsRejectedExecutionException.class, () -> {
416416
// noinspection InfiniteLoopStatement

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
6969
private final IngestActionForwarder ingestForwarder;
7070
protected final LongSupplier relativeTimeNanosProvider;
7171
protected final Executor coordinationExecutor;
72-
protected final Executor systemCoordinationExecutor;
72+
protected final Executor writeExecutor;
73+
protected final Executor systemWriteExecutor;
7374
private final ActionType<BulkResponse> bulkAction;
7475

7576
public TransportAbstractBulkAction(
@@ -93,7 +94,8 @@ public TransportAbstractBulkAction(
9394
this.systemIndices = systemIndices;
9495
this.projectResolver = projectResolver;
9596
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
96-
this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
97+
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
98+
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
9799
this.ingestForwarder = new IngestActionForwarder(transportService);
98100
clusterService.addStateApplier(this.ingestForwarder);
99101
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
@@ -132,14 +134,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
132134
}
133135
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
134136
// Use coordinationExecutor for dispatching coordination tasks
135-
final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
136-
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
137+
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
137138
}
138139

139140
private void ensureClusterStateThenForkAndExecute(
140141
Task task,
141142
BulkRequest bulkRequest,
142143
Executor executor,
144+
boolean isOnlySystem,
143145
ActionListener<BulkResponse> releasingListener
144146
) {
145147
final ClusterState initialState = clusterService.state();
@@ -161,7 +163,7 @@ private void ensureClusterStateThenForkAndExecute(
161163
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
162164
@Override
163165
public void onNewClusterState(ClusterState state) {
164-
forkAndExecute(task, bulkRequest, executor, releasingListener);
166+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
165167
}
166168

167169
@Override
@@ -175,21 +177,32 @@ public void onTimeout(TimeValue timeout) {
175177
}
176178
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
177179
} else {
178-
forkAndExecute(task, bulkRequest, executor, releasingListener);
180+
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
179181
}
180182
}
181183

182-
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
184+
private void forkAndExecute(
185+
Task task,
186+
BulkRequest bulkRequest,
187+
Executor executor,
188+
boolean isOnlySystem,
189+
ActionListener<BulkResponse> releasingListener
190+
) {
183191
executor.execute(new ActionRunnable<>(releasingListener) {
184192
@Override
185193
protected void doRun() throws IOException {
186-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
194+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
187195
}
188196
});
189197
}
190198

191-
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
192-
throws IOException {
199+
private boolean applyPipelines(
200+
Task task,
201+
BulkRequest bulkRequest,
202+
Executor executor,
203+
boolean isOnlySystem,
204+
ActionListener<BulkResponse> listener
205+
) throws IOException {
193206
boolean hasIndexRequestsWithPipelines = false;
194207
ClusterState state = clusterService.state();
195208
ProjectId projectId = projectResolver.getProjectId();
@@ -278,7 +291,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
278291
assert arePipelinesResolved : bulkRequest;
279292
}
280293
if (clusterService.localNode().isIngestNode()) {
281-
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
294+
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
282295
} else {
283296
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
284297
}
@@ -292,6 +305,7 @@ private void processBulkIndexIngestRequest(
292305
Task task,
293306
BulkRequest original,
294307
Executor executor,
308+
boolean isOnlySystem,
295309
ProjectMetadata metadata,
296310
ActionListener<BulkResponse> listener
297311
) {
@@ -325,7 +339,7 @@ private void processBulkIndexIngestRequest(
325339
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
326340
@Override
327341
protected void doRun() throws IOException {
328-
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
342+
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
329343
}
330344

331345
@Override
@@ -348,7 +362,8 @@ public boolean isForceExecution() {
348362
}
349363
}
350364
},
351-
executor
365+
// Use the appropriate write executor for actual ingest processing
366+
isOnlySystem ? systemWriteExecutor : writeExecutor
352367
);
353368
}
354369

@@ -404,10 +419,11 @@ private void applyPipelinesAndDoInternalExecute(
404419
Task task,
405420
BulkRequest bulkRequest,
406421
Executor executor,
422+
boolean isOnlySystem,
407423
ActionListener<BulkResponse> listener
408424
) throws IOException {
409425
final long relativeStartTimeNanos = relativeTimeNanos();
410-
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
426+
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
411427
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
412428
}
413429
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -198,17 +198,6 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
198198
true
199199
)
200200
);
201-
result.put(
202-
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
203-
new FixedExecutorBuilder(
204-
settings,
205-
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
206-
halfProcMaxAt5,
207-
1000,
208-
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
209-
true
210-
)
211-
);
212201
result.put(
213202
ThreadPool.Names.SYSTEM_CRITICAL_READ,
214203
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ protected static String settingsKey(final String prefix, final String key) {
4242
protected static int applyHardSizeLimit(final Settings settings, final String name) {
4343
if (name.equals("bulk")
4444
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
45-
|| name.equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)
4645
|| name.equals(ThreadPool.Names.WRITE)
4746
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
4847
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ public static class Names {
142142
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
143143
public static final String SYSTEM_READ = "system_read";
144144
public static final String SYSTEM_WRITE = "system_write";
145-
public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination";
146145
public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
147146
public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
148147
}
@@ -188,8 +187,8 @@ public static ThreadPoolType fromType(String type) {
188187
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
189188
entry(Names.GET, ThreadPoolType.FIXED),
190189
entry(Names.ANALYZE, ThreadPoolType.FIXED),
191-
entry(Names.WRITE, ThreadPoolType.FIXED),
192190
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
191+
entry(Names.WRITE, ThreadPoolType.FIXED),
193192
entry(Names.SEARCH, ThreadPoolType.FIXED),
194193
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
195194
entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED),
@@ -205,7 +204,6 @@ public static ThreadPoolType fromType(String type) {
205204
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
206205
entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
207206
entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
208-
entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED),
209207
entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
210208
entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED)
211209
);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
109109
private FeatureService mockFeatureService;
110110

111111
private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
112-
private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination");
112+
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
113+
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
113114

114115
private final ProjectId projectId = randomProjectIdOrDefault();
115116

@@ -295,7 +296,8 @@ public void setupAction() {
295296
// initialize captors, which must be members to use @Capture because of generics
296297
threadPool = mock(ThreadPool.class);
297298
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
298-
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor);
299+
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
300+
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
299301
MockitoAnnotations.openMocks(this);
300302
// setup services that will be called by action
301303
transportService = mock(TransportService.class);
@@ -426,7 +428,7 @@ public void testIngestLocal() throws Exception {
426428
redirectHandler.capture(),
427429
failureHandler.capture(),
428430
completionHandler.capture(),
429-
same(writeCoordinationExecutor)
431+
same(writeExecutor)
430432
);
431433
completionHandler.getValue().accept(null, exception);
432434
assertTrue(failureCalled.get());
@@ -477,7 +479,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
477479
any(),
478480
failureHandler.capture(),
479481
completionHandler.capture(),
480-
same(writeCoordinationExecutor)
482+
same(writeExecutor)
481483
);
482484
completionHandler.getValue().accept(null, exception);
483485
assertTrue(failureCalled.get());
@@ -526,7 +528,7 @@ public void testIngestSystemLocal() throws Exception {
526528
any(),
527529
failureHandler.capture(),
528530
completionHandler.capture(),
529-
same(systemWriteCoordinationExecutor)
531+
same(systemWriteExecutor)
530532
);
531533
completionHandler.getValue().accept(null, exception);
532534
assertTrue(failureCalled.get());
@@ -687,7 +689,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
687689
any(),
688690
failureHandler.capture(),
689691
completionHandler.capture(),
690-
same(writeCoordinationExecutor)
692+
same(writeExecutor)
691693
);
692694
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
693695
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -738,7 +740,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
738740
any(),
739741
failureHandler.capture(),
740742
completionHandler.capture(),
741-
same(writeCoordinationExecutor)
743+
same(writeExecutor)
742744
);
743745
completionHandler.getValue().accept(null, exception);
744746
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -832,7 +834,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
832834
any(),
833835
failureHandler.capture(),
834836
completionHandler.capture(),
835-
same(writeCoordinationExecutor)
837+
same(writeExecutor)
836838
);
837839
}
838840

@@ -873,7 +875,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
873875
any(),
874876
failureHandler.capture(),
875877
completionHandler.capture(),
876-
same(writeCoordinationExecutor)
878+
same(writeExecutor)
877879
);
878880
}
879881

@@ -903,7 +905,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
903905
any(),
904906
failureHandler.capture(),
905907
completionHandler.capture(),
906-
same(writeCoordinationExecutor)
908+
same(writeExecutor)
907909
);
908910
indexRequest1.autoGenerateId();
909911
completionHandler.getValue().accept(Thread.currentThread(), null);
@@ -943,7 +945,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
943945
any(),
944946
failureHandler.capture(),
945947
completionHandler.capture(),
946-
same(writeCoordinationExecutor)
948+
same(writeExecutor)
947949
);
948950
assertEquals(indexRequest.getPipeline(), "default_pipeline");
949951
completionHandler.getValue().accept(null, exception);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 5 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.index.IndexVersions;
6262
import org.elasticsearch.index.IndexingPressure;
6363
import org.elasticsearch.index.VersionType;
64+
import org.elasticsearch.indices.EmptySystemIndices;
6465
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
6566
import org.elasticsearch.indices.SystemIndices;
6667
import org.elasticsearch.test.ESTestCase;
@@ -125,15 +126,7 @@ class TestTransportBulkAction extends TransportBulkAction {
125126
new ActionFilters(Collections.emptySet()),
126127
new Resolver(),
127128
new IndexingPressure(Settings.EMPTY),
128-
new SystemIndices(
129-
List.of(
130-
new SystemIndices.Feature(
131-
"plugin",
132-
"test feature",
133-
List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", ""))
134-
)
135-
)
136-
),
129+
EmptySystemIndices.INSTANCE,
137130
new ProjectResolver() {
138131
@Override
139132
public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E {
@@ -393,7 +386,7 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
393386
});
394387
}
395388

396-
public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
389+
public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
397390
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
398391
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
399392
ThreadPoolStats.Stats stats = threadPool.stats()
@@ -408,7 +401,8 @@ public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
408401

409402
assertBusy(() -> {
410403
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
411-
// index is created.
404+
// index
405+
// is created.
412406
assertThat(
413407
threadPool.stats()
414408
.stats()
@@ -422,37 +416,6 @@ public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
422416
});
423417
}
424418

425-
public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception {
426-
BulkRequest bulkRequest = new BulkRequest().add(
427-
new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap())
428-
);
429-
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
430-
ThreadPoolStats.Stats stats = threadPool.stats()
431-
.stats()
432-
.stream()
433-
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
434-
.findAny()
435-
.get();
436-
assertThat(stats.completed(), equalTo(0L));
437-
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
438-
future.actionGet();
439-
440-
assertBusy(() -> {
441-
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
442-
// index is created.
443-
assertThat(
444-
threadPool.stats()
445-
.stats()
446-
.stream()
447-
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
448-
.findAny()
449-
.get()
450-
.completed(),
451-
equalTo(2L)
452-
);
453-
});
454-
}
455-
456419
public void testRejectCoordination() {
457420
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
458421

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
7777
ThreadPool.Names.WRITE,
7878
ThreadPool.Names.WRITE_COORDINATION,
7979
ThreadPool.Names.SYSTEM_WRITE,
80-
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
8180
ThreadPool.Names.SEARCH,
8281
ThreadPool.Names.MANAGEMENT
8382
);

0 commit comments

Comments
 (0)