-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Retry shard movements during ESQL query #126653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
63b19d6
90f33b8
b1a04ff
545c50d
8b5f590
406df8d
1e3e92d
f7916f7
068b6be
a031270
1453368
4220622
2ea3292
782c595
8d2293e
54c0656
811d8fc
c20d53c
56128ee
9e2bb85
333ed60
b8b4a95
503f13e
e660001
0908649
b2819b6
e899a10
e298aaf
bb67124
ef0ffef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
# Changelog entry for PR 126653: retry data-node requests when shards move
# during an ES|QL query. Rendered below as added lines of the PR diff.
@@ -0,0 +1,5 @@ | ||
pr: 126653 | ||
summary: Retry shard movements during ESQL query | ||
area: ES|QL | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.plugin; | ||
|
||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.action.support.WriteRequest; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.CollectionUtils; | ||
import org.elasticsearch.compute.operator.exchange.ExchangeService; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.test.transport.MockTransportService; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase; | ||
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; | ||
|
||
import java.util.Collection; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.LongAdder; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; | ||
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.hasSize; | ||
|
||
public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make this test similar to SearchWhileRelocatingIT? We continue running ES|QL on one thread while moving shards back and forth between two sets of nodes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); | ||
} | ||
|
||
public void testSearchWhileRelocating() throws InterruptedException { | ||
internalCluster().ensureAtLeastNumDataNodes(3); | ||
var primaries = randomIntBetween(1, 10); | ||
var replicas = randomIntBetween(0, 1); | ||
|
||
indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get(); | ||
|
||
var docs = randomIntBetween(10, 100); | ||
var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
for (int i = 0; i < docs; i++) { | ||
bulk.add(new IndexRequest().source("key", "value-1")); | ||
} | ||
bulk.get(); | ||
|
||
// start background searches | ||
var stopped = new AtomicBoolean(false); | ||
var queries = new LongAdder(); | ||
var threads = new Thread[randomIntBetween(1, 5)]; | ||
for (int i = 0; i < threads.length; i++) { | ||
threads[i] = new Thread(() -> { | ||
while (stopped.get() == false) { | ||
try (EsqlQueryResponse resp = run("FROM index-1")) { | ||
assertThat(getValuesList(resp), hasSize(docs)); | ||
} | ||
queries.increment(); | ||
} | ||
}); | ||
} | ||
for (Thread thread : threads) { | ||
thread.start(); | ||
} | ||
|
||
// start shard movements | ||
var rounds = randomIntBetween(1, 10); | ||
var names = internalCluster().getNodeNames(); | ||
for (int i = 0; i < rounds; i++) { | ||
for (String name : names) { | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) | ||
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name)) | ||
.get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fun |
||
ensureGreen("index-1"); | ||
Thread.yield(); | ||
} | ||
} | ||
|
||
stopped.set(true); | ||
for (Thread thread : threads) { | ||
thread.join(10_000); | ||
} | ||
|
||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) | ||
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name")) | ||
.get(); | ||
assertThat(queries.sum(), greaterThan((long) threads.length)); | ||
} | ||
|
||
public void testRetryOnShardMovement() { | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
|
||
assertAcked( | ||
client().admin() | ||
.indices() | ||
.prepareCreate("index-1") | ||
.setSettings( | ||
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
) | ||
); | ||
assertAcked( | ||
client().admin() | ||
.indices() | ||
.prepareCreate("index-2") | ||
.setSettings( | ||
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
) | ||
); | ||
client().prepareBulk("index-1") | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.add(new IndexRequest().source("key", "value-1")) | ||
.get(); | ||
client().prepareBulk("index-2") | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.add(new IndexRequest().source("key", "value-2")) | ||
.get(); | ||
|
||
var shouldMove = new AtomicBoolean(true); | ||
|
||
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { | ||
as(transportService, MockTransportService.class).addRequestHandlingBehavior( | ||
ExchangeService.OPEN_EXCHANGE_ACTION_NAME, | ||
(handler, request, channel, task) -> { | ||
// move index shard | ||
if (shouldMove.compareAndSet(true, false)) { | ||
var currentShardNodeId = clusterService().state() | ||
.routingTable() | ||
.index("index-1") | ||
.shard(0) | ||
.primaryShard() | ||
.currentNodeId(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to check if this could be replaced by a proper API call instead |
||
assertAcked( | ||
client().admin() | ||
.indices() | ||
.prepareUpdateSettings("index-1") | ||
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId)) | ||
); | ||
ensureGreen("index-1"); | ||
} | ||
// execute data node request | ||
handler.messageReceived(request, channel, task); | ||
} | ||
); | ||
} | ||
|
||
try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) { | ||
assertThat(getValuesList(resp), hasSize(2)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we send back the node id that we ran the driver on when we There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like to avoid it. We do not really know source for each row nor where shards are currently allocated to. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left over?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap, looks like a leftover after merging 2adb36e#diff-e9bf1f63e5fb1069f6fd3e4a7fb3b1fa44ff67a60c10dd9bb5f74caa40f2f3e3