Skip to content

Commit 6aed71f

Browse files
authored
Add profile description labels for FORK sub plans (#128318)
1 parent cc5aa91 commit 6aed71f

File tree

2 files changed

+60
-14
lines changed
  • x-pack/plugin/esql/src

2 files changed

+60
-14
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.index.IndexRequest;
1111
import org.elasticsearch.action.support.WriteRequest;
1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.compute.operator.DriverProfile;
1314
import org.elasticsearch.xpack.esql.VerificationException;
1415
import org.elasticsearch.xpack.esql.parser.ParsingException;
1516
import org.junit.Before;
@@ -730,6 +731,35 @@ public void testOneSubQuery() {
730731
assertTrue(e.getMessage().contains("Fork requires at least two branches"));
731732
}
732733

734+
public void testProfile() {
735+
var query = """
736+
FROM test
737+
| FORK
738+
( WHERE content:"fox" | SORT id )
739+
( WHERE content:"dog" | SORT id )
740+
| SORT _fork, id
741+
| KEEP _fork, id, content
742+
""";
743+
744+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
745+
746+
request.pragmas(randomPragmas());
747+
request.query(query);
748+
request.profile(true);
749+
750+
try (var resp = run(request)) {
751+
EsqlQueryResponse.Profile profile = resp.profile();
752+
assertNotNull(profile);
753+
754+
List<String> descriptions = profile.drivers().stream().map(DriverProfile::description).sorted().toList();
755+
756+
assertEquals(
757+
List.of("data", "data", "main.final", "node_reduce", "node_reduce", "subplan-0.final", "subplan-1.final"),
758+
descriptions
759+
);
760+
}
761+
}
762+
733763
private void createAndPopulateIndex() {
734764
var indexName = "test";
735765
var client = client().admin().indices();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void execute(
195195

196196
// we have no sub plans, so we can just execute the given plan
197197
if (subplans == null || subplans.size() == 0) {
198-
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, listener, null);
198+
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, null, listener, null);
199199
return;
200200
}
201201

@@ -220,7 +220,7 @@ public void execute(
220220
var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
221221
var computeContext = new ComputeContext(
222222
mainSessionId,
223-
"single",
223+
"main.final",
224224
LOCAL_CLUSTER,
225225
List.of(),
226226
configuration,
@@ -244,22 +244,33 @@ public void execute(
244244
) {
245245
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
246246

247-
for (PhysicalPlan subplan : subplans) {
247+
for (int i = 0; i < subplans.size(); i++) {
248+
var subplan = subplans.get(i);
248249
var childSessionId = newChildSession(sessionId);
249250
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
250251
// funnel sub plan pages into the main plan exchange source
251252
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
252253
var subPlanListener = localListener.acquireCompute();
253254

254-
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
255-
exchangeSink.addCompletionListener(
256-
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
257-
);
258-
subPlanListener.onResponse(result.completionInfo());
259-
}, e -> {
260-
exchangeService.finishSinkHandler(childSessionId, e);
261-
subPlanListener.onFailure(e);
262-
}), () -> exchangeSink.createExchangeSink(() -> {}));
255+
executePlan(
256+
childSessionId,
257+
rootTask,
258+
subplan,
259+
configuration,
260+
foldContext,
261+
execInfo,
262+
"subplan-" + i,
263+
ActionListener.wrap(result -> {
264+
exchangeSink.addCompletionListener(
265+
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
266+
);
267+
subPlanListener.onResponse(result.completionInfo());
268+
}, e -> {
269+
exchangeService.finishSinkHandler(childSessionId, e);
270+
subPlanListener.onFailure(e);
271+
}),
272+
() -> exchangeSink.createExchangeSink(() -> {})
273+
);
263274
}
264275
}
265276
}
@@ -272,6 +283,7 @@ public void executePlan(
272283
Configuration configuration,
273284
FoldContext foldContext,
274285
EsqlExecutionInfo execInfo,
286+
String profileQualifier,
275287
ActionListener<Result> listener,
276288
Supplier<ExchangeSink> exchangeSinkSupplier
277289
) {
@@ -309,7 +321,7 @@ public void executePlan(
309321
}
310322
var computeContext = new ComputeContext(
311323
newChildSession(sessionId),
312-
"single",
324+
profileDescription(profileQualifier, "single"),
313325
LOCAL_CLUSTER,
314326
List.of(),
315327
configuration,
@@ -395,7 +407,7 @@ public void executePlan(
395407
rootTask,
396408
new ComputeContext(
397409
sessionId,
398-
"final",
410+
profileDescription(profileQualifier, "final"),
399411
LOCAL_CLUSTER,
400412
List.of(),
401413
configuration,
@@ -611,6 +623,10 @@ String newChildSession(String session) {
611623
return session + "/" + childSessionIdGenerator.incrementAndGet();
612624
}
613625

626+
String profileDescription(String qualifier, String label) {
627+
return qualifier == null ? label : qualifier + "." + label;
628+
}
629+
614630
Runnable cancelQueryOnFailure(CancellableTask task) {
615631
return new RunOnce(() -> {
616632
LOGGER.debug("cancelling ESQL task {} on failure", task);

0 commit comments

Comments
 (0)