Skip to content

KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480 #20114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: 3.9
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,30 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The "Round-Robin" partitioner
*
* This partitioning strategy can be used when user wants
* to distribute the writes to all partitions equally. This
* is the behaviour regardless of record key hash.
*
* The "Round-Robin" partitioner - MODIFIED TO WORK PROPERLY WITH STICKY PARTITIONING (KIP-480)
* <p>
* This partitioning strategy can be used when user wants to distribute the writes to all
* partitions equally. This is the behaviour regardless of record key hash.
*/
public class RoundRobinPartitioner implements Partitioner {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinPartitioner.class);
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Queue<Integer>> topicPartitionQueueMap = new ConcurrentHashMap<>();

// Empty body: no entry of configs is read here.
public void configure(Map<String, ?> configs) {}
public void configure(Map<String, ?> configs) {
}

/**
* Compute the partition for the given record.
Expand All @@ -50,24 +56,63 @@ public void configure(Map<String, ?> configs) {}
* @param cluster The current cluster metadata
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
public int partition(
String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// A partition handed back through onNewBatch is served first; the per-topic counter is not
// advanced on this path, so the round-robin sequence does not skip a slot.
Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
Integer queuedPartition = partitionQueue.poll();
if (queuedPartition != null) {
LOGGER.trace("Partition chosen from queue: {}", queuedPartition);
return queuedPartition;
} else {
// no partitions are available, give a non-available partition
int numPartitions = cluster.partitionsForTopic(topic).size();
return Utils.toPositive(nextValue) % numPartitions;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// Nothing was queued: take the next counter value and map it onto the available partitions.
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
int partition = availablePartitions.get(part).partition();
LOGGER.trace("Partition chosen: {}", partition);
return partition;
} else {
// no partitions are available, give a non-available partition
// NOTE(review): if numPartitions is 0 this modulo throws ArithmeticException - confirm the
// caller never reaches this branch for a topic without partition metadata.
return Utils.toPositive(nextValue) % numPartitions;
}
}
}

/**
* Returns the current value of the counter kept for {@code topic} and then increments it.
* The counter is created with value 0 the first time a topic is seen.
*/
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
AtomicInteger counter =
topicCounterMap.computeIfAbsent(
topic,
k -> {
return new AtomicInteger(0);
});
// The int counter may wrap to negative values; partition() passes the result through
// Utils.toPositive before using it as an index.
return counter.getAndIncrement();
}

public void close() {}
/**
 * Looks up the queue of handed-back partitions for {@code topic}, installing an empty
 * {@link ConcurrentLinkedQueue} in the map the first time the topic is seen.
 */
private Queue<Integer> partitionQueueComputeIfAbsent(String topic) {
    return topicPartitionQueueMap.computeIfAbsent(topic, unused -> new ConcurrentLinkedQueue<>());
}

// Empty body: nothing is released or flushed here.
public void close() {
}

}
/**
 * Called when a new batch is about to be created. This implementation appends
 * {@code prevPartition} to the queue kept for {@code topic}; {@code partition(...)} polls that
 * queue before consulting the round-robin counter, so a later call for the same topic returns
 * the queued partition instead of advancing to the next one.
 *
 * @param topic The topic name
 * @param cluster The current cluster metadata (not read by this implementation)
 * @param prevPartition The partition previously selected for the record that triggered a new
 *     batch
 */
@SuppressWarnings("deprecation")
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    LOGGER.trace("New batch so enqueuing partition {} for topic {}", prevPartition, topic);
    partitionQueueComputeIfAbsent(topic).add(prevPartition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testRoundRobinWithUnavailablePartitions() {
int countForPart2 = 0;
Partitioner partitioner = new RoundRobinPartitioner();
Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), partitions,
Collections.emptySet(), Collections.emptySet());
Collections.<String>emptySet(), Collections.<String>emptySet());
for (int i = 1; i <= 100; i++) {
int part = partitioner.partition("test", null, null, null, null, cluster);
assertTrue(part == 0 || part == 2, "We should never choose a leader-less node in round robin");
Expand All @@ -66,15 +66,15 @@ public void testRoundRobinWithUnavailablePartitions() {
}

@Test
public void testRoundRobinWithKeyBytes() {
public void testRoundRobinWithKeyBytes() throws InterruptedException {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
Collections.<String>emptySet(), Collections.<String>emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Expand All @@ -96,17 +96,17 @@ public void testRoundRobinWithKeyBytes() {
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@Test
public void testRoundRobinWithNullKeyBytes() {
public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
Collections.<String>emptySet(), Collections.<String>emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Expand All @@ -126,5 +126,26 @@ public void testRoundRobinWithNullKeyBytes() {
assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}
}
}

@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithAbortForNewBatch() throws Exception {
    final String firstTopic = "topicA";
    final String secondTopic = "topicB";

    // The cluster is built with an empty partition list, so the values asserted below can only
    // come from what onNewBatch enqueued.
    Cluster emptyCluster = new Cluster("clusterId", asList(NODES[0]), Collections.emptyList(),
        Collections.<String>emptySet(), Collections.<String>emptySet());
    Partitioner roundRobin = new RoundRobinPartitioner();

    // Abort-for-new-batch: each partition handed back must be returned by a subsequent
    // partition() call. Mimics three threads producing to two topics, where all the
    // onNewBatch notifications arrive before any of the follow-up partition() calls.
    roundRobin.onNewBatch(firstTopic, emptyCluster, 7);
    roundRobin.onNewBatch(firstTopic, emptyCluster, 8);
    roundRobin.onNewBatch(secondTopic, emptyCluster, 1);

    // Per topic, partitions come back in the order they were enqueued.
    assertEquals(7, roundRobin.partition(firstTopic, null, null, null, null, emptyCluster));
    assertEquals(8, roundRobin.partition(firstTopic, null, null, null, null, emptyCluster));
    assertEquals(1, roundRobin.partition(secondTopic, null, null, null, null, emptyCluster));
}
}