Skip to content

Commit b0e3796

Browse files
Adrian Cole (adriancole)
authored and committed
Makes Kafka fail in healthCheck vs crash the process
Crashing the process sounds good, but it doesn't allow the healthcheck to report the root cause. This makes the failure work like the previous Kafka 0.8 collector.
1 parent e53e3c7 commit b0e3796

File tree

5 files changed

+51
-42
lines changed

5 files changed

+51
-42
lines changed

zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.AtomicReference;
23-
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.common.KafkaException;
24+
import org.apache.kafka.common.config.ConfigException;
2425
import org.apache.kafka.common.errors.InterruptException;
2526
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
2627
import org.slf4j.Logger;
@@ -36,6 +37,8 @@
3637
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
3738
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
3839
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
40+
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
41+
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
3942

4043
/**
4144
* This collector polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded
@@ -139,10 +142,8 @@ public KafkaCollector build() {
139142
// https://kafka.apache.org/documentation/#newconsumerconfigs
140143
properties.put(GROUP_ID_CONFIG, "zipkin");
141144
properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
142-
properties.put(
143-
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
144-
properties.put(
145-
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
145+
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
146+
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
146147
}
147148
}
148149

@@ -215,7 +216,7 @@ ExecutorService compute() {
215216
: Executors.newFixedThreadPool(streams);
216217

217218
for (int i = 0; i < streams; i++) {
218-
final KafkaCollectorWorker worker = new KafkaCollectorWorker(builder);
219+
KafkaCollectorWorker worker = new KafkaCollectorWorker(builder);
219220
workers.add(worker);
220221
pool.execute(guardFailures(worker));
221222
}
@@ -224,20 +225,21 @@ ExecutorService compute() {
224225
}
225226

226227
Runnable guardFailures(final Runnable delegate) {
227-
return new Runnable() {
228-
@Override
229-
public void run() {
230-
try {
231-
delegate.run();
232-
} catch (InterruptException e) {
233-
// Interrupts are normal on shutdown, intentionally swallow
234-
} catch (RuntimeException e) {
235-
LOG.error("Kafka worker exited with exception", e);
236-
failure.set(CheckResult.failed(e));
237-
} catch (Error e) {
238-
LOG.error("Kafka worker exited with error", e);
239-
failure.set(CheckResult.failed(new RuntimeException(e)));
240-
}
228+
return () -> {
229+
try {
230+
delegate.run();
231+
} catch (InterruptException e) {
232+
// Interrupts are normal on shutdown, intentionally swallow
233+
} catch (KafkaException e) {
234+
if (e.getCause() instanceof ConfigException) e = (KafkaException) e.getCause();
235+
LOG.error("Kafka worker exited with exception", e);
236+
failure.set(CheckResult.failed(e));
237+
} catch (RuntimeException e) {
238+
LOG.error("Kafka worker exited with exception", e);
239+
failure.set(CheckResult.failed(e));
240+
} catch (Error e) {
241+
LOG.error("Kafka worker exited with error", e);
242+
failure.set(CheckResult.failed(new RuntimeException(e)));
241243
}
242244
};
243245
}

zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import java.util.Collection;
1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.Properties;
2122
import java.util.concurrent.atomic.AtomicReference;
22-
import org.apache.kafka.clients.consumer.Consumer;
2323
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2424
import org.apache.kafka.clients.consumer.ConsumerRecord;
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -46,17 +46,25 @@ public void onSuccess(Void value) {}
4646
public void onError(Throwable t) {}
4747
};
4848

49-
final Consumer<byte[], byte[]> kafkaConsumer;
49+
final Properties properties;
50+
final List<String> topics;
5051
final Collector collector;
5152
final CollectorMetrics metrics;
5253
/** Kafka topic partitions currently assigned to this worker. List is not modifiable. */
5354
final AtomicReference<List<TopicPartition>> assignedPartitions =
5455
new AtomicReference<>(Collections.emptyList());
5556

5657
KafkaCollectorWorker(KafkaCollector.Builder builder) {
57-
kafkaConsumer = new KafkaConsumer<>(builder.properties);
58-
List<String> topics = Arrays.asList(builder.topic.split(","));
59-
kafkaConsumer.subscribe(
58+
properties = builder.properties;
59+
topics = Arrays.asList(builder.topic.split(","));
60+
collector = builder.delegate.build();
61+
metrics = builder.metrics;
62+
}
63+
64+
@Override
65+
public void run() {
66+
try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties)) {
67+
kafkaConsumer.subscribe(
6068
topics,
6169
new ConsumerRebalanceListener() {
6270
@Override
@@ -69,13 +77,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
6977
assignedPartitions.set(Collections.unmodifiableList(new ArrayList<>(partitions)));
7078
}
7179
});
72-
this.collector = builder.delegate.build();
73-
this.metrics = builder.metrics;
74-
}
75-
76-
@Override
77-
public void run() {
78-
try {
7980
LOG.info("Kafka consumer starting polling loop.");
8081
while (true) {
8182
final ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(1000);
@@ -110,7 +111,6 @@ public void run() {
110111
} finally {
111112
LOG.info("Kafka consumer polling loop stopped.");
112113
LOG.info("Closing Kafka consumer...");
113-
kafkaConsumer.close();
114114
LOG.info("Kafka consumer closed.");
115115
}
116116
}

zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,30 @@ public void teardown() {
8989
}
9090

9191
@Test
92-
public void checkPasses() throws Exception {
92+
public void checkPasses() {
9393
try (KafkaCollector collector = builder("check_passes").build()) {
9494
assertThat(collector.check().ok()).isTrue();
9595
}
9696
}
9797

98+
/**
99+
* Don't raise exception (crash process), rather fail status check! This allows the health check
100+
* to report the cause.
101+
*/
98102
@Test
99-
public void start_failsOnInvalidBootstrapServers() throws Exception {
100-
thrown.expect(KafkaException.class);
101-
thrown.expectMessage("Failed to construct kafka consumer");
103+
public void check_failsOnInvalidBootstrapServers() throws Exception {
102104

103105
KafkaCollector.Builder builder =
104106
builder("fail_invalid_bootstrap_servers").bootstrapServers("1.1.1.1");
105107

106108
try (KafkaCollector collector = builder.build()) {
107109
collector.start();
110+
111+
Thread.sleep(1000L); // wait for crash
112+
113+
assertThat(collector.check().error())
114+
.isInstanceOf(KafkaException.class)
115+
.hasMessage("Invalid url in bootstrap.servers: 1.1.1.1");
108116
}
109117
}
110118

@@ -309,12 +317,11 @@ public void messagesDistributedAcrossMultipleThreadsSuccessfully() throws Except
309317
}
310318

311319
@Test
312-
public void multipleTopicsCommaDelimited() throws Exception {
320+
public void multipleTopicsCommaDelimited() {
313321
try (KafkaCollector collector = builder("topic1,topic2").build()) {
314322
collector.start();
315323

316-
assertThat(collector.kafkaWorkers.workers.get(0).kafkaConsumer.subscription())
317-
.containsExactly("topic1", "topic2");
324+
assertThat(collector.kafkaWorkers.workers.get(0).topics).containsExactly("topic1", "topic2");
318325
}
319326
}
320327

zipkin-server/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@
176176
<optional>true</optional>
177177
</dependency>
178178

179-
<!-- Kafka Collector (legacy) -->
179+
<!-- Kafka Collector -->
180180
<dependency>
181181
<groupId>io.zipkin.zipkin2</groupId>
182-
<artifactId>zipkin-autoconfigure-collector-kafka08</artifactId>
182+
<artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
183183
<optional>true</optional>
184184
</dependency>
185185

0 commit comments

Comments (0)