KAFKA-19133: Support fetching for multiple remote fetch topic partitions in a single share fetch request #19592


Merged

merged 3 commits into apache:trunk on May 5, 2025

Conversation

Contributor

@adixitconfluent adixitconfluent commented Apr 29, 2025

About

This PR removes the limitation in remote storage fetch for share groups
that remote fetch could only be performed for a single topic partition in a
share fetch request. With this PR, share groups can now fetch multiple
remote storage topic partitions in a single share fetch request.

Testing

I have followed the [AK
documentation](https://kafka.apache.org/documentation/#tiered_storage_config_ex)
to test my code locally (by adopting LocalTieredStorage.java) and
verified with the help of logs that remote storage fetch is happening for
multiple topic partitions in a single share fetch request. I also
verified it with the help of unit tests.
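
For illustration, here is a minimal, self-contained Java sketch of the idea (not the actual broker code): instead of keeping only the first topic partition whose local read points at remote storage, every such partition is collected so that one share fetch request can serve remote reads for all of them. The Partition and ReadResult records below are hypothetical stand-ins for Kafka's TopicIdPartition and LogReadResult.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    // Minimal, self-contained sketch of the idea behind this PR, not the actual broker code.
    public class MultiRemoteFetchSketch {

        // Hypothetical stand-ins for Kafka's TopicIdPartition and LogReadResult.
        record Partition(String topic, int id) { }
        record ReadResult(Optional<String> delayedRemoteStorageFetch) { }

        static Map<Partition, ReadResult> remoteFetchPartitions(Map<Partition, ReadResult> localReads) {
            LinkedHashMap<Partition, ReadResult> remote = new LinkedHashMap<>();
            localReads.forEach((partition, result) -> {
                // Previously only the first partition with pending remote fetch info was kept;
                // now every partition whose read result carries remote fetch info is collected.
                if (result.delayedRemoteStorageFetch().isPresent()) {
                    remote.put(partition, result);
                }
            });
            return remote;
        }

        public static void main(String[] args) {
            Map<Partition, ReadResult> reads = new LinkedHashMap<>();
            reads.put(new Partition("topicA", 0), new ReadResult(Optional.of("remote-segment-A")));
            reads.put(new Partition("topicB", 1), new ReadResult(Optional.empty()));
            reads.put(new Partition("topicC", 2), new ReadResult(Optional.of("remote-segment-C")));
            System.out.println(remoteFetchPartitions(reads).keySet()); // both topicA-0 and topicC-2
        }
    }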

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Apr 29, 2025
@adixitconfluent adixitconfluent marked this pull request as ready for review April 29, 2025 06:22
@AndrewJSchofield AndrewJSchofield removed the triage PRs from the community label Apr 29, 2025
@AndrewJSchofield AndrewJSchofield requested a review from junrao April 29, 2025 08:37
Contributor

@junrao junrao left a comment

@adixitconfluent : Thanks for the PR. Left a few comments.

@@ -291,11 +293,11 @@ public boolean tryComplete() {
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
// Store the remote fetch info and the topic partition for which we need to perform remote fetch.
Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
Contributor

There seems to be an existing issue with remote fetch. Consider the following.

  1. tryComplete() is called on 1 partition. The fetch offset is still in the local log and readFromLog() returns a local fetchOffsetMetadata, which is then cached in sharePartition. The partition doesn't satisfy minBytes yet. tryComplete() returns false.
  2. tryComplete() is called again on that partition. Since fetchOffsetMetadata is cached in sharePartition, readFromLog() is not called. Now, the partition satisfies minBytes.
  3. onComplete() is called. The offset in the cached fetchOffsetMetadata is now only available in remote storage, and readFromLog() returns empty Records and a non-empty logReadResult.info().delayedRemoteStorageFetch. Since there is no logic to handle remote fetch in completeLocalLogShareFetchRequest(), an empty response is sent to the client.
  4. The client will fetch with the same offset, and steps 2 and 3 will be repeated.

If a client gets into this situation, it will never make progress. Is this correct?
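
As a rough, self-contained illustration of that loop (all names here are hypothetical, not the actual DelayedShareFetch or SharePartition code): once the offset metadata is cached, readFromLog() is never consulted again, so the move to remote storage goes unnoticed until the cached entry is cleared.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, simplified illustration of the loop described above.
    public class StuckRemoteFetchSketch {

        // Stand-in for the fetch offset metadata cached per fetch offset in SharePartition.
        static final Map<Long, String> cachedFetchOffsetMetadata = new HashMap<>();

        // Stand-in for replicaManager.readFromLog(): it would report that the offset now lives
        // in remote storage, but it is only consulted on a cache miss.
        static String readFromLog(long fetchOffset) {
            return "metadata-pointing-at-remote-segment";
        }

        // Step 2: the cached entry short-circuits readFromLog(), so the move to remote
        // storage is never observed during tryComplete().
        static void tryCompleteSketch(long fetchOffset) {
            cachedFetchOffsetMetadata.computeIfAbsent(fetchOffset, StuckRemoteFetchSketch::readFromLog);
        }

        public static void main(String[] args) {
            // Step 1: metadata cached while the offset was still in the local log.
            cachedFetchOffsetMetadata.put(10L, "local-offset-metadata");

            // Steps 2-4: every retry reuses the stale entry, onComplete() sends an empty
            // response, and the client fetches the same offset again without progress.
            for (int attempt = 0; attempt < 3; attempt++) {
                tryCompleteSketch(10L);
            }
            System.out.println(cachedFetchOffsetMetadata.get(10L)); // still "local-offset-metadata"
        }
    }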

Contributor Author

Hi @junrao, thanks for pointing out this problem. I think this situation can potentially occur. To handle it, I propose that we update the cached share partition's fetch offset metadata to null for this offset during onComplete while completing the local log share fetch request, something like below. I have added this in my latest commit. Let me know if this looks good. Thanks!

    private void updateFetchOffsetMetadataForRemoteFetchPartitions(
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
    ) {
        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
            // The latest read shows that the data for this partition has moved to remote storage.
            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
                SharePartition sharePartition = sharePartitions.get(topicIdPartition);
                // Reset the cached fetch offset metadata for this fetch offset so that the
                // next tryComplete() re-reads from the log instead of reusing stale metadata.
                sharePartition.updateFetchOffsetMetadata(
                    topicPartitionData.get(topicIdPartition),
                    null
                );
            }
        });
    }

@adixitconfluent adixitconfluent requested a review from junrao May 2, 2025 08:05
@apoorvmittal10 apoorvmittal10 self-requested a review May 2, 2025 10:24
Contributor

@junrao junrao left a comment

@adixitconfluent : Thanks for the updated PR. A few more comments.

@@ -248,6 +250,8 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);

updateFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
Contributor

This is an existing issue. Here, we add the partition to responseData if entry.getValue().info().delayedRemoteStorageFetch.isEmpty() is true. However, in completeRemoteStorageShareFetchRequest(), we skip it.

                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
                        if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
                            shareFetchPartitionData.add(

It would be useful to make them consistent.

Contributor Author

Added the check for logReadResult.info().delayedRemoteStorageFetch.isEmpty() in processAcquiredTopicPartitionsForLocalLogFetch as well.

* @param topicPartitionData - Map containing the fetch offset for the topic partitions.
* @param replicaManagerReadResponse - Map containing the readFromLog response from replicaManager for the topic partitions.
*/
private void updateFetchOffsetMetadataForRemoteFetchPartitions(
Contributor

updateFetchOffsetMetadataForRemoteFetchPartitions => resetFetchOffsetMetadataForRemoteFetchPartitions ?

Also, should we call this from completeRemoteStorageShareFetchRequest() too for the locally read partitions returning delayedRemoteStorageFetch?

Contributor Author

Yes to both suggestions.

@adixitconfluent adixitconfluent requested a review from junrao May 2, 2025 17:59
Contributor

@junrao junrao left a comment

@adixitconfluent : Thanks for the updated PR. LGTM

@junrao
Contributor

junrao commented May 2, 2025

@apoorvmittal10 : Since you requested a self review, I will let you review it again.

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR. It looks good; I just left minor comments.

@@ -248,14 +250,18 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);

resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);
Contributor

This method call seems strange in processAcquiredTopicPartitionsForLocalLogFetch, where we are calling a remote fetch reset method. I am sure it must be required, but can you please add comments here? Then we can see whether processAcquiredTopicPartitionsForLocalLogFetch remains a valid name or we need to find a better one.

Contributor Author

@adixitconfluent adixitconfluent May 5, 2025

This method call is required to reset the fetch offset metadata for share partitions that may have had a local log fetch during tryComplete but changed to a remote storage fetch in onComplete. Basically, during tryComplete we deduced that this share fetch request only contained local log fetch topic partitions, but during onComplete some of those local log fetches changed to remote storage fetches. The scenario where this creates a problem is explained in the earlier comment above.

))
);
responseData.forEach((topicIdPartition, logReadResult) -> {
if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {
Contributor

Question: What happens if we have some delayedRemoteStorageFetch data in logReadResult now? Is it ignored or further processed?

Contributor Author

@adixitconfluent adixitconfluent May 5, 2025

If we have some topic partitions with non-empty delayedRemoteStorageFetch data in logReadResult while completing the share fetch request for local log partitions in onComplete, we will ignore them, because in order to complete them we would need to create RemoteFetch objects for them and wait for their completion in the purgatory, which is not possible in onComplete.
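
To make that concrete, here is a minimal, self-contained sketch under the same assumptions (hypothetical stand-in types, not the real DelayedShareFetch code): partitions whose local read came back flagged for remote storage get their cached fetch offset metadata reset and are left out of the response, so a later share fetch retries them.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    // Illustrative sketch only; Partition and ReadResult stand in for Kafka's
    // TopicIdPartition and LogReadResult, and the cache stands in for the
    // fetch offset metadata kept per SharePartition.
    public class OnCompleteLocalFetchSketch {

        record Partition(String topic, int id) { }
        record ReadResult(Optional<String> delayedRemoteStorageFetch, String records) { }

        static final Map<Partition, String> cachedFetchOffsetMetadata = new HashMap<>();

        static Map<Partition, String> completeLocalLogShareFetch(Map<Partition, ReadResult> responseData) {
            LinkedHashMap<Partition, String> response = new LinkedHashMap<>();
            responseData.forEach((partition, result) -> {
                if (result.delayedRemoteStorageFetch().isPresent()) {
                    // The data turned out to live in remote storage: drop the stale cached
                    // metadata so the next fetch re-reads from the log, and skip the
                    // partition in this response (a remote fetch cannot be started here).
                    cachedFetchOffsetMetadata.remove(partition);
                } else {
                    response.put(partition, result.records());
                }
            });
            return response;
        }

        public static void main(String[] args) {
            Map<Partition, ReadResult> reads = new LinkedHashMap<>();
            reads.put(new Partition("topicA", 0), new ReadResult(Optional.empty(), "local-records"));
            reads.put(new Partition("topicB", 0), new ReadResult(Optional.of("remote-segment"), ""));
            cachedFetchOffsetMetadata.put(new Partition("topicB", 0), "stale-metadata");

            System.out.println(completeLocalLogShareFetch(reads)); // only topicA-0 is answered
            System.out.println(cachedFetchOffsetMetadata);         // topicB-0 entry has been reset
        }
    }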

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM, thanks for the changes.

@apoorvmittal10 apoorvmittal10 merged commit 81c3a28 into apache:trunk May 5, 2025
24 checks passed
shmily7829 pushed a commit to shmily7829/kafka that referenced this pull request May 7, 2025
…ons in a single share fetch request (apache#19592)

Reviewers: Jun Rao <[email protected]>, Apoorv Mittal <[email protected]>
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka
5 participants