Skip to content

Revert "Dispatch ingest work to coordination thread pool (#129820)" #129949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {

private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
assertThat(blockingLatch.getCount(), greaterThan(0L));
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
expectThrows(EsRejectedExecutionException.class, () -> {
// noinspection InfiniteLoopStatement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
private final IngestActionForwarder ingestForwarder;
protected final LongSupplier relativeTimeNanosProvider;
protected final Executor coordinationExecutor;
protected final Executor systemCoordinationExecutor;
protected final Executor writeExecutor;
protected final Executor systemWriteExecutor;
private final ActionType<BulkResponse> bulkAction;

public TransportAbstractBulkAction(
Expand All @@ -93,7 +94,8 @@ public TransportAbstractBulkAction(
this.systemIndices = systemIndices;
this.projectResolver = projectResolver;
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.addStateApplier(this.ingestForwarder);
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
Expand Down Expand Up @@ -132,14 +134,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
// Use coordinationExecutor for dispatching coordination tasks
final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
}

private void ensureClusterStateThenForkAndExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> releasingListener
) {
final ClusterState initialState = clusterService.state();
Expand All @@ -161,7 +163,7 @@ private void ensureClusterStateThenForkAndExecute(
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
forkAndExecute(task, bulkRequest, executor, releasingListener);
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}

@Override
Expand All @@ -175,21 +177,32 @@ public void onTimeout(TimeValue timeout) {
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
} else {
forkAndExecute(task, bulkRequest, executor, releasingListener);
forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}
}

private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
private void forkAndExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> releasingListener
) {
executor.execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
}
});
}

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
private boolean applyPipelines(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> listener
) throws IOException {
boolean hasIndexRequestsWithPipelines = false;
ClusterState state = clusterService.state();
ProjectId projectId = projectResolver.getProjectId();
Expand Down Expand Up @@ -278,7 +291,7 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
} else {
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
}
Expand All @@ -292,6 +305,7 @@ private void processBulkIndexIngestRequest(
Task task,
BulkRequest original,
Executor executor,
boolean isOnlySystem,
ProjectMetadata metadata,
ActionListener<BulkResponse> listener
) {
Expand Down Expand Up @@ -325,7 +339,7 @@ private void processBulkIndexIngestRequest(
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
}

@Override
Expand All @@ -348,7 +362,8 @@ public boolean isForceExecution() {
}
}
},
executor
// Use the appropriate write executor for actual ingest processing
isOnlySystem ? systemWriteExecutor : writeExecutor
);
}

Expand Down Expand Up @@ -404,10 +419,11 @@ private void applyPipelinesAndDoInternalExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
boolean isOnlySystem,
ActionListener<BulkResponse> listener
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,6 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
true
)
);
result.put(
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
new FixedExecutorBuilder(
settings,
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
halfProcMaxAt5,
1000,
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
true
)
);
result.put(
ThreadPool.Names.SYSTEM_CRITICAL_READ,
new FixedExecutorBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ protected static String settingsKey(final String prefix, final String key) {
protected static int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals("bulk")
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
|| name.equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)
|| name.equals(ThreadPool.Names.WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public static class Names {
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String SYSTEM_READ = "system_read";
public static final String SYSTEM_WRITE = "system_write";
public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination";
public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
}
Expand Down Expand Up @@ -188,8 +187,8 @@ public static ThreadPoolType fromType(String type) {
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
entry(Names.WRITE, ThreadPoolType.FIXED),
entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.WRITE, ThreadPoolType.FIXED),
entry(Names.SEARCH, ThreadPoolType.FIXED),
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED),
Expand All @@ -205,7 +204,6 @@ public static ThreadPoolType fromType(String type) {
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
private FeatureService mockFeatureService;

private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination");
private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");

private final ProjectId projectId = randomProjectIdOrDefault();

Expand Down Expand Up @@ -295,7 +296,8 @@ public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
threadPool = mock(ThreadPool.class);
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor);
when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
MockitoAnnotations.openMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
Expand Down Expand Up @@ -426,7 +428,7 @@ public void testIngestLocal() throws Exception {
redirectHandler.capture(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -477,7 +479,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -526,7 +528,7 @@ public void testIngestSystemLocal() throws Exception {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(systemWriteCoordinationExecutor)
same(systemWriteExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -687,7 +689,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
Expand Down Expand Up @@ -738,7 +740,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
completionHandler.getValue().accept(null, exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
Expand Down Expand Up @@ -832,7 +834,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
}

Expand Down Expand Up @@ -873,7 +875,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
}

Expand Down Expand Up @@ -903,7 +905,7 @@ public void testIngestCallbackExceptionHandled() throws Exception {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
indexRequest1.autoGenerateId();
completionHandler.getValue().accept(Thread.currentThread(), null);
Expand Down Expand Up @@ -943,7 +945,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
same(writeExecutor)
);
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(null, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -125,15 +126,7 @@ class TestTransportBulkAction extends TransportBulkAction {
new ActionFilters(Collections.emptySet()),
new Resolver(),
new IndexingPressure(Settings.EMPTY),
new SystemIndices(
List.of(
new SystemIndices.Feature(
"plugin",
"test feature",
List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", ""))
)
)
),
EmptySystemIndices.INSTANCE,
new ProjectResolver() {
@Override
public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E {
Expand Down Expand Up @@ -393,7 +386,7 @@ private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
});
}

public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
ThreadPoolStats.Stats stats = threadPool.stats()
Expand All @@ -408,7 +401,8 @@ public void testDispatchesToWriteCoordinationThreadPool() throws Exception {

assertBusy(() -> {
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
// index is created.
// index
// is created.
assertThat(
threadPool.stats()
.stats()
Expand All @@ -422,37 +416,6 @@ public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
});
}

public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(
new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap())
);
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
ThreadPoolStats.Stats stats = threadPool.stats()
.stats()
.stream()
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
.findAny()
.get();
assertThat(stats.completed(), equalTo(0L));
ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
future.actionGet();

assertBusy(() -> {
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
// index is created.
assertThat(
threadPool.stats()
.stats()
.stream()
.filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
.findAny()
.get()
.completed(),
equalTo(2L)
);
});
}

public void testRejectCoordination() {
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
ThreadPool.Names.WRITE,
ThreadPool.Names.WRITE_COORDINATION,
ThreadPool.Names.SYSTEM_WRITE,
ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
ThreadPool.Names.SEARCH,
ThreadPool.Names.MANAGEMENT
);
Expand Down