Skip to content

Support maxConnections override in AbstractBlobContainerRetriesTestCase tests #126435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable Integer maxConnections,
final @Nullable ByteSizeValue bufferSize,
final @Nullable Integer maxBulkDeletes,
final @Nullable BlobPath blobContainerPath
Expand Down Expand Up @@ -228,7 +229,7 @@ public void testShouldRetryOnUnresolvableHost() {

private void executeListBlobsAndAssertRetries() {
final int maxRetries = randomIntBetween(3, 5);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null);
expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose()));
assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get());
}
Expand All @@ -237,7 +238,7 @@ public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final AtomicInteger countDown = new AtomicInteger(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null);

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
Expand Down Expand Up @@ -266,7 +267,7 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
Expand Down Expand Up @@ -308,7 +309,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null, null);

// HTTP server does not send a response
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
Expand Down Expand Up @@ -360,7 +361,7 @@ public void testWriteLargeBlob() throws IOException {
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());

final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null, null);

httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
Expand Down Expand Up @@ -507,7 +508,7 @@ public String next() {
return Integer.toString(totalDeletesSent++);
}
};
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null, null);
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;

Expand Down Expand Up @@ -543,7 +544,7 @@ public void testCompareAndExchangeWhenThrottled() throws IOException {
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));

final int maxRetries = randomIntBetween(1, 3);
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null, null);
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
final String key = randomIdentifier();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_CONNECTIONS_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -163,6 +164,7 @@ protected BlobContainer createBlobContainer(
final @Nullable Integer maxRetries,
final @Nullable TimeValue readTimeout,
final @Nullable Boolean disableChunkedEncoding,
final @Nullable Integer maxConnections,
final @Nullable ByteSizeValue bufferSize,
final @Nullable Integer maxBulkDeletes,
final @Nullable BlobPath blobContainerPath
Expand All @@ -183,6 +185,9 @@ protected BlobContainer createBlobContainer(
if (disableChunkedEncoding != null) {
clientSettings.put(DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace(clientName).getKey(), disableChunkedEncoding);
}
if (maxConnections != null) {
clientSettings.put(MAX_CONNECTIONS_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxConnections);
}

final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(
Expand Down Expand Up @@ -265,7 +270,7 @@ public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null);

final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
Expand Down Expand Up @@ -313,7 +318,7 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null, null);

// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
Expand Down Expand Up @@ -347,7 +352,7 @@ public void testWriteLargeBlob() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null);

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand Down Expand Up @@ -443,7 +448,7 @@ public void testWriteLargeBlobStreaming() throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null);

final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
Expand Down Expand Up @@ -552,7 +557,15 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = createBlobContainer(
maxRetries,
null,
true,
null,
ByteSizeValue.ofBytes(bufferSizeBytes),
null,
null
);
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);

final byte[] bytes = randomBlobContent();
Expand Down Expand Up @@ -625,7 +638,15 @@ public void testReadDoesNotRetryForRepositoryAnalysis() {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = createBlobContainer(
maxRetries,
null,
true,
null,
ByteSizeValue.ofBytes(bufferSizeBytes),
null,
null
);

final byte[] bytes = randomBlobContent();

Expand Down Expand Up @@ -663,7 +684,15 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException {
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
final BlobContainer blobContainer = createBlobContainer(
maxRetries,
null,
true,
null,
ByteSizeValue.ofBytes(bufferSizeBytes),
null,
null
);
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);

final byte[] bytes = randomBlobContent(512);
Expand Down Expand Up @@ -756,7 +785,7 @@ public void handle(HttpExchange exchange) throws IOException {

public void testDoesNotRetryOnNotFound() {
final int maxRetries = between(3, 5);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null);

final AtomicInteger numberOfReads = new AtomicInteger(0);
@SuppressForbidden(reason = "use a http server")
Expand Down Expand Up @@ -788,7 +817,7 @@ public void handle(HttpExchange exchange) throws IOException {

public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null);

int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
Expand All @@ -808,7 +837,7 @@ public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {

public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null);

int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
Expand Down Expand Up @@ -845,7 +874,7 @@ public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {

public void testNonSnapshotDeletesAreNotRetried() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null);

int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
Expand Down Expand Up @@ -874,7 +903,7 @@ public void testNonSnapshotDeletesAreNotRetried() {

public void testNonThrottlingErrorsAreNotRetried() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null);

int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
Expand Down Expand Up @@ -953,7 +982,7 @@ private Set<OperationPurpose> operationPurposesThatRetryOnDelete() {

public void testGetRegisterRetries() {
final var maxRetries = between(0, 3);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null);

interface FailingHandlerFactory {
void addHandler(String blobName, Integer... responseCodes);
Expand Down Expand Up @@ -1023,7 +1052,7 @@ interface FailingHandlerFactory {
public void testSuppressedDeletionErrorsAreCapped() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
int maxBulkDeleteSize = randomIntBetween(1, 10);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null);
httpServer.createContext("/", exchange -> {
if (isMultiDeleteRequest(exchange)) {
exchange.sendResponseHeaders(
Expand Down Expand Up @@ -1055,7 +1084,7 @@ public void testSuppressedDeletionErrorsAreCapped() {
public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
int maxBulkDeleteSize = randomIntBetween(10, 30);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null);

final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
httpServer.createContext("/", exchange -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected BlobContainer createBlobContainer(
Integer maxRetries,
TimeValue readTimeout,
Boolean disableChunkedEncoding,
Integer maxConnections,
ByteSizeValue bufferSize,
Integer maxBulkDeletes,
BlobPath blobContainerPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ protected abstract BlobContainer createBlobContainer(
@Nullable Integer maxRetries,
@Nullable TimeValue readTimeout,
@Nullable Boolean disableChunkedEncoding,
@Nullable Integer maxConnections,
@Nullable ByteSizeValue bufferSize,
@Nullable Integer maxBulkDeletes,
@Nullable BlobPath blobContainerPath
Expand All @@ -94,7 +95,7 @@ protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
}

public void testReadNonexistentBlobThrowsNoSuchFileException() {
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null, null);
final long position = randomLongBetween(0, MAX_RANGE_VAL);
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
Expand All @@ -121,7 +122,7 @@ public void testReadBlobWithRetries() throws Exception {

final byte[] bytes = randomBlobContent();
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null);
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
Expand Down Expand Up @@ -178,7 +179,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
final CountDown countDown = new CountDown(maxRetries + 1);

final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null);
final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
Expand Down Expand Up @@ -250,7 +251,7 @@ public void testReadRangeBlobWithRetries() throws Exception {
public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null);

// HTTP server does not send a response
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
Expand Down Expand Up @@ -307,7 +308,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() {

public void testReadBlobWithNoHttpResponse() {
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null, null);

// HTTP server closes connection immediately
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
Expand All @@ -327,7 +328,7 @@ public void testReadBlobWithNoHttpResponse() {

public void testReadBlobWithPrematureConnectionClose() {
final int maxRetries = randomInt(20);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null);

final boolean alwaysFlushBody = randomBoolean();

Expand Down