KAFKA-19133: Support fetching for multiple remote fetch topic partitions in a single share fetch request #19592
Conversation
@adixitconfluent : Thanks for the PR. Left a few comments.
@@ -291,11 +293,11 @@ public boolean tryComplete() {
    // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
    // those topic partitions.
    LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
    // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
    Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
There seems to be an existing issue with remote fetch. Consider the following.
- tryComplete() is called on 1 partition. The fetch offset is still in the local log and readFromLog() returns a local fetchOffsetMetadata, which is then cached in sharePartition. The partition doesn't satisfy minBytes yet, so tryComplete() returns false.
- tryComplete() is called again on that partition. Since fetchOffsetMetadata is cached in sharePartition, readFromLog() is not called. Now, the partition satisfies minBytes.
- onComplete() is called. The data at the cached fetchOffsetMetadata is now only available in remote storage, so readFromLog() returns empty Records and a non-empty logReadResult.info().delayedRemoteStorageFetch. Since there is no logic to handle remote fetch in completeLocalLogShareFetchRequest(), an empty response is sent to the client.
- The client will fetch with the same offset, and steps 2 and 3 will be repeated.
If a client gets into this situation, it will never make progress. Is this correct?
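
The loop described above can be pictured with a deliberately simplified, self-contained sketch. None of the types below are the real Kafka classes (LogReadResult, SharePartition, etc.); they are hypothetical stand-ins used only to show why a local-log completion path with no remote-fetch handling keeps answering with zero bytes once the cached offset metadata points at tiered data.

import java.util.Optional;

// Hypothetical stand-ins, NOT the real Kafka classes, used only to illustrate the loop above.
public class RemoteFetchLivelockSketch {

    // What the first tryComplete() cached: an offset plus whether its data is still in the local log.
    record CachedOffsetMetadata(long offset, boolean stillInLocalLog) {}

    // Stand-in for a log read result: some local bytes, or a marker that a remote fetch is needed.
    record ReadResult(int localBytes, Optional<String> delayedRemoteStorageFetch) {}

    static ReadResult readFromLog(CachedOffsetMetadata cached) {
        // Once the offset has been tiered off the local log, the read returns empty records
        // together with a non-empty delayedRemoteStorageFetch, as in step 3 of the comment.
        return cached.stillInLocalLog()
            ? new ReadResult(1024, Optional.empty())
            : new ReadResult(0, Optional.of("remote-fetch-info"));
    }

    // Models a local-log completion path with no remote-fetch handling: partitions whose read
    // result carries delayedRemoteStorageFetch contribute nothing to the response.
    static int completeLocalLogShareFetch(ReadResult result) {
        return result.delayedRemoteStorageFetch().isPresent() ? 0 : result.localBytes();
    }

    public static void main(String[] args) {
        // Metadata was cached while the data was still local, but by onComplete() it is remote-only.
        CachedOffsetMetadata staleCache = new CachedOffsetMetadata(100L, false);
        int bytes = completeLocalLogShareFetch(readFromLog(staleCache));
        // The client receives 0 bytes, retries with the same offset, and the cycle repeats.
        System.out.println("bytes returned to client: " + bytes);
    }
}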
Hi @junrao , thanks for pointing out this problem. I think this situation can potentially occur. To handle it, I propose that we update the cached share partition's fetch offset metadata to null for this offset during onComplete, while completing the local log share fetch request, something like the code below. I have added this in my latest commit. Let me know if this looks good. Thanks!
private void updateFetchOffsetMetadataForRemoteFetchPartitions(
    LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
    LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
    replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
        // Only partitions whose local read came back with delayedRemoteStorageFetch need the reset.
        if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
            // Clear the cached metadata for this fetch offset so the next tryComplete re-reads from the log.
            sharePartition.updateFetchOffsetMetadata(
                topicPartitionData.get(topicIdPartition),
                null
            );
        }
    });
}
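
As a quick illustration of why resetting the cached metadata to null breaks the loop, here is a self-contained sketch with a hypothetical SharePartitionStub (not the real SharePartition API): once the cache is cleared, the next tryComplete falls back to a fresh readFromLog().

import java.util.Optional;

// Hypothetical stub, not the real SharePartition; it only mimics the "skip readFromLog when
// fetch offset metadata is already cached" behaviour discussed in this thread.
public class ResetEffectSketch {

    static class SharePartitionStub {
        private Optional<String> cachedFetchOffsetMetadata = Optional.of("stale local-log metadata");

        void resetFetchOffsetMetadata() {
            cachedFetchOffsetMetadata = Optional.empty();
        }

        String readForFetch() {
            // With cached metadata present, readFromLog() is skipped and the stale value is reused;
            // after the reset, the next attempt performs a fresh read.
            return cachedFetchOffsetMetadata.orElse("fresh readFromLog() result");
        }
    }

    public static void main(String[] args) {
        SharePartitionStub partition = new SharePartitionStub();
        System.out.println(partition.readForFetch()); // stale local-log metadata
        partition.resetFetchOffsetMetadata();         // the proposed null/empty reset
        System.out.println(partition.readForFetch()); // fresh readFromLog() result
    }
}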
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java (outdated; resolved review thread)
@adixitconfluent : Thanks for the updated PR. A few more comments.
@@ -248,6 +250,8 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
    // updated in a different tryComplete thread.
    responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);

    updateFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
This is an existing issue. Here, we add the partition to responseData if entry.getValue().info().delayedRemoteStorageFetch.isEmpty() is true. However, in completeRemoteStorageShareFetchRequest(), we skip it:

for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
    if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
        shareFetchPartitionData.add(
            ...

It would be useful to make them consistent.
Added the check for logReadResult.info().delayedRemoteStorageFetch.isEmpty() in processAcquiredTopicPartitionsForLocalLogFetch as well.
 * @param topicPartitionData - Map containing the fetch offset for the topic partitions.
 * @param replicaManagerReadResponse - Map containing the readFromLog response from replicaManager for the topic partitions.
 */
private void updateFetchOffsetMetadataForRemoteFetchPartitions(
updateFetchOffsetMetadataForRemoteFetchPartitions => resetFetchOffsetMetadataForRemoteFetchPartitions ?
Also, should we call this from completeRemoteStorageShareFetchRequest() too for the locally read partitions returning delayedRemoteStorageFetch?
Yes to both the suggestions.
@adixitconfluent : Thanks for the updated PR. LGTM
@apoorvmittal10 : Since you self-requested a review, I will let you review it again.
Thanks for the PR. It looks good; I just left minor comments.
@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
    // updated in a different tryComplete thread.
    responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);

    resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
This method call seems strange in processAcquiredTopicPartitionsForLocalLogFetch, where we are calling some remote fetch reset method. I am sure it must be required, but can you please write comments here? Then we can see whether processAcquiredTopicPartitionsForLocalLogFetch remains a valid name or we need to find a better one.
This method call is required to reset the fetch offset metadata for share partitions that were a local log fetch during tryComplete but changed to a remote storage fetch in onComplete. Basically, during tryComplete we deduced that this share fetch request only contained local log fetch topic partitions, but during onComplete some of those local log fetches changed to remote storage fetches. The scenario where this creates a problem is explained in the earlier comment above.
))
);
responseData.forEach((topicIdPartition, logReadResult) -> {
    if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {
Question: What happens if we have some delayedRemoteStorageFetch data in logReadResult now? Is it ignored or further processed?
If some topic partitions have non-empty delayedRemoteStorageFetch data in logReadResult while we are completing the share fetch request for local log partitions in onComplete, we will ignore them, because in order to complete them we would need to create RemoteFetch objects for them and wait for their completion in the purgatory, which is not possible in onComplete.
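
To make the "ignored" part concrete, here is a minimal, self-contained sketch of the filtering pattern. The types are simplified stand-ins (plain String keys instead of TopicIdPartition, a hypothetical ReadResult record instead of LogReadResult); only the isEmpty() check mirrors the diff shown above.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class LocalLogResponseFilterSketch {

    // Hypothetical stand-in for LogReadResult: bytes read locally, or a remote-fetch marker.
    record ReadResult(long bytes, Optional<String> delayedRemoteStorageFetch) {}

    static Map<String, Long> buildLocalLogResponse(LinkedHashMap<String, ReadResult> responseData) {
        LinkedHashMap<String, Long> response = new LinkedHashMap<>();
        responseData.forEach((partition, result) -> {
            // Only partitions actually read from the local log make it into the response;
            // remote-storage candidates are skipped because completing them would require
            // creating RemoteFetch objects and waiting in the purgatory.
            if (result.delayedRemoteStorageFetch().isEmpty()) {
                response.put(partition, result.bytes());
            }
        });
        return response;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, ReadResult> responseData = new LinkedHashMap<>();
        responseData.put("topicA-0", new ReadResult(512L, Optional.empty()));
        responseData.put("topicA-1", new ReadResult(0L, Optional.of("remote-fetch-info")));
        System.out.println(buildLocalLogResponse(responseData)); // prints {topicA-0=512}
    }
}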
LGTM, thanks for the changes.
KAFKA-19133: Support fetching for multiple remote fetch topic partitions in a single share fetch request (apache#19592)
Reviewers: Jun Rao <[email protected]>, Apoorv Mittal <[email protected]>
About
This PR removes the limitation that remote storage fetch for share groups could only be performed for a single topic partition per share fetch request. With this PR, share groups can now fetch multiple remote storage topic partitions in a single share fetch request.
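
The following is a loose, hypothetical illustration of the idea, not the actual Kafka implementation: conceptually, instead of tracking a single pending remote fetch per share fetch request, a pending fetch can be tracked per topic partition and the response assembled once all of them complete. All names and types here are made up for the sketch.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MultiPartitionRemoteFetchSketch {

    // Wait for every per-partition remote fetch to finish, then assemble one response.
    static Map<String, byte[]> fetchAllRemote(Map<String, CompletableFuture<byte[]>> pendingRemoteFetches) {
        CompletableFuture.allOf(pendingRemoteFetches.values().toArray(new CompletableFuture[0])).join();
        Map<String, byte[]> response = new LinkedHashMap<>();
        pendingRemoteFetches.forEach((partition, future) -> response.put(partition, future.join()));
        return response;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<byte[]>> pending = new LinkedHashMap<>();
        pending.put("topicA-0", CompletableFuture.completedFuture(new byte[16]));
        pending.put("topicB-3", CompletableFuture.completedFuture(new byte[32]));
        fetchAllRemote(pending).forEach((partition, data) ->
            System.out.println(partition + " -> " + data.length + " bytes"));
    }
}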
Testing
I have followed the AK documentation (https://kafka.apache.org/documentation/#tiered_storage_config_ex) to test my code locally (by adopting LocalTieredStorage.java) and verified with the help of logs that remote storage fetch is happening for multiple topic partitions in a single share fetch request. Also verified it with the help of unit tests.