KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets #19497


Open

squah-confluent wants to merge 9 commits into trunk

Conversation

squah-confluent (Contributor) commented Apr 16, 2025

When fetching stable offsets in the group coordinator, we iterate over
all requested partitions. For each partition, we iterate over the
group's ongoing transactions to check if there is a pending
transactional offset commit for that partition.

This can get slow when there are a large number of partitions and a
large number of pending transactions. Instead, maintain a list of
pending transactions per partition to speed up lookups.
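A minimal sketch of the idea, using plain HashMaps in place of the coordinator's snapshottable TimelineHashMap/TimelineHashSet collections; the field name follows the PR, while the surrounding class and method names are illustrative:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch only: plain collections stand in for the
    // coordinator's TimelineHashMap/TimelineHashSet.
    class OpenTransactionsIndexSketch {
        // group id -> topic name -> partition id -> open producer ids
        private final Map<String, Map<String, Map<Integer, Set<Long>>>> openTransactionsByGroupTopicAndPartition =
            new HashMap<>();

        // Maintained whenever a transactional offset commit is replayed.
        void addOpenTransaction(String groupId, String topic, int partition, long producerId) {
            openTransactionsByGroupTopicAndPartition
                .computeIfAbsent(groupId, g -> new HashMap<>())
                .computeIfAbsent(topic, t -> new HashMap<>())
                .computeIfAbsent(partition, p -> new HashSet<>())
                .add(producerId);
        }

        // Before this PR: O(#open transactions) per requested partition.
        // With the index: two map hops plus one partition lookup.
        boolean hasPendingTransactionalOffsets(String groupId, String topic, int partition) {
            Map<String, Map<Integer, Set<Long>>> byTopic =
                openTransactionsByGroupTopicAndPartition.get(groupId);
            if (byTopic == null) return false;
            Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
            return byPartition != null && byPartition.containsKey(partition);
        }
    }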

github-actions bot added the triage (PRs from the community) label Apr 16, 2025
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {

/**
* The open transactions (producer ids) keyed by group.
* Tracks whether groups have any open transactions.
*/
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
Contributor Author

I wanted to replace this map entirely, but we use it in cleanupExpiredOffsets to check whether a group has any pending transactions.

Member

Can we use openTransactionsByGroupTopicAndPartition for this too? If the group is present in openTransactionsByGroupTopicAndPartition, it means that it has an open transaction.

Contributor Author

We clear entries from that map when offsets are deleted non-transactionally. If all offsets in a transaction are deleted, then there can be an open transaction for a group without a record of it in openTransactionsByGroupTopicAndPartition.
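A tiny simulation of that corner case, with plain maps standing in for the timeline collections (the group, topic, and producer id values are made up):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class OpenTransactionGapDemo {
        public static void main(String[] args) {
            Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();
            Map<String, Map<String, Map<Integer, Set<Long>>>> openTransactionsByGroupTopicAndPartition =
                new HashMap<>();

            // Producer 42 has a pending transactional offset commit for grp/topicA/0.
            openTransactionsByGroup.computeIfAbsent("grp", g -> new HashSet<>()).add(42L);
            openTransactionsByGroupTopicAndPartition
                .computeIfAbsent("grp", g -> new HashMap<>())
                .computeIfAbsent("topicA", t -> new HashMap<>())
                .computeIfAbsent(0, p -> new HashSet<>())
                .add(42L);

            // All of grp's offsets are deleted non-transactionally, clearing the
            // per-partition entries (and, with them, the whole group subtree).
            openTransactionsByGroupTopicAndPartition.remove("grp");

            // The transaction is still open, so only the group-level map can
            // still answer "does grp have any open transactions?".
            System.out.println(openTransactionsByGroupTopicAndPartition.containsKey("grp")); // false
            System.out.println(openTransactionsByGroup.containsKey("grp"));                  // true
        }
    }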

Contributor Author

I've removed the 2nd map of group ids -> producer ids. Now we only track (group id, topic, partition) -> producer ids. As a side effect, it fixes KAFKA-19164.

@shaan150 left a comment

Left a few comments; they're mostly due to where the file was when I picked it up. That said, changes in areas like this are a good opportunity to improve structure as well. I get that it might add a bit of time now, but it'll pay off long-term by reducing complexity and easing future changes.

}
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
    openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic != null) {


There are three nested for loops; this can be avoided.

Something like the following might help (written quickly, untested):

var openTransactionsByTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic == null) return;

for (var topicEntry : openTransactionsByTopic.entrySet()) {
    String topic = topicEntry.getKey();
    var openTransactionsByPartition = topicEntry.getValue();

    for (var partitionEntry : openTransactionsByPartition.entrySet()) {
        int partition = partitionEntry.getKey();

        // Single check per partition
        if (!hasCommittedOffset(groupId, topic, partition)) {
            records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
            numDeletedOffsets.getAndIncrement();
        }
    }
}

Contributor Author

I missed this when refactoring the code, thank you. The previous code had the same bug, though it was less obvious there. I've fixed it now.

}
});
}
}


This whole area is pretty badly structured; maintainability will prove difficult and the code is potentially fragile.

I would split this out into methods and tidy some of it up. Here's an example of how I'd structure it; bear in mind this code isn't tested and is more of a boilerplate example:

    public void clearOpenTransactions(final String groupId, final String topic, final int partition) {
        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = getPartitionMap(groupId, topic);
        if (partitionMap == null) return;
    
        final TimelineHashSet<Long> openProducerIds = partitionMap.get(partition);
        if (openProducerIds == null) return;
    
        removePendingOffsets(openProducerIds, groupId, topic, partition);
    
        partitionMap.remove(partition);
        cleanupIfEmpty(partitionMap, getTopicMap(groupId), topic);
        cleanupIfEmpty(getTopicMap(groupId), openTransactionsByGroupTopicAndPartition, groupId);
    }
    
    private TimelineHashMap<Integer, TimelineHashSet<Long>> getPartitionMap(final String groupId, final String topic) {
        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) return null;
        return topicMap.get(topic);
    }
    
    private TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> getTopicMap(final String groupId) {
        return openTransactionsByGroupTopicAndPartition.get(groupId);
    }
    
    private void removePendingOffsets(
        final Set<Long> producerIds, 
        final String groupId, 
        final String topic, 
        final int partition) {
        for (final Long producerId : producerIds) {
            final Offsets offsets = pendingTransactionalOffsets.get(producerId);
            if (offsets != null) {
                offsets.remove(groupId, topic, partition);
            }
        }
    }
    
    private <K, V extends Map<?, ?>> void cleanupIfEmpty(final V innerMap, final Map<K, V> outerMap, final K key) {
        if (innerMap != null && innerMap.isEmpty()) {
            outerMap.remove(key);
        }
    }

}
}
});
});
});


The previous point stands. Here's roughly how I'd do it; the cleanupIfEmpty method from earlier could be reused if implemented:

public void clearOpenTransactionsForProducer(final long producerId, final PendingOffsets pendingOffsets) {
    for (final Map.Entry<String, Map<String, Map<Integer, OffsetAndMetadata>>> groupEntry : pendingOffsets.offsetsByGroup.entrySet()) {
        final String groupId = groupEntry.getKey();
        final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets = groupEntry.getValue();

        removeProducerFromGroupTransactions(groupId, producerId);

        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) continue;

        processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
        cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
    }
}

private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
    final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions == null) return;

    groupTransactions.remove(producerId);
    if (groupTransactions.isEmpty()) {
        openTransactionsByGroup.remove(groupId);
    }
}

private void processTopicOffsets(
    final long producerId,
    final String groupId,
    final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets,
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap) {

    for (final Map.Entry<String, Map<Integer, OffsetAndMetadata>> topicEntry : topicOffsets.entrySet()) {
        final String topic = topicEntry.getKey();
        final Map<Integer, OffsetAndMetadata> partitionOffsets = topicEntry.getValue();

        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        if (partitionMap == null) continue;

        for (final Integer partitionId : partitionOffsets.keySet()) {
            removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
        }

        cleanupIfEmpty(partitionMap, topicMap, topic);
    }
}

private void removeProducerFromPartitionMap(
    final long producerId,
    final int partitionId,
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap) {

    final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
    if (partitionTransactions == null) return;

    partitionTransactions.remove(producerId);

    if (partitionTransactions.isEmpty()) {
        partitionMap.remove(partitionId);
    }
}

* The open transactions (producer ids) keyed by group id, topic name and partition id.
* Tracks whether partitions have any pending transactional offsets.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;


I understand your reasoning behind this addition, but it does introduce significant complexity and carries a high risk of data duplication if not carefully managed. While I appreciate this may serve as a stop-gap for now, I do think it’s important that this is revisited going forward. A more maintainable long-term solution, perhaps wrapping this logic in a dedicated structure or abstraction, would help reduce coupling and make it easier to evolve the logic cleanly.
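One hedged sketch of such an abstraction, hiding the nested maps behind a small class (the class and method names are illustrative, and plain collections stand in for the timeline ones):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical wrapper; real coordinator code would build this on
    // TimelineHashMap/TimelineHashSet tied to a SnapshotRegistry.
    final class OpenTransactions {
        private final Map<String, Map<String, Map<Integer, Set<Long>>>> byGroupTopicPartition = new HashMap<>();

        void add(String groupId, String topic, int partition, long producerId) {
            byGroupTopicPartition
                .computeIfAbsent(groupId, g -> new HashMap<>())
                .computeIfAbsent(topic, t -> new HashMap<>())
                .computeIfAbsent(partition, p -> new HashSet<>())
                .add(producerId);
        }

        boolean contains(String groupId, String topic, int partition) {
            Map<String, Map<Integer, Set<Long>>> byTopic = byGroupTopicPartition.get(groupId);
            if (byTopic == null) return false;
            Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
            return byPartition != null && byPartition.containsKey(partition);
        }

        // Removes one producer id and prunes empty levels, keeping the
        // invariant that no level ever holds an empty collection.
        void remove(String groupId, String topic, int partition, long producerId) {
            Map<String, Map<Integer, Set<Long>>> byTopic = byGroupTopicPartition.get(groupId);
            if (byTopic == null) return;
            Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
            if (byPartition == null) return;
            Set<Long> producerIds = byPartition.get(partition);
            if (producerIds == null) return;
            producerIds.remove(producerId);
            if (producerIds.isEmpty()) byPartition.remove(partition);
            if (byPartition.isEmpty()) byTopic.remove(topic);
            if (byTopic.isEmpty()) byGroupTopicPartition.remove(groupId);
        }
    }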

github-actions bot removed the triage (PRs from the community) label Apr 17, 2025
dajac (Member) commented Apr 17, 2025

@squah-confluent Thanks for the patch. Could we write a micro benchmark to demonstrate the gain?

dajac self-requested a review April 17, 2025 10:02
dajac added the KIP-848 (The Next Generation of the Consumer Rebalance Protocol) label Apr 17, 2025
if (openTransactionsByTopic != null) {
    openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
        openTransactionsByPartition.forEach((partition, producerIds) -> {
            producerIds.forEach(producerId -> {
Member

Excuse me, why do we iterate over all the producer ids if we don't actually use them when creating the tombstone?

Contributor Author

I missed this when refactoring the code, thank you. The previous code had the same bug, though it was less obvious there. I've fixed it now.
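The corrected shape might look like this fragment (not self-contained: groupId, records, numDeletedOffsets, and the helper methods come from the surrounding PR code); the point is that at most one tombstone is emitted per partition, regardless of how many producer ids are pending:

    // One existence check and at most one tombstone per partition; the set of
    // producer ids is no longer iterated.
    openTransactionsByTopic.forEach((topic, openTransactionsByPartition) ->
        openTransactionsByPartition.forEach((partition, producerIds) -> {
            if (!hasCommittedOffset(groupId, topic, partition)) {
                records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
                numDeletedOffsets.getAndIncrement();
            }
        })
    );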

It was kept around in the initial PR to avoid deletion of groups with
open transactions, but it's okay to delete groups with open transactions
without any offsets, since committing the transaction is a no-op. If the
client tries to add more transactional offsets to a deleted group, we
may either recreate the group or return an error depending on the
generation in the request.
squah-confluent changed the title KAFKA-19160: Improve performance of fetching stable offsets → KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets Apr 29, 2025
squah-confluent (Contributor, Author) commented Apr 29, 2025

@squah-confluent Thanks for the patch. Could we write a micro benchmark to demonstrate the gain?

Added a benchmark.
Before:

Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt    Score    Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  452.957 ± 7.883  ms/op

After:

Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt  Score   Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  0.196 ± 0.005  ms/op
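
For reference, a hypothetical skeleton of what such a JMH micro benchmark could look like; the PR's actual TransactionalOffsetFetchBenchmark drives the real coordinator code, whereas this sketch only contrasts the two lookup strategies on synthetic data:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;

    import org.openjdk.jmh.annotations.*;

    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Warmup(iterations = 3)
    @Measurement(iterations = 5)
    @Fork(1)
    public class StableOffsetLookupBenchmark {
        @Param({"4000"})
        public int partitionCount;

        @Param({"4000"})
        public int transactionCount;

        public Set<Long> openProducerIds;               // old path: scanned per partition
        public Map<Integer, Set<Long>> openByPartition; // new path: indexed by partition

        @Setup
        public void setup() {
            openProducerIds = new HashSet<>();
            openByPartition = new HashMap<>();
            // Synthetic data: transaction i touches partition i % partitionCount.
            for (long i = 0; i < transactionCount; i++) {
                openProducerIds.add(i);
                openByPartition.computeIfAbsent((int) (i % partitionCount), k -> new HashSet<>()).add(i);
            }
        }

        @Benchmark
        public int scanPerPartition() {
            // Mimics the old O(partitions * transactions) nested iteration.
            int pending = 0;
            for (int partition = 0; partition < partitionCount; partition++) {
                for (long producerId : openProducerIds) {
                    if (producerId % partitionCount == partition) {
                        pending++;
                        break;
                    }
                }
            }
            return pending;
        }

        @Benchmark
        public int indexedLookup() {
            // One int-keyed lookup per partition via the new index.
            int pending = 0;
            for (int partition = 0; partition < partitionCount; partition++) {
                if (openByPartition.containsKey(partition)) {
                    pending++;
                }
            }
            return pending;
        }
    }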

squah-confluent (Contributor, Author)

@shaan150 I decided to do something a little different and factored out the code into Map interface-like operations.

Comment on lines +275 to +278
private boolean contains(String groupId, String topic, int partition) {
    TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
    return openTransactions != null;
}
Contributor

Could we do something like

        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap = 
            openTransactionsByGroup.get(groupId);
            
        if (topicMap == null) return false;
        
        TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        return partitionMap != null && partitionMap.containsKey(partition);

to avoid the extra lookup?

Contributor Author

I checked and containsKey does basically the same operation as a get.

public boolean containsKey(Object key) {
    return containsKey(key, SnapshottableHashTable.LATEST_EPOCH);
}

public boolean containsKey(Object key, long epoch) {
    return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
}

public V get(Object key) {
    return get(key, SnapshottableHashTable.LATEST_EPOCH);
}

public V get(Object key, long epoch) {
    Entry<K, V> entry = snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
    if (entry == null) {
        return null;
    }
    return entry.getValue();
}

*
* Values in each level of the map will never be empty collections.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
Contributor

The current 3-layer map has already improved the performance significantly. If we're still not happy with the perf, maybe we can consider flattening the map to (group id, topic, partition) → producer id set.
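For reference, the flattened alternative might look like the sketch below, with a hypothetical GroupTopicPartition record as the compound key (as the reply below notes, the author ultimately avoided this, in part because every lookup would allocate a key object):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical flattened index: a single map keyed by a compound key.
    record GroupTopicPartition(String groupId, String topic, int partition) { }

    class FlattenedOpenTransactions {
        private final Map<GroupTopicPartition, Set<Long>> openTransactions = new HashMap<>();

        boolean hasPendingTransactionalOffsets(String groupId, String topic, int partition) {
            // One hash lookup, but each call allocates a key object and
            // re-hashes the group id and topic name.
            return openTransactions.containsKey(new GroupTopicPartition(groupId, topic, partition));
        }
    }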

Contributor Author

I tried a couple of approaches and settled on inlining hasPendingTransactionalOffsets into the offset fetch path. This way we don't do string comparisons of the group id and topic name for every partition and also avoid allocations from a compound key.

Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt  Score   Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  0.129 ± 0.002  ms/op
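
A sketch of what the inlined shape could look like (a simplified fragment with plain Map types; requestedTopics and requestedPartitions are hypothetical stand-ins for the offset fetch request's contents): the group- and topic-level lookups are hoisted out of the per-partition loop, so each partition costs only an int-keyed lookup and no compound key is allocated.

    // Resolve the group- and topic-level maps once per topic; the group id and
    // topic name are each hashed once instead of once per partition.
    Map<String, Map<Integer, Set<Long>>> openTransactionsByTopic =
        openTransactionsByGroup.getOrDefault(groupId, Collections.emptyMap());

    for (String topic : requestedTopics) {
        Map<Integer, Set<Long>> openTransactionsByPartition =
            openTransactionsByTopic.getOrDefault(topic, Collections.emptyMap());
        for (int partition : requestedPartitions(topic)) {
            boolean pending = openTransactionsByPartition.containsKey(partition);
            // ... a partition with a pending transactional offset has no
            // stable offset to return yet.
        }
    }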

Labels
ci-approved KIP-848 The Next Generation of the Consumer Rebalance Protocol performance