Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
current main branch
Describe the bug
KafkaTestUtils.java
If received can be null, a NullPointerException may occur in the logger received.count(). (line 371)
But, as far as I know, 'consumer.poll method' does not return null.
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout, int minRecords) {
logger.debug("Polling...");
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
long remaining = timeout;
int count = 0;
do {
long t1 = System.currentTimeMillis();
ConsumerRecords<K, V> received = consumer.poll(Duration.ofMillis(remaining));
logger.debug(() -> "Received: " + received.count() + ", "
+ received.partitions().stream()
.flatMap(p -> received.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.collect(Collectors.toList()));
if (received == null) {
throw new IllegalStateException("null received from consumer.poll()");
}
if (minRecords < 0) {
return received;
}
else {
count += received.count();
received.partitions().forEach(tp -> {
List<ConsumerRecord<K, V>> recs = records.computeIfAbsent(tp, part -> new ArrayList<>());
recs.addAll(received.records(tp));
});
remaining -= System.currentTimeMillis() - t1;
}
}
while (count < minRecords && remaining > 0);
return new ConsumerRecords<>(records);
}
How about moving it right below line 370 or removing it?
if (received == null) {
throw new IllegalStateException("null received from consumer.poll()");
}
I appreciate you taking the time to read this
To Reproduce
Steps to reproduce the behavior.
Expected behavior
A clear and concise description of what you expected to happen.
Sample
A link to a GitHub repository with a minimal, reproducible, sample.
Reports that include a sample will take priority over reports that do not.
At times, we may require a sample, so it is good to try and include a sample up front.