-
Notifications
You must be signed in to change notification settings - Fork 25.3k
ESQL: List/get query API #124832
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ESQL: List/get query API #124832
Changes from 12 commits
d6b0a92
d961e27
55e7625
8ad4d6d
fd08751
f1aab5d
af2b5ee
1d95cbc
cb4ec47
2fa0085
ff0fe90
758319a
894dad6
30f5ef1
9bcdb10
6686702
9847add
1d0d15c
9fe9243
f786222
6134a8e
789362f
0cedb9a
09dc300
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pr: 124832 | ||
summary: List/get query API | ||
area: ES|QL | ||
type: feature | ||
issues: | ||
- 124827 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.test; | ||
|
||
import org.hamcrest.BaseMatcher; | ||
import org.hamcrest.Description; | ||
import org.hamcrest.Matcher; | ||
|
||
import static org.hamcrest.Matchers.anyOf; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.isA; | ||
|
||
/** | ||
* A type-agnostic way of comparing integer values, not caring if it's a long or an integer. | ||
*/ | ||
public abstract sealed class IntegerMatcher<T> extends BaseMatcher<T> { | ||
public static IntegerMatcher<Integer> matches(int expected) { | ||
return new IntMatcher(expected); | ||
} | ||
|
||
public static IntegerMatcher<Long> matches(long expected) { | ||
return new LongMatcher(expected); | ||
} | ||
|
||
private static final class IntMatcher extends IntegerMatcher<Integer> { | ||
private final int expected; | ||
|
||
private IntMatcher(int expected) { | ||
this.expected = expected; | ||
} | ||
|
||
@Override | ||
public boolean matches(Object o) { | ||
return switch (o) { | ||
case Integer i -> expected == i; | ||
case Long l -> expected == l; | ||
default -> false; | ||
}; | ||
} | ||
|
||
@Override | ||
public void describeTo(Description description) { | ||
equalTo(expected).describeTo(description); | ||
} | ||
} | ||
|
||
private static final class LongMatcher extends IntegerMatcher<Long> { | ||
private final long expected; | ||
|
||
LongMatcher(long expected) { | ||
this.expected = expected; | ||
} | ||
|
||
@Override | ||
public boolean matches(Object o) { | ||
return switch (o) { | ||
case Integer i -> expected == i; | ||
case Long l -> expected == l; | ||
default -> false; | ||
}; | ||
} | ||
|
||
@Override | ||
public void describeTo(Description description) { | ||
equalTo(expected).describeTo(description); | ||
} | ||
} | ||
|
||
public static Matcher<Object> isIntOrLong() { | ||
return anyOf(isA(Integer.class), isA(Long.class)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.Response; | ||
import org.elasticsearch.client.ResponseException; | ||
import org.elasticsearch.client.ResponseListener; | ||
import org.elasticsearch.client.WarningsHandler; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.io.Streams; | ||
|
@@ -26,6 +27,7 @@ | |
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.logging.LogManager; | ||
import org.elasticsearch.logging.Logger; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.test.ListMatcher; | ||
import org.elasticsearch.test.rest.ESRestTestCase; | ||
import org.elasticsearch.xcontent.ToXContent; | ||
|
@@ -44,6 +46,7 @@ | |
import java.io.InputStreamReader; | ||
import java.io.OutputStream; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.time.ZoneId; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
|
@@ -70,6 +73,7 @@ | |
import static org.hamcrest.Matchers.emptyOrNullString; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
import static org.hamcrest.Matchers.hasKey; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.not; | ||
import static org.hamcrest.Matchers.nullValue; | ||
|
@@ -1345,6 +1349,56 @@ public void testAsyncGetWithoutContentType() throws IOException { | |
|
||
} | ||
|
||
public void testListApi_noRunningQueries_returnsAnObject() throws Exception { | ||
Request request = prepareListQueriesRequest(); | ||
Response response = performRequest(request); | ||
assertThat(entityToMap(response.getEntity(), XContentType.JSON), is(Map.of("queries", Map.of()))); | ||
} | ||
|
||
public void testListApi_runningQuery_returnsQueriesObject() throws Exception { | ||
bulkLoadTestData(1); | ||
String query = fromIndex() + " | keep keyword, integer | where delay(10s) | limit 100 "; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you move this one to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
var builder = requestObjectBuilder().query(query); | ||
Request request = prepareRequest(SYNC); | ||
String mediaType = attachBody(builder.build(), request); | ||
RequestOptions.Builder options = request.getOptions().toBuilder(); | ||
options.addHeader("Content-Type", mediaType); | ||
options.addHeader("Accept", mediaType); | ||
request.setOptions(options); | ||
client().performRequestAsync(request, new ResponseListener() { | ||
@Override | ||
public void onSuccess(Response response) {} | ||
|
||
@Override | ||
public void onFailure(Exception exception) {} | ||
}); | ||
Thread.sleep(Duration.ofSeconds(5)); | ||
Response response = performRequest(prepareListQueriesRequest()); | ||
@SuppressWarnings("unchecked") | ||
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue( | ||
entityToMap(response.getEntity(), XContentType.JSON).values() | ||
); | ||
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet())); | ||
var queryFromListResult = EsqlTestUtils.singleValue(listResult.values()); | ||
assertThat(queryFromListResult.get("id"), is((int) taskId.getId())); | ||
assertThat(queryFromListResult.get("node"), is(taskId.getNodeId())); | ||
assertThat(queryFromListResult.get("query"), is(query)); | ||
assertThat(queryFromListResult, hasKey("start_time_millis")); | ||
assertThat(queryFromListResult, hasKey("running_time_nanos")); | ||
|
||
response = performRequest(prepareGetQueryRequest(taskId)); | ||
@SuppressWarnings("unchecked") | ||
Map<String, Object> getQueryResult = entityToMap(response.getEntity(), XContentType.JSON); | ||
assertThat(getQueryResult.get("id"), is((int) taskId.getId())); | ||
assertThat(getQueryResult.get("node"), is(taskId.getNodeId())); | ||
assertThat(getQueryResult.get("query"), is(query)); | ||
assertThat(getQueryResult.get("start_time_millis"), is(queryFromListResult.get("start_time_millis"))); | ||
assertThat(getQueryResult, hasKey("running_time_nanos")); | ||
assertThat(getQueryResult, hasKey("coordinating_node")); | ||
assertThat(getQueryResult, hasKey("data_nodes")); | ||
Thread.sleep(Duration.ofSeconds(5)); | ||
} | ||
|
||
protected static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException { | ||
requestObject.build(); | ||
Request request = prepareRequest(mode); | ||
|
@@ -1515,21 +1569,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma | |
} | ||
|
||
private static Request prepareRequest(Mode mode) { | ||
Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")); | ||
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. | ||
request.addParameter("pretty", "true"); // Improves error reporting readability | ||
return request; | ||
return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""))); | ||
} | ||
|
||
private static Request prepareAsyncGetRequest(String id) { | ||
Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s"); | ||
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. | ||
request.addParameter("pretty", "true"); // Improves error reporting readability | ||
return request; | ||
return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s")); | ||
} | ||
|
||
private static Request prepareAsyncDeleteRequest(String id) { | ||
Request request = new Request("DELETE", "/_query/async/" + id); | ||
return finishRequest(new Request("DELETE", "/_query/async/" + id)); | ||
} | ||
|
||
private static Request finishRequest(Request request) { | ||
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. | ||
request.addParameter("pretty", "true"); // Improves error reporting readability | ||
return request; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,7 +55,7 @@ public void ensureExchangesAreReleased() throws Exception { | |
ExchangeService exchangeService = esqlQueryAction.exchangeService(); | ||
assertBusy(() -> { | ||
if (exchangeService.lifecycleState() == Lifecycle.State.STARTED) { | ||
assertTrue("Leftover exchanges " + exchangeService + " on node " + node, exchangeService.isEmpty()); | ||
assertTrue("Leftover exchanges " + exchangeService + " on taskId " + node, exchangeService.isEmpty()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is on the node, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. At some point I renamed a field called node and intellij was kind enough to replace all "node" words with "taskId" in the entire repo. I thought I caught all of them but I guess a few slipped through. |
||
} | ||
}); | ||
} | ||
|
@@ -98,7 +98,7 @@ public void ensureBlocksReleased() { | |
) | ||
.toList() | ||
); | ||
assertThat("Request breaker not reset to 0 on node: " + node, reqBreaker.getUsed(), equalTo(0L)); | ||
assertThat("Request breaker not reset to 0 on taskId: " + node, reqBreaker.getUsed(), equalTo(0L)); | ||
}); | ||
} catch (Exception e) { | ||
throw new RuntimeException("failed waiting for breakers to clear", e); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.apache.http.HttpEntity; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.common.xcontent.XContentHelper; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.test.IntegerMatcher; | ||
import org.elasticsearch.test.MapMatcher; | ||
import org.elasticsearch.xcontent.XContentType; | ||
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; | ||
import org.elasticsearch.xpack.esql.EsqlTestUtils; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.elasticsearch.core.TimeValue.timeValueSeconds; | ||
import static org.hamcrest.Matchers.allOf; | ||
import static org.hamcrest.Matchers.everyItem; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.isA; | ||
|
||
/** | ||
* Individual tests for specific aspects of the async query API. | ||
*/ | ||
public class EsqlListQuerriesActionIT extends AbstractPausableIntegTestCase { | ||
public static final String QUERY = "from test | stats sum(pause_me)"; | ||
|
||
@Override | ||
protected boolean addMockHttpTransport() { | ||
return false; | ||
} | ||
|
||
public void testNoRunningQueries() throws Exception { | ||
var request = new Request("GET", "/_query/queries"); | ||
var response = getRestClient().performRequest(request); | ||
assertThat(entityToMap(response.getEntity()), is(Map.of("queries", Map.of()))); | ||
} | ||
|
||
public void testRunningQueries() throws Exception { | ||
String id = null; | ||
try (var initialResponse = sendAsyncQuery()) { | ||
id = initialResponse.asyncExecutionId().get(); | ||
// FIXME(gal, NOCOMMIT) more copy paste | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leftover? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but honest it's not worth the refactor in this case. |
||
var getResultsRequest = new GetAsyncResultRequest(id); | ||
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1)); | ||
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); | ||
var request = new Request("GET", "/_query/queries"); | ||
var response = getRestClient().performRequest(request); | ||
@SuppressWarnings("unchecked") | ||
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(entityToMap(response.getEntity()).values()); | ||
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet())); | ||
MapMatcher basicMatcher = MapMatcher.matchesMap() | ||
.entry("node", is(taskId.getNodeId())) | ||
.entry("id", IntegerMatcher.matches(taskId.getId())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a bit puzzled why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you pass in the object it'll do an equality matcher. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @idegtiarenko No, the map entry is |
||
.entry("query", is(QUERY)) | ||
.entry("start_time_millis", IntegerMatcher.isIntOrLong()) | ||
.entry("running_time_nanos", IntegerMatcher.isIntOrLong()); | ||
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); | ||
|
||
request = new Request("GET", "/_query/queries/" + taskId); | ||
response = getRestClient().performRequest(request); | ||
MapMatcher.assertMap( | ||
entityToMap(response.getEntity()), | ||
basicMatcher.entry("coordinating_node", isA(String.class)) | ||
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class)))) | ||
); | ||
} finally { | ||
if (id != null) { | ||
// Finish the query. | ||
scriptPermits.release(numberOfDocs()); | ||
var getResultsRequest = new GetAsyncResultRequest(id); | ||
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60)); | ||
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); | ||
} | ||
scriptPermits.drainPermits(); | ||
} | ||
} | ||
|
||
// FIXME(gal, NOCOMMIT) copy paste | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More leftover? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but honest it's not worth the refactor in this case. |
||
private EsqlQueryResponse sendAsyncQuery() { | ||
scriptPermits.drainPermits(); | ||
scriptPermits.release(between(1, 5)); | ||
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS); | ||
} | ||
|
||
// FIXME(gal, NOCOMMIT) copy pasted from another place | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leftover? I guess we can move this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, that's what I did :) |
||
@SuppressWarnings("unchecked") | ||
private static <T> Map<String, T> entityToMap(HttpEntity entity) throws IOException { | ||
try (InputStream content = entity.getContent()) { | ||
XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue()); | ||
assertEquals(XContentType.JSON, xContentType); | ||
var map = XContentHelper.convertToMap(xContentType.xContent(), content, false); | ||
return (Map<String, T>) map; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.elasticsearch.action.ActionType; | ||
import org.elasticsearch.xpack.esql.plugin.EsqlGetQueryResponse; | ||
|
||
public class EsqlGetQueryAction extends ActionType<EsqlGetQueryResponse> { | ||
public static final EsqlGetQueryAction INSTANCE = new EsqlGetQueryAction(); | ||
public static final String NAME = "cluster:monitor/xpack/esql/get_queries"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a follow-up, please reach out to the data store / security team to double check what type of security is needed (similar to cat/tasks API) both for listing and cancelling queries. |
||
|
||
private EsqlGetQueryAction() { | ||
super(NAME); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.elasticsearch.action.ActionRequest; | ||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.tasks.TaskId; | ||
|
||
import java.io.IOException; | ||
|
||
public class EsqlGetQueryRequest extends ActionRequest { | ||
private final TaskId id; | ||
|
||
public EsqlGetQueryRequest(TaskId id) { | ||
this.id = id; | ||
} | ||
|
||
public TaskId id() { | ||
return id; | ||
} | ||
|
||
public EsqlGetQueryRequest(StreamInput streamInput) throws IOException { | ||
super(streamInput); | ||
id = TaskId.readFromStream(streamInput); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeWriteable(id); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
return null; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
IntOrLongMatcher
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.