Skip to content

Support partial results in CCS in ES|QL #122708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into the base branch from the feature branch
Feb 20, 2025
Merged
5 changes: 5 additions & 0 deletions docs/changelog/122708.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122708
summary: Support partial results in CCS in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";

private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -48,6 +50,7 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClusters
protected static final String REMOTE_INDEX = "logs-2";
protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking";
protected static final String INDEX_WITH_FAIL_MAPPING = "failing";
protected static final AtomicLong NEXT_DOC_ID = new AtomicLong(0);

@Override
protected List<String> remoteClusterAlias() {
Expand Down Expand Up @@ -150,7 +153,7 @@ protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bo
protected Map<String, Object> setupClusters(int numClusters) throws IOException {
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
int numShardsLocal = randomIntBetween(1, 5);
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);

int numShardsRemote = randomIntBetween(1, 5);
populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
Expand Down Expand Up @@ -180,19 +183,24 @@ protected Map<String, Object> setupClusters(int numClusters) throws IOException
return clusterInfo;
}

protected void populateLocalIndices(String indexName, int numShards) {
Client localClient = client(LOCAL_CLUSTER);
protected Set<String> populateIndex(String clusterAlias, String indexName, int numShards, int numDocs) {
Client client = client(clusterAlias);
assertAcked(
localClient.admin()
client.admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
);
for (int i = 0; i < 10; i++) {
localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
Set<String> ids = new HashSet<>();
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
for (int i = 0; i < numDocs; i++) {
String id = Long.toString(NEXT_DOC_ID.incrementAndGet());
client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get();
ids.add(id);
}
localClient.admin().indices().prepareRefresh(indexName).get();
client.admin().indices().prepareRefresh(indexName).get();
return ids;
}

protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ Map<String, String> createEmptyIndicesWithNoMappings(int numClusters) {

Map<String, Object> setupFailClusters() throws IOException {
int numShardsLocal = randomIntBetween(1, 3);
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10);

int numShardsRemote = randomIntBetween(1, 3);
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand All @@ -34,9 +35,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -74,69 +75,97 @@ void startComputeOnRemoteCluster(
RemoteCluster cluster,
Runnable cancelQueryOnFailure,
EsqlExecutionInfo executionInfo,
ActionListener<ComputeResponse> listener
ActionListener<List<DriverProfile>> listener
) {
var queryPragmas = configuration.pragmas();
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
final var childSessionId = computeService.newChildSession(sessionId);
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
final String clusterAlias = cluster.clusterAlias();
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
var resp = finalResponse.get();
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
}))) {
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
EsqlCCSUtils.skipUnavailableListener(
computeListener.acquireAvoid(),
executionInfo,
clusterAlias,
EsqlExecutionInfo.Cluster.Status.SKIPPED
).delegateFailureAndWrap((l, unused) -> {
var listenerGroup = new RemoteListenerGroup(
transportService,
rootTask,
computeListener,
clusterAlias,
executionInfo,
l
);

var remoteSink = exchangeService.newRemoteSink(
listenerGroup.getGroupTask(),
childSessionId,
transportService,
cluster.connection
);
final AtomicBoolean pagesFetched = new AtomicBoolean();
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
listener = listener.delegateResponse((l, e) -> {
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(List.of());
} else if (configuration.allowPartialResults()) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(List.of());
} else {
l.onFailure(e);
}
});
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
listener.delegateFailure((l, unused) -> {
final CancellableTask groupTask;
final Runnable onGroupFailure;
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
if (failFast) {
groupTask = rootTask;
onGroupFailure = cancelQueryOnFailure;
} else {
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
}
try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
return profiles;
}))) {
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(
remoteSink,
executionInfo.isSkipUnavailable(clusterAlias) == false,
() -> {},
failFast,
() -> pagesFetched.set(true),
queryPragmas.concurrentExchangeClients(),
listenerGroup.getExchangeRequestListener()
computeListener.acquireAvoid()
);
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
finalResponse.set(r);
return r.getProfiles();
});
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
listenerGroup.getGroupTask(),
groupTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
);
}
}
})
);
}

private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(resp.getTotalShards())
.setSuccessfulShards(resp.getSuccessfulShards())
.setSkippedShards(resp.getSkippedShards())
.setFailedShards(resp.getFailedShards());
if (resp.getTook() != null) {
builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
} else {
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
builder.setTook(executionInfo.tookSoFar());
}
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
if (executionInfo.isStopped() || resp.failedShards > 0) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
} else {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
}
}
return builder.build();
});
}

List<RemoteCluster> getRemoteClusters(
Expand Down
Loading