|
18 | 18 | import org.elasticsearch.action.support.HandledTransportAction;
|
19 | 19 | import org.elasticsearch.client.internal.node.NodeClient;
|
20 | 20 | import org.elasticsearch.common.util.concurrent.EsExecutors;
|
| 21 | +import org.elasticsearch.compute.operator.DriverStatus; |
21 | 22 | import org.elasticsearch.compute.operator.DriverTaskRunner;
|
22 | 23 | import org.elasticsearch.injection.guice.Inject;
|
23 | 24 | import org.elasticsearch.tasks.Task;
|
|
28 | 29 | import org.elasticsearch.xpack.esql.action.EsqlGetQueryRequest;
|
29 | 30 | import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
|
30 | 31 |
|
| 32 | +import java.util.Set; |
| 33 | +import java.util.TreeSet; |
| 34 | + |
31 | 35 | import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN;
|
32 | 36 |
|
33 | 37 | public class TransportEsqlGetQueryAction extends HandledTransportAction<EsqlGetQueryRequest, EsqlGetQueryResponse> {
|
@@ -85,14 +89,30 @@ public void onFailure(Exception e) {
|
85 | 89 | );
|
86 | 90 | }
|
87 | 91 |
|
88 |
| - private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo task, ListTasksResponse response) { |
| 92 | + private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo main, ListTasksResponse sub) { |
| 93 | + String query = main.description(); |
| 94 | + String coordinatingNode = main.node(); |
| 95 | + |
| 96 | + // TODO include completed drivers in documentsFound and valuesLoaded |
| 97 | + long documentsFound = 0; |
| 98 | + long valuesLoaded = 0; |
| 99 | + Set<String> dataNodes = new TreeSet<>(); |
| 100 | + for (TaskInfo info : sub.getTasks()) { |
| 101 | + DriverStatus status = (DriverStatus) info.status(); |
| 102 | + documentsFound += status.documentsFound(); |
| 103 | + valuesLoaded += status.valuesLoaded(); |
| 104 | + dataNodes.add(info.node()); |
| 105 | + } |
| 106 | + |
89 | 107 | return new EsqlGetQueryResponse.DetailedQuery(
|
90 |
| - task.taskId(), |
91 |
| - task.startTime(), |
92 |
| - task.runningTimeNanos(), |
93 |
| - task.description(), // Query |
94 |
| - task.node(), // Coordinating node |
95 |
| - response.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes |
| 108 | + main.taskId(), |
| 109 | + main.startTime(), |
| 110 | + main.runningTimeNanos(), |
| 111 | + documentsFound, |
| 112 | + valuesLoaded, |
| 113 | + query, |
| 114 | + coordinatingNode, |
| 115 | + sub.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes |
96 | 116 | );
|
97 | 117 | }
|
98 | 118 | }
|
0 commit comments