Denser in-memory representation of ShardBlobsToDelete #109848

Merged
Denser in-memory representation of ShardBlobsToDelete
Today each blob to be deleted at the end of a snapshot delete costs us
~80B of heap, and sometimes that can add up to GiBs of temporary heap
usage in total. This commit changes the in-memory representation to use
a compressed stream of pure bytes, which should be more than 4x denser.

Partially mitigates #108278
DaveCTurner committed Jun 18, 2024
commit dd46a5c3f1e67f0fcbf72e2c4051c90ec12e657b
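
For context, the pattern in this change can be sketched in isolation with plain JDK classes (DeflaterOutputStream standing in for Elasticsearch's CompressorFactory, and hypothetical record and field names). This is an illustrative sketch, not the code in the commit: each result is serialized into a compressed byte buffer as it is produced, instead of being retained as a Java object, and the buffer is only decompressed when the entries are finally iterated.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressedResultsSketch {

    // Hypothetical stand-in for ShardSnapshotMetaDeleteResult: one entry per shard.
    record Result(String indexUuid, int shardId, List<String> blobs) {
        void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(indexUuid);
            out.writeInt(shardId);
            out.writeInt(blobs.size());
            for (String blob : blobs) {
                out.writeUTF(blob);
            }
        }

        static Result readFrom(DataInputStream in) throws IOException {
            String indexUuid = in.readUTF();
            int shardId = in.readInt();
            int count = in.readInt();
            String[] blobs = new String[count];
            for (int i = 0; i < count; i++) {
                blobs[i] = in.readUTF();
            }
            return new Result(indexUuid, shardId, List.of(blobs));
        }
    }

    public static void main(String[] args) throws IOException {
        // Instead of accumulating a List<Result> (object headers, references, per-string
        // overhead), append each result to a compressed byte stream as it is produced.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int resultCount = 0;
        try (DataOutputStream compressed = new DataOutputStream(new DeflaterOutputStream(buffer))) {
            for (int shard = 0; shard < 3; shard++) {
                new Result("someIndexUuid", shard, List.of("snap-1.dat", "meta-2.dat")).writeTo(compressed);
                resultCount++;
            }
        } // closing the stream finishes the deflater and flushes everything into the buffer

        System.out.println("compressed size in bytes: " + buffer.size());

        // Decompress lazily, only when the blob paths are actually needed.
        try (DataInputStream in = new DataInputStream(
                new InflaterInputStream(new ByteArrayInputStream(buffer.toByteArray())))) {
            for (int i = 0; i < resultCount; i++) {
                Result result = Result.readFrom(in);
                for (String blob : result.blobs()) {
                    System.out.println("indices/" + result.indexUuid() + "/" + result.shardId() + "/" + blob);
                }
            }
        }
    }
}

The actual change additionally allocates the buffer from BigArrays via ReleasableBytesStreamOutput, so the memory is tracked and released deterministically, which is why ShardBlobsToDelete now implements Releasable and is closed by the delete listeners in the diff below.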
@@ -58,8 +58,15 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
@@ -77,6 +84,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
@@ -122,16 +130,19 @@
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;

import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
@@ -1010,10 +1021,35 @@ class SnapshotsDeletion {
// The overall flow of execution

void runDelete(SnapshotDeleteListener listener) {
final var releasingListener = new SnapshotDeleteListener() {
@Override
public void onDone() {
try {
shardBlobsToDelete.close();
} finally {
listener.onDone();
}
}

@Override
public void onRepositoryDataWritten(RepositoryData repositoryData) {
listener.onRepositoryDataWritten(repositoryData);
}

@Override
public void onFailure(Exception e) {
try {
shardBlobsToDelete.close();
} finally {
listener.onFailure(e);
}

}
};
if (useShardGenerations) {
runWithUniqueShardMetadataNaming(listener);
runWithUniqueShardMetadataNaming(releasingListener);
} else {
runWithLegacyNumericShardMetadataNaming(wrapWithWeakConsistencyProtection(listener));
runWithLegacyNumericShardMetadataNaming(wrapWithWeakConsistencyProtection(releasingListener));
}
}

@@ -1088,14 +1124,15 @@ void runCleanup(ActionListener<DeleteResult> listener) {
.map(IndexId::getId)
.collect(Collectors.toSet());
final List<String> staleRootBlobs = staleRootBlobs(originalRepositoryData, originalRootBlobs.keySet());
final var releasingListener = ActionListener.releaseAfter(listener, shardBlobsToDelete);
if (survivingIndexIds.equals(originalIndexContainers.keySet()) && staleRootBlobs.isEmpty()) {
// Nothing to clean up we return
listener.onResponse(DeleteResult.ZERO);
releasingListener.onResponse(DeleteResult.ZERO);
} else {
// write new index-N blob to ensure concurrent operations will fail
updateRepositoryData(
originalRepositoryData,
listener.delegateFailureAndWrap(
releasingListener.delegateFailureAndWrap(
// TODO should we pass newRepositoryData to cleanupStaleBlobs()?
(l, newRepositoryData) -> cleanupUnlinkedRootAndIndicesBlobs(
originalRepositoryData,
@@ -1513,22 +1550,27 @@ private void logStaleRootLevelBlobs(
/**
* Tracks the shard-level blobs which can be deleted once all the metadata updates have completed during a snapshot deletion.
*/
class ShardBlobsToDelete {
class ShardBlobsToDelete implements Releasable {

/**
* The result of removing a snapshot from a shard folder in the repository.
*
* @param indexId Index that the snapshot was removed from
* @param indexId Repository UUID for index that the snapshot was removed from
* @param shardId Shard id that the snapshot was removed from
* @param newGeneration Id of the new index-${uuid} blob that does not include the snapshot any more
* @param blobsToDelete Blob names in the shard directory that have become unreferenced in the new shard generation
*/
private record ShardSnapshotMetaDeleteResult(
IndexId indexId,
int shardId,
ShardGeneration newGeneration,
Collection<String> blobsToDelete
) {}
private record ShardSnapshotMetaDeleteResult(String indexId, int shardId, Collection<String> blobsToDelete) implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexId);
out.writeVInt(shardId);
out.writeStringCollection(blobsToDelete);
}

ShardSnapshotMetaDeleteResult(StreamInput in) throws IOException {
this(in.readString(), in.readVInt(), in.readStringCollectionAsImmutableList());
}
}

/**
* <p>
@@ -1539,31 +1581,82 @@ private record ShardSnapshotMetaDeleteResult(
* no further synchronization
* </p>
*/
private final List<ShardSnapshotMetaDeleteResult> shardDeleteResults = new ArrayList<>();
private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);

private int resultCount = 0;

private final StreamOutput compressed;

private final ArrayList<Closeable> resources = new ArrayList<>();

private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();

ShardBlobsToDelete() {
try {
this.compressed = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(shardDeleteResults))
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
}

synchronized void addShardDeleteResult(
IndexId indexId,
int shardId,
ShardGeneration newGeneration,
Collection<String> blobsToDelete
) {
shardDeleteResults.add(new ShardSnapshotMetaDeleteResult(indexId, shardId, newGeneration, blobsToDelete));
try {
shardGenerationsBuilder.put(indexId, shardId, newGeneration);
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);

@mhl-b (Contributor) commented on Jun 19, 2024:

As far as I understand, blobsToDelete already occupies the memory we want to optimize. Won't adding in-memory compression temporarily increase heap usage by 20%? How do you know that blobsToDelete will be GC-ed after compression?

@DaveCTurner (Contributor, Author) replied:

There's nothing else retaining it apart from the reference in ShardBlobsToDelete#shardDeleteResults.

resultCount += 1;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public ShardGenerations getUpdatedShardGenerations() {
final var builder = ShardGenerations.builder();
for (var shardResult : shardDeleteResults) {
builder.put(shardResult.indexId, shardResult.shardId, shardResult.newGeneration);
}
return builder.build();
return shardGenerationsBuilder.build();
}

public Iterator<String> getBlobPaths() {
return Iterators.flatMap(shardDeleteResults.iterator(), shardResult -> {
final var shardPath = shardPath(shardResult.indexId, shardResult.shardId).buildAsString();
final StreamInput input;
try {
compressed.close();
input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(shardDeleteResults.bytes().streamInput());
resources.add(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return Iterators.flatMap(Iterators.forRange(0, resultCount, i -> {
try {
return new ShardSnapshotMetaDeleteResult(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}), shardResult -> {
final var shardPath = shardPath(new IndexId("_na_", shardResult.indexId), shardResult.shardId).buildAsString();
return Iterators.map(shardResult.blobsToDelete.iterator(), blob -> shardPath + blob);
});
}

@Override
public void close() {
try {
IOUtils.close(resources);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

// exposed for tests
int sizeInBytes() {
return shardDeleteResults.size();
}
}

@Override
@@ -58,7 +58,6 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@@ -512,39 +511,50 @@ private Environment createEnvironment() {

public void testShardBlobsToDelete() {
final var repo = setupRepo();
final var shardBlobsToDelete = repo.new ShardBlobsToDelete();
final var expectedShardGenerations = ShardGenerations.builder();
final var expectedBlobsToDelete = new HashSet<String>();

final var countDownLatch = new CountDownLatch(1);
try (var refs = new RefCountingRunnable(countDownLatch::countDown)) {
for (int index = between(0, 10); index > 0; index--) {
final var indexId = new IndexId(randomIdentifier(), randomUUID());
for (int shard = between(1, 3); shard > 0; shard--) {
final var shardId = shard;
final var shardGeneration = new ShardGeneration(randomUUID());
expectedShardGenerations.put(indexId, shard, shardGeneration);
final var blobsToDelete = randomList(10, ESTestCase::randomIdentifier);
final var indexPath = repo.basePath().add("indices").add(indexId.getId()).add(Integer.toString(shard)).buildAsString();
for (final var blobToDelete : blobsToDelete) {
expectedBlobsToDelete.add(indexPath + blobToDelete);
}

repo.threadPool()
.generic()
.execute(
ActionRunnable.run(
refs.acquireListener(),
() -> shardBlobsToDelete.addShardDeleteResult(indexId, shardId, shardGeneration, blobsToDelete)
)
try (var shardBlobsToDelete = repo.new ShardBlobsToDelete()) {
final var expectedShardGenerations = ShardGenerations.builder();
final var expectedBlobsToDelete = new HashSet<String>();

final var countDownLatch = new CountDownLatch(1);
int blobCount = 0;
try (var refs = new RefCountingRunnable(countDownLatch::countDown)) {
for (int index = between(0, 1000); index > 0; index--) {
final var indexId = new IndexId(randomIdentifier(), randomUUID());
for (int shard = between(1, 30); shard > 0; shard--) {
final var shardId = shard;
final var shardGeneration = new ShardGeneration(randomUUID());
expectedShardGenerations.put(indexId, shard, shardGeneration);
final var blobsToDelete = randomList(
100,
() -> randomFrom("meta-", "index-", "snap-") + randomUUID() + randomFrom("", ".dat")
);
blobCount += blobsToDelete.size();
final var indexPath = repo.basePath()
.add("indices")
.add(indexId.getId())
.add(Integer.toString(shard))
.buildAsString();
for (final var blobToDelete : blobsToDelete) {
expectedBlobsToDelete.add(indexPath + blobToDelete);
}

repo.threadPool()
.generic()
.execute(
ActionRunnable.run(
refs.acquireListener(),
() -> shardBlobsToDelete.addShardDeleteResult(indexId, shardId, shardGeneration, blobsToDelete)
)
);
}
}
}
safeAwait(countDownLatch);
assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
shardBlobsToDelete.getBlobPaths().forEachRemaining(s -> assertTrue(expectedBlobsToDelete.remove(s)));
assertThat(expectedBlobsToDelete, empty());
assertThat(shardBlobsToDelete.sizeInBytes(), lessThanOrEqualTo(Math.max(ByteSizeUnit.KB.toIntBytes(1), 20 * blobCount)));
}
safeAwait(countDownLatch);
assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
shardBlobsToDelete.getBlobPaths().forEachRemaining(s -> assertTrue(expectedBlobsToDelete.remove(s)));
assertThat(expectedBlobsToDelete, empty());
}

public void testUuidCreationLogging() {