[9.0] Threadpool merge executor does not block aborted merges (#129613) #129727

Merged
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.BeforeClass;

import java.util.Locale;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;

@BeforeClass
public static void setAvailableDiskSpaceBufferLimit() {
// this has to be big enough to accommodate the disk space for a few hundred docs and a few merges,
// because of the latency in processing used-disk-space updates, and also because we cannot reliably separate indexing from merging
// operations at this high abstraction level (merging is triggered more or less automatically in the background)
MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
// the very short disk space polling interval ensures timely blocking of merges
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
// merges pile up more easily when there are only a few threads executing them
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
// let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
.build();
}

public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
String node = internalCluster().startNode();
setTotalSpace(node, Long.MAX_VALUE);
var indicesService = internalCluster().getInstance(IndicesService.class, node);
ensureStableCluster(1);
// create index
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
);
// do some indexing
indexRandom(
false,
false,
false,
false,
IntStream.range(1, randomIntBetween(2, 10))
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
.toList()
);
// get current disk space usage
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
// restrict the total disk space such that the next merge does not have sufficient disk space
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
setTotalSpace(node, insufficientTotalDiskSpace);
// node stats' FS stats should report that there is insufficient disk space available
assertBusy(() -> {
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
});
while (true) {
// maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
// keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
// and the merge executor says they're blocked due to insufficient disk space
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
if (nodesStatsResponse.getNodes()
.getFirst()
.getThreadPool()
.stats()
.stream()
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
.findAny()
.get()
.queue() > 0
&& indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
break;
}
// more indexing
indexRandom(
false,
false,
false,
false,
IntStream.range(1, randomIntBetween(2, 10))
.mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
.toList()
);
}
// now delete the index in this state, i.e. with merges enqueued and blocked
assertAcked(indicesAdmin().prepareDelete(indexName).get());
// index should now be gone
assertBusy(() -> {
expectThrows(
IndexNotFoundException.class,
() -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get()
);
});
assertBusy(() -> {
// merge thread pool should be done with the enqueued merge tasks
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
assertThat(
nodesStatsResponse.getNodes()
.getFirst()
.getThreadPool()
.stats()
.stream()
.filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
.findAny()
.get()
.queue(),
equalTo(0)
);
// and the merge executor should also report that merging is done now
assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
});
}

public void setTotalSpace(String dataNodeName, long totalSpace) {
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
refreshClusterInfo();
}
}
@@ -20,6 +20,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.monitor.fs.FsInfo;
@@ -28,6 +29,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.IdentityHashMap;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
/** How frequently we check disk usage (default: 5 seconds). */
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
"indices.merge.disk.check_interval",
// disabled by default
// there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
// see: https://github.com/elastic/elasticsearch/issues/129335
TimeValue.timeValueSeconds(0),
TimeValue.timeValueSeconds(5),
Property.Dynamic,
Property.NodeScope
);
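For illustration only (not part of this diff): because this is a dynamic node setting, the integration test above shortens it via node settings, and a similar override could look like the sketch below, where the "1s" value is an arbitrary example (the new default is 5 seconds).

// illustrative sketch: shorten the disk-space poll interval via node settings
Settings fastDiskPolling = Settings.builder()
    .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "1s")
    .build();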
@@ -294,6 +293,10 @@ public boolean allDone() {
return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L;
}

public boolean isMergingBlockedDueToInsufficientDiskSpace() {
return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget();
}

/**
* Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
* A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(

static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
MergeTaskPriorityBlockingQueue() {
// start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
// updated according to the remaining disk space requirements of the currently running merge tasks
// by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
// use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
super(MergeTask::estimatedRemainingMergeSize, 0L);
}

@@ -563,7 +565,7 @@ long getAvailableBudget() {

// exposed for tests
MergeTask peekQueue() {
return enqueuedByBudget.peek();
return enqueuedByBudget.peek().v1();
}
}

@@ -573,15 +575,15 @@ MergeTask peekQueue() {
*/
static class PriorityBlockingQueueWithBudget<E> {
private final ToLongFunction<? super E> budgetFunction;
protected final PriorityQueue<E> enqueuedByBudget;
protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
private final ReentrantLock lock;
private final Condition elementAvailable;
protected long availableBudget;

PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
this.budgetFunction = budgetFunction;
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
this.unreleasedBudgetPerElement = new IdentityHashMap<>();
this.lock = new ReentrantLock();
this.elementAvailable = lock.newCondition();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
enqueuedByBudget.offer(e);
enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
elementAvailable.signal();
} finally {
lock.unlock();
@@ -608,22 +610,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E peek;
long peekBudget;
Tuple<E, Long> head;
// blocks until the smallest budget element fits the currently available budget
while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
elementAvailable.await();
}
head = enqueuedByBudget.poll();
// deducts and holds up that element's budget from the available budget
return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
return newElementWithReleasableBudget(head.v1(), head.v2());
} finally {
lock.unlock();
}
}

/**
* Updates the available budget given the passed-in argument, from which it deducts the budget held up by taken elements
* that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
* that are still in use. The elements' budgets are also updated by re-applying the budget function.
* The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
* is over this newly computed available budget.
*/
Expand All @@ -632,20 +634,50 @@ void updateBudget(long availableBudget) {
lock.lock();
try {
this.availableBudget = availableBudget;
// update the per-element budget (these are all the elements that are using any budget)
// updates the budget of enqueued elements (and possibly reorders the priority queue)
updateBudgetOfEnqueuedElementsAndReorderQueue();
// update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
// available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
// the available budget is decreased by the budget of dequeued elements that are still in use
this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
elementAvailable.signalAll();
} finally {
lock.unlock();
}
}
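To illustrate the arithmetic with made-up numbers: if updateBudget(100) runs while two dequeued-but-still-running merge tasks report remaining sizes of 30 and 20 bytes, the effective available budget becomes 100 - 30 - 20 = 50; take() then keeps blocking while the cheapest enqueued task needs more than 50 bytes, and unblocks once either more budget becomes available or the head task's re-applied budget drops, for instance to 0 when its merge is aborted (see the scheduler change further below).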

private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
int queueSizeBefore = enqueuedByBudget.size();
var it = enqueuedByBudget.iterator();
List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
while (it.hasNext()) {
var elementWithBudget = it.next();
Long previousBudget = elementWithBudget.v2();
long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
if (previousBudget.equals(latestBudget) == false) {
// the budget (estimation) of an enqueued element has changed
// this element will be reordered by removing and reinserting using the latest budget (estimation)
it.remove();
elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
}
}
// reinsert elements based on the latest budget (estimation)
for (var reorderedElement : elementsToReorder) {
enqueuedByBudget.offer(reorderedElement);
}
assert queueSizeBefore == enqueuedByBudget.size();
}
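The remove-and-reinsert in the method above is needed because java.util.PriorityQueue establishes ordering only when elements are offered; changing what the comparator would see for an element already in the queue does not re-heapify it. A standalone sketch demonstrating this (the Task class is hypothetical, purely for illustration):

import java.util.PriorityQueue;

public class PriorityQueueReorderDemo {
    static final class Task {
        long budget;
        Task(long budget) { this.budget = budget; }
    }

    public static void main(String[] args) {
        PriorityQueue<Task> queue = new PriorityQueue<>((a, b) -> Long.compare(a.budget, b.budget));
        Task big = new Task(100);
        queue.offer(big);
        queue.offer(new Task(10));
        big.budget = 1;                          // priority effectively changed after insertion...
        System.out.println(queue.peek().budget); // ...but the head is still the task offered with budget 10
        queue.remove(big);                       // re-establish ordering the way the patch does:
        queue.offer(big);                        // remove the stale entry and re-offer it
        System.out.println(queue.peek().budget); // now prints 1
    }
}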

boolean isQueueEmpty() {
return enqueuedByBudget.isEmpty();
}

boolean queueHeadIsOverTheAvailableBudget() {
var head = enqueuedByBudget.peek();
return head != null && head.v2() > availableBudget;
}

int queueSize() {
return enqueuedByBudget.size();
}
@@ -537,8 +537,13 @@ void abort() {
long estimatedRemainingMergeSize() {
// TODO is it possible for `estimatedMergeBytes` to be `0` for correctly initialized merges,
// or does `estimatedMergeBytes` == `0` always mean that the merge has not yet been initialized?
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
if (onGoingMerge.getMerge().isAborted()) {
// if the merge is aborted the assumption is that merging will soon stop with negligible further writing
return 0L;
} else {
long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
}
}

public long getMergeMemoryEstimateBytes() {
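Taken together with the queue changes above, this is what lets blocked merges drain when a shard closes: an aborted merge now reports an estimated remaining size of 0, so the next time the available budget is updated (the disk-space check now runs every indices.merge.disk.check_interval, defaulting to 5 seconds) the enqueued task is re-budgeted to 0 and sorts to the head of the queue, where it is no longer held back by its own disk-space requirement and the merge can wind down instead of staying blocked, the situation previously described in elastic/elasticsearch#129335.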