Skip to content

[8.19] Fix ThreadPoolMergeExecutorServiceTests testIORateIsAdjustedForRunningMergeTasks #129516

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
}
}

public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
int mergeExecutorThreadCount = randomIntBetween(1, 3);
int mergesStillToSubmit = randomIntBetween(1, 10);
public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
int mergeExecutorThreadCount = randomIntBetween(1, 5);
int mergesStillToSubmit = randomIntBetween(1, 20);
int mergesStillToComplete = mergesStillToSubmit;
Settings settings = Settings.builder()
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
Expand All @@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Semaphore runMergeSemaphore = new Semaphore(0);
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
while (mergesStillToComplete > 0) {
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -337,39 +339,48 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
}).when(mergeTask).schedule();
doAnswer(mock -> {
currentlyRunningMergeTasksSet.add(mergeTask);
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
// wait to be signalled before completing
runMergeSemaphore.acquire();
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
currentlyRunningMergeTasksSet.remove(mergeTask);
return null;
}).when(mergeTask).run();
doAnswer(mock -> {
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
// wait to be signalled before completing
runMergeSemaphore.acquire();
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
return null;
}).when(mergeTask).abort();
int activeMergeTasksCount = threadPoolExecutor.getActiveCount();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
// all currently running merge tasks must be IO throttled
assertThat(runMergeSemaphore.availablePermits(), is(0));
boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(mergeTaskSubmitted);
if (isAnyExecutorAvailable) {
assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
}
long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
// all currently running merge tasks must be IO throttled to the latest IO Rate
assertBusy(() -> {
// await new merge to start executing
if (activeMergeTasksCount < mergeExecutorThreadCount) {
assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1));
}
// assert IO throttle is set on the running merge tasks
// assert IO throttle is set on ALL the running merge tasks
for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) {
var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
verify(currentlyRunningMergeTask).run();
// only interested in the last invocation
var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture());
assertThat(ioRateCaptor.getValue(), is(newIORate));
assertThat(ioRateCaptor.getValue(), is(latestIORate));
}
});
mergesStillToSubmit--;
} else {
long completedMerges = threadPoolExecutor.getCompletedTaskCount();
runMergeSemaphore.release();
// await merge to finish
assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1)));
assertBusy(() -> {
assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1));
assertThat(runMergeSemaphore.availablePermits(), is(0));
});
mergesStillToComplete--;
}
}
Expand Down