Skip to content

baskaranz/kafka-protobuf-serde

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

28 Commits
 
 
 
 
 
 
 
 

Repository files navigation

kafka-protobuf-serde

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Requirements

Dependency Version
Kafka 1.X.X
Protobuf 3.X.X
Java 7+

Usage

Add the following to your Maven dependency list:

<dependency>
    <groupId>com.github.daniel-shuy</groupId>
    <artifactId>kafka-protobuf-serde</artifactId>
    <version>1.0.0</version>
</dependency>

Override the kafka-clients dependency version with the version of Kafka you wish to use:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

Override the protobuf.version property with the version of Protobuf you wish to use (WARNING: do not directly override the protobuf-java dependency version):

<properties>
    <protobuf.version>3.6.1</protobuf.version>
</properties>

Kafka Producer

Properties props = new Properties();
// props.put(..., ...);

Producer<String, MyValue> producer = new KafkaProducer<>(props,
    new StringSerializer(),
    new KafkaProtobufSerializer<>());

producer.send(new ProducerRecord<>("topic", new MyValue()));

Kafka Consumer

Properties props = new Properties();
// props.put(..., ...);

Consumer<String, MyValue> consumer = new KafkaConsumer<>(props,
    new StringDeserializer(),
    new KafkaProtobufDeserializer<>(MyValue.parser()));

consumer.subscribe(Collections.singletonList("topic"));
ConsumerRecords<String, MyValue> records = consumer.poll(100);

records.forEach(record -> {
    String key = record.key();
    MyValue value = record.value();

    // ...
});

Kafka Streams

Serde<String> stringSerde = Serdes.String();
Serde<MyValue> myValueSerde = new KafkaProtobufSerde<>(MyValue.parser());

Properties config = new Properties();
// config.put(..., ...);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyValue> myValues = builder.stream("input_topic", Consumed.with(stringSerde, myValueSerde));
KStream<String, MyValue> filteredMyValues = myValues.filter((key, value) -> {
    // ...
});
filteredMyValues.to("output_topic", Produced.with(stringSerde, myValueSerde));

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // ...
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();

Spring for Apache Kafka (spring-kafka)

Kafka Producer

@Configuration
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, MyValue> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaProducerFactory<>(props,
                new StringSerializer(),
                new KafkaProtobufSerializer<>());
    }

    @Bean
    public KafkaTemplate<String, MyValue> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Kafka Consumer

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyValue>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyValue> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MyValue> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // props.put(..., ...);

        return new DefaultKafkaConsumerFactory<>(props, 
            new StringDeserializer(), 
            new KafkaProtobufDeserializer<>(MyValue.parser()));
    }
}

public class Listener {
    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(MyValue foo) {
        // ...
    }
}

About

Serializer/Deserializer for Kafka to serialize/deserialize Protocol Buffers messages

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Java 100.0%