Skip to content

refactor(inkless): pipelining committer threading #300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public class InklessConfig extends AbstractConfig {
private static final String FILE_MERGER_TEMP_DIR_DOC = "The temporary directory for file merging.";
private static final String FILE_MERGER_TEMP_DIR_DEFAULT = "/tmp/inkless/merger";

public static final String PRODUCE_UPLOAD_THREAD_POOL_SIZE_CONFIG = "produce.upload.thread.pool.size";
private static final String PRODUCE_UPLOAD_THREAD_POOL_SIZE_DOC = "Thread pool size to concurrently upload files to remote storage";
// Given that S3 uploads are ~400ms P99 and commits to PG are ~50ms P99, roughly 8
// commits can execute sequentially in the time of a single upload; default to 8
// upload threads so uploads are not held back waiting on sequential commits
private static final int PRODUCE_UPLOAD_THREAD_POOL_SIZE_DEFAULT = 8;


public static ConfigDef configDef() {
final ConfigDef configDef = new ConfigDef();

Expand Down Expand Up @@ -219,6 +226,14 @@ public static ConfigDef configDef() {
ConfigDef.Importance.LOW,
CONSUME_CACHE_MAX_COUNT_DOC
);
configDef.define(
PRODUCE_UPLOAD_THREAD_POOL_SIZE_CONFIG,
ConfigDef.Type.INT,
PRODUCE_UPLOAD_THREAD_POOL_SIZE_DEFAULT,
ConfigDef.Range.atLeast(1),
ConfigDef.Importance.LOW,
PRODUCE_UPLOAD_THREAD_POOL_SIZE_DOC
);

return configDef;
}
Expand Down Expand Up @@ -294,4 +309,8 @@ public Path fileMergeWorkDir() {
/**
 * Returns the maximum number of entries the consume cache may hold,
 * as configured by {@code CONSUME_CACHE_MAX_COUNT_CONFIG}.
 */
public Long cacheMaxCount() {
return getLong(CONSUME_CACHE_MAX_COUNT_CONFIG);
}

/**
 * Returns the size of the thread pool used to concurrently upload produced
 * files to remote storage ({@code produce.upload.thread.pool.size}).
 * Validated to be at least 1; defaults to 8.
 */
public int produceUploadThreadPoolSize() {
return getInt(PRODUCE_UPLOAD_THREAD_POOL_SIZE_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,69 +22,30 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.Time;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import io.aiven.inkless.TimeUtils;
import io.aiven.inkless.control_plane.CommitBatchResponse;

/**
* This job is responsible for completing the Append requests generating client responses.
*/
class AppendCompleterJob implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(AppendCompleterJob.class);
class AppendCompleter {
private static final Logger LOGGER = LoggerFactory.getLogger(AppendCompleter.class);

private final ClosedFile file;
private final Future<List<CommitBatchResponse>> commitFuture;
private final Time time;
private final Consumer<Long> durationCallback;

public AppendCompleterJob(ClosedFile file, Future<List<CommitBatchResponse>> commitFuture, Time time, Consumer<Long> durationCallback) {
public AppendCompleter(ClosedFile file) {
this.file = file;
this.commitFuture = commitFuture;
this.time = time;
this.durationCallback = durationCallback;
}

@Override
public void run() {
CommitResult commitResult = waitForCommit();
TimeUtils.measureDurationMs(time, () -> doComplete(commitResult), durationCallback);
}

void doComplete(CommitResult commitResult) {
if (commitResult.commitBatchResponses != null) {
finishCommitSuccessfully(commitResult.commitBatchResponses);
} else {
finishCommitWithError();
}
}

private CommitResult waitForCommit() {
try {
final List<CommitBatchResponse> commitBatchResponses = commitFuture.get();
return new CommitResult(commitBatchResponses, null);
} catch (final ExecutionException e) {
LOGGER.error("Failed upload", e);
return new CommitResult(null, e.getCause());
} catch (final InterruptedException e) {
// This is not expected as we try to shut down the executor gracefully.
LOGGER.error("Interrupted", e);
throw new RuntimeException(e);
}
}

private void finishCommitSuccessfully(List<CommitBatchResponse> commitBatchResponses) {
public void finishCommitSuccessfully(List<CommitBatchResponse> commitBatchResponses) {
LOGGER.debug("Committed successfully");

// Each request must have a response.
Expand Down Expand Up @@ -132,7 +93,7 @@ private static ProduceResponse.PartitionResponse partitionResponse(CommitBatchRe
);
}

private void finishCommitWithError() {
public void finishCommitWithError() {
for (final var entry : file.awaitingFuturesByRequest().entrySet()) {
final var originalRequest = file.originalRequests().get(entry.getKey());
final var result = originalRequest.entrySet().stream()
Expand All @@ -142,7 +103,4 @@ private void finishCommitWithError() {
entry.getValue().complete(result);
}
}

private record CommitResult(List<CommitBatchResponse> commitBatchResponses, Throwable commitBatchError) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public AppendHandler(final SharedState state) {
state.config().produceBufferMaxBytes(),
state.config().produceMaxUploadAttempts(),
state.config().produceUploadBackoff(),
state.config().produceUploadThreadPoolSize(),
state.brokerTopicStats()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import io.aiven.inkless.TimeUtils;
Expand All @@ -38,26 +37,28 @@ public class CacheStoreJob implements Runnable {
private final ObjectCache cache;
private final KeyAlignmentStrategy keyAlignmentStrategy;
private final byte[] data;
private final Future<ObjectKey> uploadFuture;
private final ObjectKey objectKey;
private final Consumer<Long> cacheStoreDurationCallback;

public CacheStoreJob(Time time, ObjectCache cache, KeyAlignmentStrategy keyAlignmentStrategy, byte[] data, Future<ObjectKey> uploadFuture, Consumer<Long> cacheStoreDurationCallback) {
public CacheStoreJob(
Time time,
ObjectCache cache,
KeyAlignmentStrategy keyAlignmentStrategy,
byte[] data,
ObjectKey objectKey,
Consumer<Long> cacheStoreDurationCallback
) {
this.time = time;
this.cache = cache;
this.keyAlignmentStrategy = keyAlignmentStrategy;
this.data = data;
this.uploadFuture = uploadFuture;
this.objectKey = objectKey;
this.cacheStoreDurationCallback = cacheStoreDurationCallback;
}

@Override
public void run() {
    // After the pipelining refactor this job receives a resolved ObjectKey
    // instead of a Future<ObjectKey>: the copied diff interleaved the removed
    // future-based body with the added one-liner, so the coherent post-refactor
    // body simply populates the cache.
    // NOTE(review): assumes the upload has already completed successfully by the
    // time this job is scheduled — confirm against the scheduling caller.
    storeToCache(objectKey);
}

private void storeToCache(ObjectKey objectKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -47,22 +45,22 @@ class FileCommitJob implements Supplier<List<CommitBatchResponse>> {

private final int brokerId;
private final ClosedFile file;
private final Future<ObjectKey> uploadFuture;
private final ObjectKey objectKey;
private final Time time;
private final ControlPlane controlPlane;
private final ObjectDeleter objectDeleter;
private final Consumer<Long> durationCallback;

FileCommitJob(final int brokerId,
final ClosedFile file,
final Future<ObjectKey> uploadFuture,
final ObjectKey objectKey,
final Time time,
final ControlPlane controlPlane,
final ObjectDeleter objectDeleter,
final Consumer<Long> durationCallback) {
this.brokerId = brokerId;
this.file = file;
this.uploadFuture = uploadFuture;
this.objectKey = objectKey;
this.controlPlane = controlPlane;
this.time = time;
this.objectDeleter = objectDeleter;
Expand All @@ -71,42 +69,28 @@ class FileCommitJob implements Supplier<List<CommitBatchResponse>> {

@Override
public List<CommitBatchResponse> get() {
final UploadResult uploadResult = waitForUpload();
return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(uploadResult), durationCallback);
return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(objectKey), durationCallback);
}

private UploadResult waitForUpload() {
private List<CommitBatchResponse> doCommit(final ObjectKey objectKey) {
LOGGER.debug("Uploaded {} successfully, committing", objectKey);
try {
final ObjectKey objectKey = uploadFuture.get();
return new UploadResult(objectKey, null);
} catch (final ExecutionException e) {
LOGGER.error("Failed upload", e);
return new UploadResult(null, e.getCause());
} catch (final InterruptedException e) {
// This is not expected as we try to shut down the executor gracefully.
LOGGER.error("Interrupted", e);
throw new RuntimeException(e);
}
}

private List<CommitBatchResponse> doCommit(final UploadResult result) {
if (result.objectKey != null) {
LOGGER.debug("Uploaded {} successfully, committing", result.objectKey);
try {
final var commitBatchResponses = controlPlane.commitFile(result.objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests());
LOGGER.debug("Committed successfully");
return commitBatchResponses;
} catch (final Exception e) {
LOGGER.error("Commit failed", e);
if (e instanceof ControlPlaneException) {
// only attempt to remove the uploaded file if it is a control plane error
tryDeleteFile(result.objectKey(), e);
}
throw e;
final var commitBatchResponses = controlPlane.commitFile(
objectKey.value(),
ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT,
brokerId,
file.size(),
file.commitBatchRequests()
);
LOGGER.debug("Committed successfully");
return commitBatchResponses;
} catch (final Exception e) {
LOGGER.error("Commit failed", e);
if (e instanceof ControlPlaneException) {
// only attempt to remove the uploaded file if it is a control plane error
tryDeleteFile(objectKey, e);
}
} else {
LOGGER.error("Upload failed: {}", result.uploadError.getMessage());
throw new RuntimeException("File not uploaded", result.uploadError);
throw e;
}
}

Expand All @@ -130,7 +114,4 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) {
LOGGER.error("Error commiting data, but not removing the uploaded file {} as it is not safe", objectKey, e);
}
}

private record UploadResult(ObjectKey objectKey, Throwable uploadError) {
}
}
Loading