Skip to content

Commit dc602b7

Browse files
authored
Fix deserialization of kafka producer json config in the kafka-reporter-plugin. (apache#542)
1 parent c04a714 commit dc602b7

File tree

3 files changed

+31
-16
lines changed

3 files changed

+31
-16
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Release Notes.
1919
* Fix possible IllegalStateException when using Micrometer.
2020
* Support Grizzly Work ThreadPool Metric Monitor
2121
* Fix the gson dependency in the kafka-reporter-plugin.
22+
* Fix deserialization of kafka producer json config in the kafka-reporter-plugin.
2223
* Support to config custom decode methods for kafka configurations
2324

2425
#### Documentation

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.skywalking.apm.agent.core.kafka;
2020

21+
import com.google.gson.reflect.TypeToken;
2122
import com.google.gson.Gson;
22-
2323
import java.lang.reflect.InvocationTargetException;
2424
import java.util.ArrayList;
2525
import java.util.HashSet;
@@ -34,7 +34,6 @@
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.TimeoutException;
3636
import java.util.stream.Collectors;
37-
3837
import org.apache.kafka.clients.admin.AdminClient;
3938
import org.apache.kafka.clients.admin.DescribeTopicsResult;
4039
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -107,11 +106,7 @@ public void run() {
107106
Properties properties = new Properties();
108107
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);
109108

110-
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
111-
Gson gson = new Gson();
112-
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
113-
decode(config).forEach(properties::setProperty);
114-
}
109+
setPropertiesFromJsonConfig(properties);
115110
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
116111

117112
try (AdminClient adminClient = AdminClient.create(properties)) {
@@ -131,12 +126,12 @@ public void run() {
131126
})
132127
.filter(Objects::nonNull)
133128
.collect(Collectors.toSet());
134-
129+
135130
if (!topics.isEmpty()) {
136131
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
137132
return;
138133
}
139-
134+
140135
try {
141136
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
142137
} catch (Exception e) {
@@ -149,6 +144,15 @@ public void run() {
149144
}
150145
}
151146

147+
void setPropertiesFromJsonConfig(Properties properties) {
148+
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
149+
Gson gson = new Gson();
150+
Map<String, String> config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON,
151+
new TypeToken<Map<String, String>>() { }.getType());
152+
decode(config).forEach(properties::setProperty);
153+
}
154+
}
155+
152156
private void notifyListeners(KafkaConnectionStatus status) {
153157
for (KafkaConnectionStatusListener listener : listeners) {
154158
listener.onStatusChanged(status);

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@
1818

1919
package org.apache.skywalking.apm.agent.core.kafka;
2020

21-
import org.junit.Test;
22-
21+
import static org.junit.Assert.assertEquals;
2322
import java.lang.reflect.Method;
23+
import java.util.Properties;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.Test;
2426
import java.nio.charset.StandardCharsets;
2527
import java.util.Base64;
2628
import java.util.HashMap;
2729
import java.util.Map;
28-
import java.util.concurrent.atomic.AtomicInteger;
29-
30-
import static org.junit.Assert.assertEquals;
3130

3231
public class KafkaProducerManagerTest {
3332
@Test
@@ -39,8 +38,8 @@ public void testAddListener() throws Exception {
3938
kafkaProducerManager.addListener(new MockListener(counter));
4039
}
4140
Method notifyListeners = kafkaProducerManager
42-
.getClass()
43-
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
41+
.getClass()
42+
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
4443
notifyListeners.setAccessible(true);
4544
notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED);
4645

@@ -60,6 +59,17 @@ public void testFormatTopicNameThenRegister() {
6059
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
6160
}
6261

62+
@Test
63+
public void testSetPropertiesFromJsonConfig() {
64+
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
65+
Properties properties = new Properties();
66+
67+
KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}";
68+
kafkaProducerManager.setPropertiesFromJsonConfig(properties);
69+
70+
assertEquals(properties.get("batch.size"), "32768");
71+
}
72+
6373
@Test
6474
public void testDecode() throws Exception {
6575
KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";

0 commit comments

Comments
 (0)