Skip to content

Fix S3 deletion bugs #129891

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions docs/changelog/129891.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129891
summary: Fix S3 deletion bugs
area: Snapshot/Restore
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {
final AtomicLong deletedBytes = new AtomicLong();
try (var clientReference = blobStore.clientReference()) {
ListObjectsV2Response prevListing = null;
while (true) {
while (prevListing == null || prevListing.isTruncated()) {
final var listObjectsRequestBuilder = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath);
S3BlobStore.configureRequestForMetrics(listObjectsRequestBuilder, blobStore, Operation.LIST_OBJECTS, purpose);
if (prevListing != null) {
Expand All @@ -412,13 +412,8 @@ public DeleteResult delete(OperationPurpose purpose) throws IOException {
deletedBytes.addAndGet(s3Object.size());
return s3Object.key();
});
if (listObjectsResponse.isTruncated()) {
blobStore.deleteBlobs(purpose, blobNameIterator);
prevListing = listObjectsResponse;
} else {
blobStore.deleteBlobs(purpose, Iterators.concat(blobNameIterator, Iterators.single(keyPath)));
break;
}
blobStore.deleteBlobs(purpose, blobNameIterator);
prevListing = listObjectsResponse;
}
} catch (final SdkException e) {
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.StorageClass;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -355,13 +355,9 @@ private void deletePartition(OperationPurpose purpose, List<ObjectIdentifier> pa
while (true) {
try (AmazonS3Reference clientReference = clientReference()) {
final var response = clientReference.client().deleteObjects(bulkDelete(purpose, this, partition));
if (response.hasErrors()) {
final var exception = new ElasticsearchException(buildDeletionErrorMessage(response.errors()));
logger.warn(exception.getMessage(), exception);
deletionExceptions.useOrMaybeSuppress(exception);
return;
if (maybeRecordDeleteErrors(purpose, response, deletionExceptions) == false) {
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
}
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
return;
} catch (SdkException e) {
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
Expand All @@ -383,23 +379,50 @@ private void deletePartition(OperationPurpose purpose, List<ObjectIdentifier> pa
}
}

private String buildDeletionErrorMessage(List<S3Error> errors) {
final var sb = new StringBuilder("Failed to delete some blobs ");
for (int i = 0; i < errors.size() && i < MAX_DELETE_EXCEPTIONS; i++) {
final var err = errors.get(i);
sb.append("[").append(err.key()).append("][").append(err.code()).append("][").append(err.message()).append("]");
if (i < errors.size() - 1) {
sb.append(",");
private static boolean maybeRecordDeleteErrors(
OperationPurpose purpose,
DeleteObjectsResponse response,
DeletionExceptions deletionExceptions
) {
if (response.hasErrors() == false) {
return false;
}

final var errors = response.errors();
int errorCount = 0;
StringBuilder sb = null;

for (final var err : errors) {
if (purpose != OperationPurpose.REPOSITORY_ANALYSIS && "NoSuchKey".equals(err.code())) {
// The blob does not exist, which is what we wanted, so let's count that as a win
// (except for repo analysis where we can be certain that the blobs being deleted do all exist)
continue;
}

if (errorCount < MAX_DELETE_EXCEPTIONS) {
if (errorCount == 0) {
sb = new StringBuilder("Failed to delete some blobs ");
} else {
sb.append(",");
}
sb.append("[").append(err.key()).append("][").append(err.code()).append("][").append(err.message()).append("]");
}

errorCount += 1;
}
if (errors.size() > MAX_DELETE_EXCEPTIONS) {
sb.append("... (")
.append(errors.size())
.append(" in total, ")
.append(errors.size() - MAX_DELETE_EXCEPTIONS)
.append(" omitted)");

if (errorCount == 0) {
return false;
}
return sb.toString();

if (MAX_DELETE_EXCEPTIONS < errorCount) {
sb.append("... (").append(errorCount).append(" in total, ").append(errorCount - MAX_DELETE_EXCEPTIONS).append(" omitted)");
}

final var exception = new ElasticsearchException(sb.toString());
logger.warn(exception.getMessage(), exception);
deletionExceptions.useOrMaybeSuppress(exception);
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.fixture.HttpHeaderParser;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -46,9 +46,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.test.fixture.HttpHeaderParser.parseRangeHeader;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -85,6 +88,8 @@ public S3HttpHandler(final String bucket, @Nullable final String basePath) {
*/
private static final Set<String> METHODS_HAVING_NO_REQUEST_BODY = Set.of("GET", "HEAD", "DELETE");

private static final QName MULTI_OBJECT_DELETE_KEY_QNAME = new QName("http://s3.amazonaws.com/doc/2006-03-01/", "Key");

@Override
public void handle(final HttpExchange exchange) throws IOException {
// Remove custom query parameters before processing the request. This simulates how S3 ignores them.
Expand Down Expand Up @@ -350,22 +355,57 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);

} else if (request.isMultiObjectDeleteRequest()) {
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));

final StringBuilder deletes = new StringBuilder();
deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
deletes.append("<DeleteResult>");
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<String, BytesReference> blob = iterator.next();
String key = blob.getKey().replace("/" + bucket + "/", "");
if (requestBody.contains("<Key>" + key + "</Key>")) {
deletes.append("<Deleted><Key>").append(key).append("</Key></Deleted>");
iterator.remove();
final var resultBuilder = new StringBuilder();
resultBuilder.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
resultBuilder.append("<DeleteResult>");

final var errorBuilder = new StringBuilder();

try {
final var xmlStreamReader = XMLInputFactory.newDefaultFactory().createXMLStreamReader(exchange.getRequestBody());
try {
for (; xmlStreamReader.getEventType() != XMLStreamConstants.END_DOCUMENT; xmlStreamReader.next()) {
if (xmlStreamReader.getEventType() == XMLStreamConstants.START_ELEMENT) {
if (xmlStreamReader.getName().equals(MULTI_OBJECT_DELETE_KEY_QNAME)) {
xmlStreamReader.next();
assertEquals(XMLStreamConstants.CHARACTERS, xmlStreamReader.getEventType());
final var blobName = xmlStreamReader.getText();
if (blobs.remove("/" + bucket + "/" + blobName) == null) {
errorBuilder.append("<Error><Code>NoSuchKey</Code><Key>")
.append(blobName)
.append("</Key><Message>Blob with path [/")
.append(bucket)
.append('/')
.append(blobName)
.append("] not found in ")
.append(blobs.keySet())
.append("</Message><VersionId>")
.append(UUIDs.randomBase64UUID())
.append("</VersionId></Error>");
} else {
resultBuilder.append("<Deleted><Key>").append(blobName).append("</Key></Deleted>");
}
}
}
}
} finally {
xmlStreamReader.close();
}
} catch (XMLStreamException xmlStreamException) {
logger.error("XML exception in multi-object delete", xmlStreamException);
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
return;
}
deletes.append("</DeleteResult>");

byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
if (ESTestCase.randomBoolean()) {
// In practice the real S3 doesn't report errors for blobs that did not exist (this is the desired outcome after a
// delete operation anyway) but this isn't documented, so other implementations may return these errors and can
// legitimately expect Elasticsearch to handle them correctly.
resultBuilder.append(errorBuilder);
}
resultBuilder.append("</DeleteResult>");
byte[] response = resultBuilder.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,11 @@ public void testDeleteBlobs() throws IOException {
assertEquals(container.listBlobs(randomPurpose()).size(), 2);
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNames.iterator());
assertTrue(container.listBlobs(randomPurpose()).isEmpty());
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNames.iterator()); // does not raise when blobs
// don't exist
container.deleteBlobsIgnoringIfNotExists(
// does not raise when blobs don't exist, except for REPOSITORY_ANALYSIS which is strict
randomValueOtherThan(OperationPurpose.REPOSITORY_ANALYSIS, BlobStoreTestUtil::randomPurpose),
blobNames.iterator()
);
}
}

Expand Down
Loading