From 37ca2986963d88cb9b6ca85af5a12739bb9e0127 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 22 Apr 2025 11:24:20 -0400 Subject: [PATCH] ESQL: Add fields to detailed query fetch api This adds `documents_found` and `values_loaded` to the ESQL query get API. Sadly, this is only the sum of the of statistics from currently running drivers. Completed drivers are forgotten. That information is available, but it's not plumbed into a place where we can get it. That's next! --- .../esql/action/EsqlListQueriesActionIT.java | 2 ++ .../esql/plugin/EsqlGetQueryResponse.java | 4 +++ .../plugin/TransportEsqlGetQueryAction.java | 34 +++++++++++++++---- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java index 7b337d4415cd6..590f7efef8037 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java @@ -67,6 +67,8 @@ public void testRunningQueries() throws Exception { jsonEntityToMap(getQueryResponse.getEntity()), basicMatcher.entry("coordinating_node", isA(String.class)) .entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class)))) + .entry("documents_found", IntOrLongMatcher.isIntOrLong()) + .entry("values_loaded", IntOrLongMatcher.isIntOrLong()) ); } finally { if (id != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java index 1a4b6538d1a2a..b8679bb95c463 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java @@ -22,6 +22,8 @@ public record DetailedQuery( TaskId id, long startTimeMillis, long runningTimeNanos, + long documentsFound, + long valuesLoaded, String query, String coordinatingNode, List dataNodes @@ -33,6 +35,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("node", id.getNodeId()); builder.field("start_time_millis", startTimeMillis); builder.field("running_time_nanos", runningTimeNanos); + builder.field("documents_found", documentsFound); + builder.field("values_loaded", valuesLoaded); builder.field("query", query); builder.field("coordinating_node", coordinatingNode); builder.field("data_nodes", dataNodes); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java index 70175931ea633..038f9d3df0c6c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -28,6 +29,9 @@ import org.elasticsearch.xpack.esql.action.EsqlGetQueryRequest; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import java.util.Set; +import java.util.TreeSet; + import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN; public class TransportEsqlGetQueryAction extends HandledTransportAction { @@ -85,14 +89,30 @@ public void onFailure(Exception e) { ); } - private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo task, ListTasksResponse response) { + private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo main, ListTasksResponse sub) { + String query = main.description(); + String coordinatingNode = main.node(); + + // TODO include completed drivers in documentsFound and valuesLoaded + long documentsFound = 0; + long valuesLoaded = 0; + Set dataNodes = new TreeSet<>(); + for (TaskInfo info : sub.getTasks()) { + DriverStatus status = (DriverStatus) info.status(); + documentsFound += status.documentsFound(); + valuesLoaded += status.valuesLoaded(); + dataNodes.add(info.node()); + } + return new EsqlGetQueryResponse.DetailedQuery( - task.taskId(), - task.startTime(), - task.runningTimeNanos(), - task.description(), // Query - task.node(), // Coordinating node - response.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes + main.taskId(), + main.startTime(), + main.runningTimeNanos(), + documentsFound, + valuesLoaded, + query, + coordinatingNode, + sub.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes ); } }