Skip to content

GH-3067: Spring Kafka support multiple headers with same key. #3874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing PR review
Signed-off-by: chickenchickenlove <[email protected]>
  • Loading branch information
chickenchickenlove committed May 3, 2025
commit 065bedc3ee6ca1501b2f0a6ef999c80ad26df886
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {

private final List<HeaderMatcher> matchers = new ArrayList<>();

private final List<HeaderMatcher> headerMatchersForMultiValue = new ArrayList<>();

private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();

{
Expand Down Expand Up @@ -191,6 +193,16 @@ public void addRawMappedHeader(String name, boolean toString) {
this.rawMappedHeaders.put(name, toString);
}

/**
* Add patterns for matching multi-value headers under the same key.
* @param patterns the patterns for header.
*/
public void addHeaderPatternsForMultiValue(String ... patterns) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct name from Java Beans specification perspective.
Really has to be setMultiValueHeaderPatterns.

Not sure why you use for over there.

for (String pattern : patterns) {
this.headerMatchersForMultiValue.add(new SimplePatternBasedHeaderMatcher(pattern));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can use Java Stream API and then addAll on the targer collection.

}
}

protected boolean matches(String header, Object value) {
if (matches(header)) {
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
Expand Down Expand Up @@ -251,6 +263,20 @@ protected Object headerValueToAddOut(String key, Object value) {
return valueToAdd;
}

/**
* Check whether the header value should be mapped to multiple values.
* @param headerName the header name.
* @return True for multiple values at the same key.
*/
protected boolean doesMatchMultiValueHeader(String headerName) {
for (HeaderMatcher headerMatcher : this.headerMatchersForMultiValue) {
if (headerMatcher.matchHeader(headerName)) {
return true;
}
}
return false;
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Nullable
private byte[] mapRawOut(String header, Object value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -338,9 +339,22 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
* @param headers the target headers.
* @since 4.0.0
*/

protected void handleHeader(String headerName, Header header, final Map<String, Object> headers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mapping from Kafka headers, So, I believe this name is better in this context: fromUserHeader.
Since standard headers have been already mapped before.

headers.put(headerName, headerValueToAddIn(header));
if (!this.doesMatchMultiValueHeader(headerName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this. prefix for method calls.

headers.put(headerName, headerValueToAddIn(header));
}
else {
Object values = headers.getOrDefault(headerName, new ArrayList<>());
if (values instanceof List) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully sure in this logic.
Right now you mean: if there is a header with requested name and it is not a list, then we override its value.
Is it really expected?
Why we just cannot always override withe list since this is exactly what is asked by the respective pattern?
Otherwise I feel like we cannot override since the value is already there.

@SuppressWarnings("unchecked")
List<Object> castedValues = (List<Object>) values;
castedValues.add(headerValueToAddIn(header));
headers.put(headerName, castedValues);
}
else {
headers.put(headerName, headerValueToAddIn(header));
}
}
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,9 @@
package org.springframework.kafka.support;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -35,6 +37,7 @@
* The exceptions are correlation and reply headers for request/reply
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.1.3
*
*/
Expand Down Expand Up @@ -111,7 +114,21 @@ public void toHeaders(Headers source, Map<String, Object> target) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
if (!this.doesMatchMultiValueHeader(headerName)) {
target.put(headerName, headerValueToAddIn(header));
}
else {
Object values = target.getOrDefault(headerName, new ArrayList<>());
if (values instanceof List) {
@SuppressWarnings("unchecked")
List<Object> castedValues = (List<Object>) values;
castedValues.add(headerValueToAddIn(header));
target.put(headerName, castedValues);
}
else {
target.put(headerName, headerValueToAddIn(header));
}
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
* @author Gary Russell
* @author Dariusz Szablinski
* @author Biju Kunjummen
* @author Sanghyeok An
*/
public class MessagingMessageConverter implements RecordMessageConverter {

Expand All @@ -84,15 +83,6 @@ public MessagingMessageConverter() {
this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class));
}

/**
* Construct an instance that uses given HeaderMapper.
* @param headerMapper the Header mapper.
* @since 4.0.0
*/
public MessagingMessageConverter(KafkaHeaderMapper headerMapper) {
this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class), headerMapper);
}

/**
* Construct an instance that uses the supplied partition provider function. The
* function can return null to delegate the partition selection to the kafka client.
Expand All @@ -110,18 +100,6 @@ public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partiti
this.partitionProvider = partitionProvider;
}

/**
* Construct an instance that uses the supplied partition provider function and given HeaderMapper.
* @param partitionProvider the provider.
* @param headerMapper the Header mapper.
* @since 4.0.0
*/
public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partitionProvider, KafkaHeaderMapper headerMapper) {
Assert.notNull(partitionProvider, "'partitionProvider' cannot be null");
this.headerMapper = headerMapper;
this.partitionProvider = partitionProvider;
}

/**
* Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
* will try to use a default value. By default set to {@code false}.
Expand Down
Loading