
Fix S3RepositoryAnalysisRestIT #126593


@@ -10,6 +10,7 @@
package org.elasticsearch.repositories.s3;

 import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -376,12 +377,11 @@ public void copyBlob(
             SocketAccess.doPrivilegedVoid(() -> { clientReference.client().copyObject(copyRequest); });
                 }
             }
-        } catch (final AmazonClientException e) {
-            if (e instanceof AmazonS3Exception amazonS3Exception) {
-                if (amazonS3Exception.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
-                    final var sourceKey = s3SourceBlobContainer.buildKey(sourceBlobName);
-                    throw new NoSuchFileException("Copy source [" + sourceKey + "] not found: " + amazonS3Exception.getMessage());
-                }
+        } catch (final Exception e) {
+            if (e instanceof AmazonServiceException ase && ase.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
+                throw new NoSuchFileException(
+                    "Copy source [" + s3SourceBlobContainer.buildKey(sourceBlobName) + "] not found: " + ase.getMessage()
+                );
             }
             throw new IOException("Unable to copy object [" + blobName + "] from [" + sourceBlobContainer + "][" + sourceBlobName + "]", e);
         }
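The rewritten catch clause translates a provider 404 into the JDK's NoSuchFileException, so callers see a standard "missing source" signal regardless of which SDK exception type carried the status code. A minimal, self-contained sketch of the same translation pattern — `CopySketch`, `ServiceException`, and `doCopy` are hypothetical stand-ins, not the real SDK types:

```java
import java.io.IOException;
import java.nio.file.NoSuchFileException;

public class CopySketch {
    // Hypothetical stand-in for an SDK service exception carrying an HTTP status code.
    static class ServiceException extends RuntimeException {
        final int statusCode;
        ServiceException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    // Simulated copy operation: always reports the source object as missing.
    static void doCopy(String sourceKey) {
        throw new ServiceException(404, "NoSuchKey");
    }

    // Translate a 404 from the service into NoSuchFileException; wrap everything else in IOException.
    static void copyBlob(String sourceKey) throws IOException {
        try {
            doCopy(sourceKey);
        } catch (Exception e) {
            if (e instanceof ServiceException se && se.statusCode == 404) {
                throw new NoSuchFileException("Copy source [" + sourceKey + "] not found: " + se.getMessage());
            }
            throw new IOException("Unable to copy object from [" + sourceKey + "]", e);
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            copyBlob("bucket/missing-blob");
            throw new AssertionError("expected NoSuchFileException");
        } catch (NoSuchFileException e) {
            System.out.println("translated: " + e.getFile());
        }
    }
}
```

The key design point is catching broad `Exception` and narrowing with a pattern-matching `instanceof`, so any SDK subclass that carries the status code is handled uniformly.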
@@ -618,8 +618,10 @@ private void executeMultipart(
                 SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
             }
             success = true;
-        } catch (final AmazonClientException e) {
+
+        } catch (final Exception e) {
+            if (e instanceof AmazonServiceException ase && ase.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
+                throw new NoSuchFileException(blobName, null, e.getMessage());
+            }
Comment on lines +622 to +624

Contributor
I'd hit this during my own testing and thought I'd fixed it in copyBlob (so that it would translate for single part as well). I am slapping my head noticing now that executeMultipart is wrapping AmazonClientException in IOException so this doesn't actually kick in on the multipart copy path, but I wonder if it's better to unwrap back in copyBlob, or to hoist the wrapping out of executeMultipart into writeBlob?

Contributor Author

I played around with that but it ends up being a bit weird catching a 404 in writeBlob and friends since it doesn't really make sense for a single upload, and only makes sense for multipart uploads if they are aborted concurrently. I'm going to leave it as it is for now.

Contributor
Oh, I'd meant not wrapping AmazonServiceException into IOException in executeMultipart, but rather in writeBlob. That way this 404 handler here could just disappear, since the copyBlob handler would then handle it. But it's a trivial difference really.

Contributor
sorry, I guess you mean that multipart upload (not copy) can also return a 404 (I suppose if the upload itself is cancelled while parts are being uploaded). I don't think we were catching that before, but if that's the issue then I see what you mean (though I don't know if a writeBlob caller would make the distinction).

Contributor Author

yeah maybe we can tidy this up a bit but I'm going to keep this PR focussed on just getting the SDKv2 tests passing again

throw new IOException("Unable to upload or copy object [" + blobName + "] using multipart upload", e);
} finally {
if ((success == false) && Strings.hasLength(uploadId.get())) {
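The thread above turns on the fact that executeMultipart wraps the SDK exception in an IOException, so a status-code check back in copyBlob would have to inspect the cause chain rather than the thrown exception itself. A self-contained sketch of that unwrapping option — `UnwrapSketch` and `ServiceException` are hypothetical names; the `ExceptionsHelper` class imported elsewhere in this PR provides utilities along these lines:

```java
import java.io.IOException;

public class UnwrapSketch {
    // Hypothetical stand-in for an SDK service exception with an HTTP status code.
    static class ServiceException extends RuntimeException {
        final int statusCode;
        ServiceException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    // Walk the cause chain looking for a service exception with the given status code.
    static ServiceException findServiceException(Throwable t, int statusCode) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof ServiceException se && se.statusCode == statusCode) {
                return se;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // executeMultipart-style wrapping: the 404 ends up as the cause of an IOException.
        IOException wrapped = new IOException("Unable to upload", new ServiceException(404, "NoSuchUpload"));
        assert findServiceException(wrapped, 404) != null;
        assert findServiceException(new IOException("plain failure"), 404) == null;
    }
}
```

Note the loop ignores intermediate wrappers entirely, which is why the PR's chosen approach — checking before wrapping — keeps each handler simpler at the cost of the small duplication debated above.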
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -420,9 +420,6 @@ tests:
 - class: org.elasticsearch.repositories.blobstore.testkit.rest.SnapshotRepoTestKitClientYamlTestSuiteIT
   method: test {p0=/10_analyze/Analysis without details}
   issue: https://github.com/elastic/elasticsearch/issues/126569
-- class: org.elasticsearch.repositories.blobstore.testkit.analyze.S3RepositoryAnalysisRestIT
-  method: testRepositoryAnalysis
-  issue: https://github.com/elastic/elasticsearch/issues/126576
 - class: org.elasticsearch.xpack.esql.action.ForkIT
   method: testWithStatsSimple
   issue: https://github.com/elastic/elasticsearch/issues/126607
@@ -13,10 +13,18 @@
 import com.sun.net.httpserver.HttpServer;
 
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.rules.ExternalResource;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
import static fixture.aws.AwsCredentialsUtils.ANY_REGION;
@@ -26,7 +34,10 @@

 public class S3HttpFixture extends ExternalResource {
 
+    private static final Logger logger = LogManager.getLogger(S3HttpFixture.class);
+
     private HttpServer server;
+    private ExecutorService executorService;
 
     private final boolean enabled;
     private final String bucket;
@@ -71,16 +82,30 @@ public void stop(int delay) {

     protected void before() throws Throwable {
         if (enabled) {
+            this.executorService = EsExecutors.newScaling(
+                "s3-http-fixture",
+                1,
+                100,
+                30,
+                TimeUnit.SECONDS,
+                true,
+                EsExecutors.daemonThreadFactory("s3-http-fixture"),
+                new ThreadContext(Settings.EMPTY)
+            );
+
             this.server = HttpServer.create(getLocalFixtureAddress(), 0);
             this.server.createContext("/", Objects.requireNonNull(createHandler()));
+            this.server.setExecutor(executorService);
             server.start();
+            logger.info("running S3HttpFixture at " + getAddress());
         }
     }
 
     @Override
     protected void after() {
         if (enabled) {
             stop(0);
+            ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
         }
     }
}
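The fixture now runs its HttpServer on a scaling thread pool and tears that pool down in after(). The same lifecycle can be sketched with only JDK classes — `FixtureSketch` is a hypothetical stand-in, and `Executors.newFixedThreadPool` stands in for `EsExecutors.newScaling`:

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixtureSketch {
    private HttpServer server;
    private ExecutorService executor;

    void before() throws IOException {
        // A bounded pool instead of the calling-thread default, so concurrent
        // requests (e.g. parallel multipart part uploads) don't serialize.
        executor = Executors.newFixedThreadPool(4);
        server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = "ok".getBytes();
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.setExecutor(executor);
        server.start();
    }

    void after() throws InterruptedException {
        server.stop(0);                      // stop accepting requests first,
        executor.shutdown();                 // then drain the worker pool
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    int port() {
        return server.getAddress().getPort();
    }

    public static void main(String[] args) throws Exception {
        FixtureSketch fixture = new FixtureSketch();
        fixture.before();
        HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:" + fixture.port() + "/").openConnection();
        assert conn.getResponseCode() == 200;
        assert new String(conn.getInputStream().readAllBytes()).equals("ok");
        fixture.after();
    }
}
```

Stopping the server before terminating the executor mirrors the fixture's after(): no new exchanges arrive while in-flight handlers finish on the pool.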
@@ -52,6 +52,7 @@
 import static org.elasticsearch.test.fixture.HttpHeaderParser.parseRangeHeader;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.w3c.dom.Node.ELEMENT_NODE;

/**
@@ -121,9 +122,11 @@ public void handle(final HttpExchange exchange) throws IOException {
uploadsList.append("<MaxUploads>10000</MaxUploads>");
uploadsList.append("<IsTruncated>false</IsTruncated>");

-            for (final var multipartUpload : uploads.values()) {
-                if (multipartUpload.getPath().startsWith(prefix)) {
-                    multipartUpload.appendXml(uploadsList);
+            synchronized (uploads) {
+                for (final var multipartUpload : uploads.values()) {
+                    if (multipartUpload.getPath().startsWith(prefix)) {
+                        multipartUpload.appendXml(uploadsList);
+                    }
+                }
}

@@ -135,9 +138,7 @@
exchange.getResponseBody().write(response);

             } else if (request.isInitiateMultipartUploadRequest()) {
-                final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), request.path().substring(bucket.length() + 2));
-                uploads.put(upload.getUploadId(), upload);
-
+                final var upload = putUpload(request.path().substring(bucket.length() + 2));
                 final var uploadResult = new StringBuilder();
                 uploadResult.append("<?xml version='1.0' encoding='UTF-8'?>");
                 uploadResult.append("<InitiateMultipartUploadResult>");
@@ -152,7 +153,7 @@
exchange.getResponseBody().write(response);

             } else if (request.isUploadPartRequest()) {
-                final var upload = uploads.get(request.getQueryParamOnce("uploadId"));
+                final var upload = getUpload(request.getQueryParamOnce("uploadId"));
if (upload == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
@@ -187,42 +188,45 @@
}

} else if (request.isCompleteMultipartUploadRequest()) {
final var upload = uploads.remove(request.getQueryParamOnce("uploadId"));
if (upload == null) {
if (Randomness.get().nextBoolean()) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
final byte[] responseBody;
synchronized (uploads) {
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
if (upload == null) {
if (Randomness.get().nextBoolean()) {
responseBody = null;
} else {
responseBody = """
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchUpload</Code>
<Message>No such upload</Message>
<RequestId>test-request-id</RequestId>
<HostId>test-host-id</HostId>
</Error>""".getBytes(StandardCharsets.UTF_8);
}
} else {
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchUpload</Code>
<Message>No such upload</Message>
<RequestId>test-request-id</RequestId>
<HostId>test-host-id</HostId>
</Error>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
blobs.put(request.path(), blobContents);
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
+ bucket
+ "</Bucket>\n"
+ "<Key>"
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
}
}
if (responseBody == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
blobs.put(request.path(), blobContents);

byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
+ bucket
+ "</Bucket>\n"
+ "<Key>"
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
exchange.getResponseBody().write(responseBody);
}
             } else if (request.isAbortMultipartUploadRequest()) {
-                final var upload = uploads.remove(request.getQueryParamOnce("uploadId"));
+                final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
                 exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1);

} else if (request.isPutObjectRequest()) {
@@ -521,8 +525,24 @@ private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange
         return parseRangeHeader(sourceRangeHeaders.getFirst());
     }
 
+    MultipartUpload putUpload(String path) {
+        final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
+        synchronized (uploads) {
+            assertNull("upload " + upload.getUploadId() + " should not exist", uploads.put(upload.getUploadId(), upload));
+            return upload;
+        }
+    }
 
     MultipartUpload getUpload(String uploadId) {
-        return uploads.get(uploadId);
+        synchronized (uploads) {
+            return uploads.get(uploadId);
+        }
+    }
+
+    MultipartUpload removeUpload(String uploadId) {
+        synchronized (uploads) {
+            return uploads.remove(uploadId);
+        }
     }
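These helpers route every access to the uploads map through the same monitor, so compound sequences elsewhere in the handler (like the complete-multipart branch) can take `synchronized (uploads)` and stay atomic with respect to them. A self-contained sketch of the idea — `UploadsSketch` and its methods are hypothetical names, with strings standing in for MultipartUpload objects:

```java
import java.util.HashMap;
import java.util.Map;

public class UploadsSketch {
    private final Map<String, String> uploads = new HashMap<>();

    // All access goes through helpers that lock the same monitor, so larger
    // critical sections can compose with them via synchronized (uploads).
    String putUpload(String id, String path) {
        synchronized (uploads) {
            return uploads.put(id, path);
        }
    }

    String removeUpload(String id) {
        synchronized (uploads) {
            return uploads.remove(id);
        }
    }

    // A compound "remove then act" section: the whole sequence holds the lock,
    // so a concurrent abort cannot slip between the lookup and the completion.
    boolean completeUpload(String id) {
        synchronized (uploads) {
            String path = uploads.remove(id);
            if (path == null) {
                return false; // already aborted or completed -> caller answers 404
            }
            // ... assemble the blob and record it, still under the lock ...
            return true;
        }
    }

    public static void main(String[] args) {
        UploadsSketch s = new UploadsSketch();
        s.putUpload("u1", "path/to/blob");
        assert s.completeUpload("u1");
        assert !s.completeUpload("u1"); // second completion sees no upload
        assert s.removeUpload("u1") == null;
    }
}
```

A ConcurrentHashMap alone would not help here: individual operations would be thread-safe, but the check-then-act sequence in completeUpload would still race with a concurrent abort.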

public S3Request parseRequest(HttpExchange exchange) {