KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets #19497
base: trunk
Conversation
When fetching stable offsets in the group coordinator, we iterate over all requested partitions. For each partition, we iterate over the group's ongoing transactions to check if there is a pending transactional offset commit for that partition. This can get slow when there are a large number of partitions and a large number of pending transactions. Instead, maintain a list of pending transactions per partition to speed up lookups.
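The index described above can be sketched as follows. This is a hypothetical illustration using plain HashMaps (the actual coordinator code uses TimelineHashMap/TimelineHashSet, and the class and method names here are made up): the stable-offset fetch path replaces a scan over every open transaction in the group with a couple of map probes per partition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: index open transactions by
// (group id, topic name, partition id) so checking for a pending
// transactional offset commit is O(1) per partition instead of
// O(open transactions) per partition.
class OpenTransactionIndex {
    // group id -> topic name -> partition id -> producer ids with pending commits
    private final Map<String, Map<String, Map<Integer, Set<Long>>>> index = new HashMap<>();

    void add(String groupId, String topic, int partition, long producerId) {
        index.computeIfAbsent(groupId, g -> new HashMap<>())
             .computeIfAbsent(topic, t -> new HashMap<>())
             .computeIfAbsent(partition, p -> new HashSet<>())
             .add(producerId);
    }

    boolean hasPendingTransactionalOffsets(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Set<Long>>> byTopic = index.get(groupId);
        if (byTopic == null) return false;
        Map<Integer, Set<Long>> byPartition = byTopic.get(topic);
        return byPartition != null && byPartition.containsKey(partition);
    }
}
```

With this shape, the fetch path pays two or three hash probes per requested partition regardless of how many transactions are in flight.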
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {

    /**
     * The open transactions (producer ids) keyed by group.
     * Tracks whether groups have any open transactions.
     */
    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
I wanted to replace this map entirely, but we use it in cleanupExpiredOffsets
to check whether a group has any pending transactions.
Can we use openTransactionsByGroupTopicAndPartition for this too? If the group is present in openTransactionsByGroupTopicAndPartition, it means that it has an open transaction.
We clear entries from that map when offsets are deleted non-transactionally. If all offsets in a transaction are deleted, then there can be an open transaction for a group without a record of it in openTransactionsByGroupTopicAndPartition.
I've removed the 2nd map of group ids -> producer ids. Now we only track (group id, topic, partition) -> producer ids. As a side effect, it fixes KAFKA-19164.
Left a few comments; they're mostly due to the state the file was in when this change was picked up. That said, changes in areas like this are a good opportunity to improve the structure as well. I get that it might add a bit of time now, but it'll pay off long-term by reducing complexity and easing future changes.
}
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
    openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic != null) {
There are three nested for loops here; this can be avoided. Something like the following might help (written quickly, so treat it as a sketch):
var openTransactionsByTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic == null) return;

for (var topicEntry : openTransactionsByTopic.entrySet()) {
    String topic = topicEntry.getKey();
    var openTransactionsByPartition = topicEntry.getValue();
    for (var partitionEntry : openTransactionsByPartition.entrySet()) {
        int partition = partitionEntry.getKey();
        // Single check per partition
        if (!hasCommittedOffset(groupId, topic, partition)) {
            records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
            numDeletedOffsets.getAndIncrement();
        }
    }
}
I missed this when refactoring the code, thank you. The previous code also has this bug, except it was not as obvious. I've fixed it now.
        }
    });
}
}
This whole area is pretty badly structured; maintainability will prove difficult and potentially fragile.
I would split this out into methods and tidy some of it up. Here is an example of how I'd structure it; bear in mind this code isn't tested and is more of a boilerplate example:
public void clearOpenTransactions(final String groupId, final String topic, final int partition) {
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = getPartitionMap(groupId, topic);
    if (partitionMap == null) return;
    final TimelineHashSet<Long> openProducerIds = partitionMap.get(partition);
    if (openProducerIds == null) return;
    removePendingOffsets(openProducerIds, groupId, topic, partition);
    partitionMap.remove(partition);
    cleanupIfEmpty(partitionMap, getTopicMap(groupId), topic);
    cleanupIfEmpty(getTopicMap(groupId), openTransactionsByGroupTopicAndPartition, groupId);
}

private TimelineHashMap<Integer, TimelineHashSet<Long>> getPartitionMap(final String groupId, final String topic) {
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
        openTransactionsByGroupTopicAndPartition.get(groupId);
    if (topicMap == null) return null;
    return topicMap.get(topic);
}

private TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> getTopicMap(final String groupId) {
    return openTransactionsByGroupTopicAndPartition.get(groupId);
}

private void removePendingOffsets(
    final Set<Long> producerIds,
    final String groupId,
    final String topic,
    final int partition) {
    for (final Long producerId : producerIds) {
        final Offsets offsets = pendingTransactionalOffsets.get(producerId);
        if (offsets != null) {
            offsets.remove(groupId, topic, partition);
        }
    }
}

private <K, V extends Map<?, ?>> void cleanupIfEmpty(final V innerMap, final Map<K, V> outerMap, final K key) {
    if (innerMap != null && innerMap.isEmpty()) {
        outerMap.remove(key);
    }
}
            }
        }
    });
});
});
The previous point stands. How I'd do it would be something like the attempt below; the cleanupIfEmpty method from earlier could be reused here if implemented:
public void clearOpenTransactionsForProducer(final long producerId, final PendingOffsets pendingOffsets) {
    for (final Map.Entry<String, Map<String, Map<Integer, OffsetAndMetadata>>> groupEntry : pendingOffsets.offsetsByGroup.entrySet()) {
        final String groupId = groupEntry.getKey();
        final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets = groupEntry.getValue();
        removeProducerFromGroupTransactions(groupId, producerId);
        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) continue;
        processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
        cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
    }
}

private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
    final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions == null) return;
    groupTransactions.remove(producerId);
    if (groupTransactions.isEmpty()) {
        openTransactionsByGroup.remove(groupId);
    }
}

private void processTopicOffsets(
    final long producerId,
    final String groupId,
    final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets,
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap) {
    for (final Map.Entry<String, Map<Integer, OffsetAndMetadata>> topicEntry : topicOffsets.entrySet()) {
        final String topic = topicEntry.getKey();
        final Map<Integer, OffsetAndMetadata> partitionOffsets = topicEntry.getValue();
        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        if (partitionMap == null) continue;
        for (final Integer partitionId : partitionOffsets.keySet()) {
            removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
        }
        cleanupIfEmpty(partitionMap, topicMap, topic);
    }
}

private void removeProducerFromPartitionMap(
    final long producerId,
    final int partitionId,
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap) {
    final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
    if (partitionTransactions == null) return;
    partitionTransactions.remove(producerId);
    if (partitionTransactions.isEmpty()) {
        partitionMap.remove(partitionId);
    }
}
 * The open transactions (producer ids) keyed by group id, topic name and partition id.
 * Tracks whether partitions have any pending transactional offsets.
 */
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;
I understand your reasoning behind this addition, but it does introduce significant complexity and carries a high risk of data duplication if not carefully managed. While I appreciate this may serve as a stop-gap for now, I do think it’s important that this is revisited going forward. A more maintainable long-term solution, perhaps wrapping this logic in a dedicated structure or abstraction, would help reduce coupling and make it easier to evolve the logic cleanly.
@squah-confluent Thanks for the patch. Could we write a micro benchmark to demonstrate the gain?
if (openTransactionsByTopic != null) {
    openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
        openTransactionsByPartition.forEach((partition, producerIds) -> {
            producerIds.forEach(producerId -> {
Excuse me, why do we iterate over all producerIds if we don't actually use them when creating the tombstone?
I missed this when refactoring the code, thank you. The previous code also has this bug, except it was not as obvious. I've fixed it now.
It was kept around in the initial PR to avoid deletion of groups with open transactions, but it's okay to delete groups with open transactions without any offsets, since committing the transaction is a no-op. If the client tries to add more transactional offsets to a deleted group, we may either recreate the group or return an error depending on the generation in the request.
Added a benchmark.
After:
@shaan150 I decided to do something a little different and factored out the code into
private boolean contains(String groupId, String topic, int partition) {
    TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
    return openTransactions != null;
}
Could we do something like
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
    openTransactionsByGroup.get(groupId);
if (topicMap == null) return false;
TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
return partitionMap != null && partitionMap.containsKey(partition);

to avoid the extra lookup?
I checked, and containsKey does basically the same operation as a get.
kafka/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
Lines 120 to 126 in 08f6042
public boolean containsKey(Object key) {
    return containsKey(key, SnapshottableHashTable.LATEST_EPOCH);
}

public boolean containsKey(Object key, long epoch) {
    return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
}
kafka/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
Lines 139 to 150 in 08f6042
public V get(Object key) {
    return get(key, SnapshottableHashTable.LATEST_EPOCH);
}

public V get(Object key, long epoch) {
    Entry<K, V> entry =
        snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
    if (entry == null) {
        return null;
    }
    return entry.getValue();
}
 *
 * Values in each level of the map will never be empty collections.
 */
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
The current 3-layer map has already improved the performance significantly. If we are still not happy with the perf, maybe we can consider flattening the map to (group id, topic, partition) -> producer id set.
I tried a couple of approaches and settled on inlining hasPendingTransactionalOffsets into the offset fetch path. This way we don't do string comparisons of the group id and topic name for every partition, and we also avoid allocations from a compound key.
Benchmark (partitionCount) (transactionCount) Mode Cnt Score Error Units
TransactionalOffsetFetchBenchmark.run 4000 4000 avgt 5 0.129 ± 0.002 ms/op
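To make the complexity argument behind the benchmark concrete, here is a self-contained sketch (not the actual TransactionalOffsetFetchBenchmark, whose source is not shown here) counting the work each path does at the benchmark's parameters of 4000 partitions and 4000 open transactions:

```java
// Illustrative work counts only: the old fetch path checked every open
// transaction for every requested partition, O(partitions * transactions);
// the indexed path does a constant number of map probes per partition.
public class FetchPathWork {
    // Old path: per-partition scan over all open transactions in the group.
    static long scanChecks(int partitions, int openTransactions) {
        long checks = 0;
        for (int p = 0; p < partitions; p++) {
            for (int t = 0; t < openTransactions; t++) {
                checks++; // one pending-offset check per (partition, transaction)
            }
        }
        return checks;
    }

    // New path: one index probe per requested partition, independent of the
    // number of open transactions.
    static long indexProbes(int partitions) {
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(scanChecks(4000, 4000)); // prints 16000000
        System.out.println(indexProbes(4000));      // prints 4000
    }
}
```

At the benchmark's parameters that is a 4000x reduction in per-fetch checks, which is consistent with the large gap between the before and after scores.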