diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index f42dde4dcb091..fee728933c343 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -229,7 +229,7 @@ public void testShouldRetryOnUnresolvableHost() { private void executeListBlobsAndAssertRetries() { final int maxRetries = randomIntBetween(3, 5); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose())); assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get()); } @@ -238,7 +238,7 @@ public void testReadLargeBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final AtomicInteger countDown = new AtomicInteger(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); // SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks final byte[] bytes = randomBytes(1 << 22); @@ -267,7 +267,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(2, 10); final CountDown countDown = new CountDown(maxRetries); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); @@ -309,7 +309,7 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).readTimeout(readTimeout).build(); // HTTP server does not send a response httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> { @@ -361,7 +361,7 @@ public void testWriteLargeBlob() throws IOException { logger.debug("starting with resumable upload id [{}]", sessionUploadId.get()); final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null; - final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(nbErrors + 1).readTimeout(readTimeout).build(); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { final BytesReference requestBody = Streams.readFully(exchange.getRequestBody()); @@ -508,7 +508,7 @@ public String next() { return Integer.toString(totalDeletesSent++); } }; - final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1).build(); httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> { assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH; @@ -544,7 +544,7 @@ public void testCompareAndExchangeWhenThrottled() throws IOException { httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket"))); final int maxRetries = randomIntBetween(1, 3); - final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer container = blobContainerBuilder().maxRetries(maxRetries).build(); final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH)); final String key = randomIdentifier(); diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index cce6e14c9dbb4..629015b40b5ff 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -270,7 +270,7 @@ public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> { @@ -318,7 +318,10 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteBlobWithReadTimeouts() { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .build(); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> { @@ -352,7 +355,10 @@ public void testWriteLargeBlob() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null); + final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout) + .disableChunkedEncoding(true) + .bufferSize(bufferSize) + .build(); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -448,7 +454,10 @@ public void testWriteLargeBlobStreaming() throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB); - final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, null, bufferSize, null, null); + final BlobContainer blobContainer = blobContainerBuilder().readTimeout(readTimeout) + .disableChunkedEncoding(true) + .bufferSize(bufferSize) + .build(); final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); @@ -557,15 +566,10 @@ public void testReadRetriesAfterMeaningfulProgress() throws Exception { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer( - maxRetries, - null, - true, - null, - ByteSizeValue.ofBytes(bufferSizeBytes), - null, - null - ); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(); @@ -638,15 +642,10 @@ public void testReadDoesNotRetryForRepositoryAnalysis() { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer( - maxRetries, - null, - true, - null, - ByteSizeValue.ofBytes(bufferSizeBytes), - null, - null - ); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final byte[] bytes = randomBlobContent(); @@ -684,15 +683,10 @@ public void testReadWithIndicesPurposeRetriesForever() throws IOException { 0, randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes())) ); - final BlobContainer blobContainer = createBlobContainer( - maxRetries, - null, - true, - null, - ByteSizeValue.ofBytes(bufferSizeBytes), - null, - null - ); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries) + .disableChunkedEncoding(true) + .bufferSize(ByteSizeValue.ofBytes(bufferSizeBytes)) + .build(); final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100); final byte[] bytes = randomBlobContent(512); @@ -785,7 +779,7 @@ public void handle(HttpExchange exchange) throws IOException { public void testDoesNotRetryOnNotFound() { final int maxRetries = between(3, 5); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).disableChunkedEncoding(true).build(); final AtomicInteger numberOfReads = new AtomicInteger(0); @SuppressForbidden(reason = "use a http server") @@ -816,8 +810,11 @@ public void handle(HttpExchange exchange) throws IOException { } public void testSnapshotDeletesRetryOnThrottlingError() throws IOException { - // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder() + // disable AWS-client retries + .maxRetries(0) + .disableChunkedEncoding(true) + .build(); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -836,8 +833,11 @@ public void testSnapshotDeletesRetryOnThrottlingError() throws IOException { } public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() { - // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder() + // disable AWS-client retries + .maxRetries(0) + .disableChunkedEncoding(true) + .build(); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -873,8 +873,11 @@ public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() { } public void testNonSnapshotDeletesAreNotRetried() { - // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder() + // disable AWS-client retries + .maxRetries(0) + .disableChunkedEncoding(true) + .build(); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -902,8 +905,11 @@ public void testNonSnapshotDeletesAreNotRetried() { } public void testNonThrottlingErrorsAreNotRetried() { - // disable AWS-client retries - final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder() + // disable AWS-client retries + .maxRetries(0) + .disableChunkedEncoding(true) + .build(); int numBlobsToDelete = randomIntBetween(500, 3000); List blobsToDelete = new ArrayList<>(); @@ -982,7 +988,7 @@ private Set operationPurposesThatRetryOnDelete() { public void testGetRegisterRetries() { final var maxRetries = between(0, 3); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); interface FailingHandlerFactory { void addHandler(String blobName, Integer... responseCodes); @@ -1052,7 +1058,11 @@ interface FailingHandlerFactory { public void testSuppressedDeletionErrorsAreCapped() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(1, 10); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .maxBulkDeletes(maxBulkDeleteSize) + .build(); httpServer.createContext("/", exchange -> { if (isMultiDeleteRequest(exchange)) { exchange.sendResponseHeaders( @@ -1084,7 +1094,11 @@ public void testSuppressedDeletionErrorsAreCapped() { public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() { final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); int maxBulkDeleteSize = randomIntBetween(10, 30); - final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, maxBulkDeleteSize, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(1) + .readTimeout(readTimeout) + .disableChunkedEncoding(true) + .maxBulkDeletes(maxBulkDeleteSize) + .build(); final Pattern pattern = Pattern.compile("(.+?)"); httpServer.createContext("/", exchange -> { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index 8fcee49478305..398a7b3db3eef 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -79,23 +79,13 @@ public void tearDown() throws Exception { protected abstract Class unresponsiveExceptionType(); - protected abstract BlobContainer createBlobContainer( - @Nullable Integer maxRetries, - @Nullable TimeValue readTimeout, - @Nullable Boolean disableChunkedEncoding, - @Nullable Integer maxConnections, - @Nullable ByteSizeValue bufferSize, - @Nullable Integer maxBulkDeletes, - @Nullable BlobPath blobContainerPath - ); - protected org.hamcrest.Matcher readTimeoutExceptionMatcher() { return either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class)) .or(instanceOf(RuntimeException.class)); } public void testReadNonexistentBlobThrowsNoSuchFileException() { - final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(between(1, 5)).build(); final long position = randomLongBetween(0, MAX_RANGE_VAL); final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position))); final Exception exception = expectThrows(NoSuchFileException.class, () -> { @@ -122,7 +112,7 @@ public void testReadBlobWithRetries() throws Exception { final byte[] bytes = randomBlobContent(); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); if (countDown.countDown()) { @@ -179,7 +169,7 @@ public void testReadRangeBlobWithRetries() throws Exception { final CountDown countDown = new CountDown(maxRetries + 1); final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); final byte[] bytes = randomBlobContent(); httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); @@ -251,7 +241,7 @@ public void testReadRangeBlobWithRetries() throws Exception { public void testReadBlobWithReadTimeouts() { final int maxRetries = randomInt(5); final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).readTimeout(readTimeout).build(); // HTTP server does not send a response httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {}); @@ -308,7 +298,7 @@ protected OperationPurpose randomFiniteRetryingPurpose() { public void testReadBlobWithNoHttpResponse() { final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200)); - final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(randomInt(5)).readTimeout(readTimeout).build(); // HTTP server closes connection immediately httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close); @@ -328,7 +318,7 @@ public void testReadBlobWithNoHttpResponse() { public void testReadBlobWithPrematureConnectionClose() { final int maxRetries = randomInt(20); - final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null, null); + final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build(); final boolean alwaysFlushBody = randomBoolean(); @@ -505,4 +495,89 @@ private void ensureOpen() throws IOException { } } } + + /** + * Method for subclasses to define how to create a {@link BlobContainer} with the given (optional) parameters. Callers should use + * {@link #blobContainerBuilder} to obtain a builder to construct the arguments to this method rather than calling it directly. + */ + protected abstract BlobContainer createBlobContainer( + @Nullable Integer maxRetries, + @Nullable TimeValue readTimeout, + @Nullable Boolean disableChunkedEncoding, + @Nullable Integer maxConnections, + @Nullable ByteSizeValue bufferSize, + @Nullable Integer maxBulkDeletes, + @Nullable BlobPath blobContainerPath + ); + + protected final class TestBlobContainerBuilder { + @Nullable + private Integer maxRetries; + @Nullable + private TimeValue readTimeout; + @Nullable + private Boolean disableChunkedEncoding; + @Nullable + private Integer maxConnections; + @Nullable + private ByteSizeValue bufferSize; + @Nullable + private Integer maxBulkDeletes; + @Nullable + private BlobPath blobContainerPath; + + public TestBlobContainerBuilder maxRetries(@Nullable Integer maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public TestBlobContainerBuilder readTimeout(@Nullable TimeValue readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + public TestBlobContainerBuilder disableChunkedEncoding(@Nullable Boolean disableChunkedEncoding) { + this.disableChunkedEncoding = disableChunkedEncoding; + return this; + } + + public TestBlobContainerBuilder maxConnections(@Nullable Integer maxConnections) { + this.maxConnections = maxConnections; + return this; + } + + public TestBlobContainerBuilder bufferSize(@Nullable ByteSizeValue bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public TestBlobContainerBuilder maxBulkDeletes(@Nullable Integer maxBulkDeletes) { + this.maxBulkDeletes = maxBulkDeletes; + return this; + } + + public TestBlobContainerBuilder blobContainerPath(@Nullable BlobPath blobContainerPath) { + this.blobContainerPath = blobContainerPath; + return this; + } + + public BlobContainer build() { + return createBlobContainer( + maxRetries, + readTimeout, + disableChunkedEncoding, + maxConnections, + bufferSize, + maxBulkDeletes, + blobContainerPath + ); + } + } + + /** + * @return a {@link TestBlobContainerBuilder} to construct the arguments with which to call {@link #createBlobContainer}. + */ + protected final TestBlobContainerBuilder blobContainerBuilder() { + return new TestBlobContainerBuilder(); + } }