
KAFKA-19160; KAFKA-19164; Improve performance of fetching stable offsets #19497


Merged
fixup: refactor
squah-confluent committed Apr 29, 2025
commit e80d94c7c23a89f10caea11d97607c6c72d8eb7b
@@ -62,6 +62,7 @@
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@@ -192,11 +193,165 @@ public OffsetMetadataManager build() {
*/
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;

private final OpenTransactions openTransactions;

/**
 * Tracks open transactions (producer ids) by group id, topic name and partition id.
 * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
 */
private class OpenTransactions {
    /**
     * The open transactions (producer ids) keyed by group id, topic name and partition id.
     * Tracks whether partitions have any pending transactional offsets that have not been deleted.
     *
     * Values in each level of the map will never be empty collections.
     */
    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
Contributor
The current 3-layer map has already improved the performance significantly. If we are still not happy with the perf, maybe we can consider flattening the map to a (group id, topic, partition) -> producer id set mapping.
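For illustration, the flattened layout described above might look something like this (a hypothetical sketch, not code from this PR; the record name and map field are invented for the example). It trades the three nested lookups for a single lookup, at the cost of allocating a compound key on every call:

// Hypothetical flattened alternative: one map keyed by a compound key
// instead of three nested maps.
private record GroupTopicPartition(String groupId, String topic, int partition) { }

private final TimelineHashMap<GroupTopicPartition, TimelineHashSet<Long>> openTransactionsByGtp =
    new TimelineHashMap<>(snapshotRegistry, 0);

private boolean contains(String groupId, String topic, int partition) {
    // Single hash lookup, but a fresh GroupTopicPartition allocation per call.
    return openTransactionsByGtp.containsKey(new GroupTopicPartition(groupId, topic, partition));
}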

Contributor Author
I tried a couple of approaches and settled on inlining hasPendingTransactionalOffsets into the offset fetch path. This way we don't do string comparisons of the group id and topic name for every partition and also avoid allocations from a compound key.

Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt  Score   Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  0.129 ± 0.002  ms/op
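For context, the JMH line above says one fetch across 4000 partitions with 4000 open transactions averages about 0.129 ms. A harness of roughly the following shape would produce output in that format (a sketch under assumptions: the real TransactionalOffsetFetchBenchmark lives in Kafka's jmh-benchmarks module, and the OffsetFetchFixture helper here is invented for illustration):

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@Fork(1)
public class TransactionalOffsetFetchBenchmark {
    @Param({"4000"})
    public int partitionCount;

    @Param({"4000"})
    public int transactionCount;

    // Hypothetical fixture: a coordinator replayed with `transactionCount`
    // transactional offset commits spread over `partitionCount` partitions.
    private OffsetFetchFixture fixture;

    @Setup(Level.Trial)
    public void setup() {
        fixture = OffsetFetchFixture.create(partitionCount, transactionCount);
    }

    @Benchmark
    public Object run() {
        // Fetch all offsets with require_stable = true; this path must consult
        // the open-transactions state for every requested partition. Returning
        // the result keeps JMH from dead-code-eliminating the work.
        return fixture.fetchAllStableOffsets();
    }
}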


    private OpenTransactions() {
        this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /**
     * Adds a producer id to the open transactions for the given group and topic partition.
     *
     * @param groupId    The group id.
     * @param topic      The topic name.
     * @param partition  The partition.
     * @param producerId The producer id.
     * @return {@code true} if the partition did not already have a pending offset from the producer id.
     */
    private boolean add(String groupId, String topic, int partition, long producerId) {
        return openTransactionsByGroup
            .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
            .add(producerId);
    }

    /**
     * Clears all open transactions for the given group and topic partition.
     *
     * @param groupId   The group id.
     * @param topic     The topic name.
     * @param partition The partition.
     */
    private void clear(String groupId, String topic, int partition) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
            openTransactionsByGroup.get(groupId);
        if (openTransactionsByTopic == null) return;

        TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
        if (openTransactionsByPartition == null) return;

        openTransactionsByPartition.remove(partition);

        if (openTransactionsByPartition.isEmpty()) {
            openTransactionsByTopic.remove(topic);
            if (openTransactionsByTopic.isEmpty()) {
                openTransactionsByGroup.remove(groupId);
            }
        }
    }

    /**
     * Returns {@code true} if the given group has any pending transactional offsets.
     *
     * @param groupId The group id.
     * @return {@code true} if the given group has any pending transactional offsets.
     */
    private boolean contains(String groupId) {
        return openTransactionsByGroup.containsKey(groupId);
    }

    /**
     * Returns {@code true} if the given group has any pending transactional offsets for the given topic and partition.
     *
     * @param groupId   The group id.
     * @param topic     The topic name.
     * @param partition The partition.
     * @return {@code true} if the given group has any pending transactional offsets for the given topic and partition.
     */
    private boolean contains(String groupId, String topic, int partition) {
        TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
        return openTransactions != null;
    }
Comment on lines +278 to +281
Contributor
Could we do something like

        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap = 
            openTransactionsByGroup.get(groupId);
            
        if (topicMap == null) return false;
        
        TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        return partitionMap != null && partitionMap.containsKey(partition);

to avoid the extra lookup?

Contributor Author

I checked and containsKey does basically the same operation as a get.

public boolean containsKey(Object key) {
    return containsKey(key, SnapshottableHashTable.LATEST_EPOCH);
}

public boolean containsKey(Object key, long epoch) {
    return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) != null;
}

public V get(Object key) {
    return get(key, SnapshottableHashTable.LATEST_EPOCH);
}

public V get(Object key, long epoch) {
    Entry<K, V> entry =
        snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
    if (entry == null) {
        return null;
    }
    return entry.getValue();
}


    /**
     * Performs the given action for each topic partition with a pending transactional offset for the given group.
     *
     * @param groupId The group id.
     * @param action  The action to be performed for each topic partition with a pending transactional offset.
     */
    private void forEachTopicPartition(String groupId, BiConsumer<String, Integer> action) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
            openTransactionsByGroup.get(groupId);
        if (openTransactionsByTopic == null) return;

        openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
            openTransactionsByPartition.forEach((partition, producerIds) -> {
                action.accept(topic, partition);
            });
        });
    }

    /**
     * Performs the given action for each producer id with a pending transactional offset for the given group and topic partition.
     *
     * @param groupId   The group id.
     * @param topic     The topic name.
     * @param partition The partition.
     * @param action    The action to be performed for each producer id with a pending transactional offset.
     */
    private void forEach(String groupId, String topic, int partition, Consumer<Long> action) {
        TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
        if (openTransactions == null) return;

        openTransactions.forEach(action);
    }

    /**
     * Gets the set of producer ids with pending transactional offsets for the given group and topic partition.
     *
     * @param groupId   The group id.
     * @param topic     The topic name.
     * @param partition The partition.
     * @return The set of producer ids with pending transactional offsets for the given group and topic partition.
     */
    private TimelineHashSet<Long> get(String groupId, String topic, int partition) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
            openTransactionsByGroup.get(groupId);
        if (openTransactionsByTopic == null) return null;

        TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
        if (openTransactionsByPartition == null) return null;

        return openTransactionsByPartition.get(partition);
    }

    /**
     * Removes a producer id from the open transactions for the given group and topic partition.
     *
     * @param groupId    The group id.
     * @param topic      The topic name.
     * @param partition  The partition.
     * @param producerId The producer id.
     * @return {@code true} if the group and topic partition had a pending transactional offset from the producer id.
     */
    private boolean remove(String groupId, String topic, int partition, long producerId) {
        TimelineHashSet<Long> openTransactions = get(groupId, topic, partition);
        if (openTransactions == null) return false;

        boolean removed = openTransactions.remove(producerId);

        if (openTransactions.isEmpty()) {
            // Reuse the cleanup in clear().
            clear(groupId, topic, partition);
        }

        return removed;
    }
}
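For orientation, the replay paths further down in this diff drive the class as follows (an illustrative recap assembled from the hunks below, not additional code):

// Replaying a transactional OffsetCommit record:
openTransactions.add(groupId, topic, partition, producerId);

// Replaying an offset-commit tombstone: drop the matching pending offsets for
// every open producer id, then forget the partition entirely.
openTransactions.forEach(groupId, topic, partition, openProducerId -> {
    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
    if (pendingOffsets != null) {
        pendingOffsets.remove(groupId, topic, partition);
    }
});
openTransactions.clear(groupId, topic, partition);

// Replaying a transaction end marker for a producer id:
openTransactions.remove(groupId, topic, partitionId, producerId);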

private class Offsets {
/**
@@ -281,7 +436,7 @@ private OffsetAndMetadata remove(
this.metrics = metrics;
this.offsets = new Offsets();
this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactions = new OpenTransactions();
}

/**
Expand Down Expand Up @@ -651,18 +806,12 @@ public int deleteAllOffsets(
// Delete all the pending transactional offsets too. Here we only write a tombstone
// if the topic-partition was not in the main storage because we don't need to write
// two consecutive tombstones.
openTransactions.forEachTopicPartition(groupId, (topic, partition) -> {
    if (!hasCommittedOffset(groupId, topic, partition)) {
        records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
        numDeletedOffsets.getAndIncrement();
    }
});

return numDeletedOffsets.get();
}
@@ -678,15 +827,7 @@ boolean hasPendingTransactionalOffsets(
String topic,
int partition
) {
    return openTransactions.contains(groupId, topic, partition);
}

/**
@@ -876,7 +1017,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, records.size());

// We don't want to remove the group if there are ongoing transactions with undeleted offsets.
return allOffsetsExpired.get() && !openTransactions.contains(groupId);
}

/**
@@ -993,42 +1134,21 @@ public void replay(
            partition,
            OffsetAndMetadata.fromRecord(recordOffset, value)
        );
        openTransactions.add(groupId, topic, partition, producerId);
    }
} else {
    if (offsets.remove(groupId, topic, partition) != null) {
        metrics.decrementNumOffsets();
    }

    // Remove all the pending offset commits related to the tombstone.
    openTransactions.forEach(groupId, topic, partition, openProducerId -> {
        Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
        if (pendingOffsets != null) {
            pendingOffsets.remove(groupId, topic, partition);
        }
    });
    openTransactions.clear(groupId, topic, partition);
}

@@ -1053,28 +1173,9 @@ public void replayEndTransactionMarker(
}

pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
    topicOffsets.forEach((topic, partitionOffsets) -> {
        partitionOffsets.keySet().forEach(partitionId -> {
            openTransactions.remove(groupId, topic, partitionId, producerId);
        });
    });
});


Previous point stands. How I'd do it would be something like the attempt below; the cleanupIfEmpty method from earlier could be used if implemented (a possible shape is sketched after the snippet).

public void clearOpenTransactionsForProducer(final long producerId, final PendingOffsets pendingOffsets) {
    for (final Map.Entry<String, Map<String, Map<Integer, OffsetAndMetadata>>> groupEntry : pendingOffsets.offsetsByGroup.entrySet()) {
        final String groupId = groupEntry.getKey();
        final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets = groupEntry.getValue();

        removeProducerFromGroupTransactions(groupId, producerId);

        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) continue;

        processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
        cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
    }
}

private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
    final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions == null) return;

    groupTransactions.remove(producerId);
    if (groupTransactions.isEmpty()) {
        openTransactionsByGroup.remove(groupId);
    }
}

private void processTopicOffsets(
    final long producerId,
    final String groupId,
    final Map<String, Map<Integer, OffsetAndMetadata>> topicOffsets,
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap) {

    for (final Map.Entry<String, Map<Integer, OffsetAndMetadata>> topicEntry : topicOffsets.entrySet()) {
        final String topic = topicEntry.getKey();
        final Map<Integer, OffsetAndMetadata> partitionOffsets = topicEntry.getValue();

        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        if (partitionMap == null) continue;

        for (final Integer partitionId : partitionOffsets.keySet()) {
            removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
        }

        cleanupIfEmpty(partitionMap, topicMap, topic);
    }
}

private void removeProducerFromPartitionMap(
    final long producerId,
    final int partitionId,
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap) {

    final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
    if (partitionTransactions == null) return;

    partitionTransactions.remove(producerId);

    if (partitionTransactions.isEmpty()) {
        partitionMap.remove(partitionId);
    }
}
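The cleanupIfEmpty helper referenced above is not shown in this thread; one possible shape (hypothetical, written to match the two call sites in the snippet) is:

// Hypothetical helper matching the calls above: drops `key` from `parent`
// once the nested collection `child` has become empty.
private static <K> void cleanupIfEmpty(
    final Map<?, ?> child,
    final Map<K, ?> parent,
    final K key
) {
    if (child.isEmpty()) {
        parent.remove(key);
    }
}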
