Writing to Kafka from Apache Beam
This recipe shows how to write to Kafka with Apache Beam.
Getting ready
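To install the Apache Beam Java SDK, follow the instructions at https://beam.apache.org/get-started/quickstart-java/. The KafkaIO connector is packaged separately in the beam-sdks-java-io-kafka artifact, so add it to your project's dependencies as well.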
How to do it...
The following Java code writes a PCollection of key-value pairs to a Kafka topic and illustrates several options for configuring the KafkaIO sink:
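import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// kvColl is produced by earlier steps of the pipeline
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("destination-topic")
    // Kafka serializers for the record key and value
    .withKeySerializer(LongSerializer.class)
    .withValueSerializer(StringSerializer.class)
    // Extra KafkaProducer settings, here gzip compression
    .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip")));
How it works...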
The KafkaIO sink writes key-value pairs to a Kafka topic. To configure it, you must specify at minimum the Kafka bootstrap servers, the topic to write to, and how record keys and values are serialized.
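Kafka itself stores record keys and values as plain byte arrays, so the serializer chosen for the sink decides exactly which bytes land in the topic. The stdlib-only sketch below approximates what Kafka's LongSerializer and StringSerializer produce with their default settings: an 8-byte big-endian encoding for a Long key and UTF-8 bytes for a String value. The class and method names are hypothetical and this is an illustration, not the Kafka implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: approximates the bytes that Kafka's LongSerializer and
// StringSerializer (default settings) write for one KV<Long, String> element.
class KafkaRecordBytesSketch {

    // Like LongSerializer: 8 bytes, most significant byte first; null stays null.
    static byte[] serializeLong(Long key) {
        if (key == null) {
            return null;
        }
        // ByteBuffer uses big-endian byte order by default
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Like StringSerializer with its default UTF-8 encoding; null stays null.
    static byte[] serializeString(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(serializeLong(1L)));
        // "\u00e9" takes two bytes in UTF-8, so this value is 6 bytes long
        System.out.println(Arrays.toString(serializeString("h\u00e9llo")));
    }
}
```

Because only bytes are stored, a consumer reading the topic has to use matching deserializers (for example Kafka's LongDeserializer and StringDeserializer) to recover the original key and value.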
KafkaIO also allows setting most of the properties in the Kafka producer configuration (ProducerConfig) for a sink...
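The word "most" matters here: in the Beam source, the key and value serializer properties appear to be reserved, because they are set through withKeySerializer and withValueSerializer rather than through producer config updates. The sketch below is a hypothetical, stdlib-only model of that merge, not Beam's actual code: user updates are layered over a set of defaults, later values win, and the reserved keys are rejected. The names mergeProducerConfig and RESERVED_KEYS, and the exact set of reserved keys, are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of merging user-supplied producer properties into a
// sink's defaults. Modeled loosely on KafkaIO's behavior; not Beam's code.
class ProducerConfigMergeSketch {

    // Assumption: serializers are configured through dedicated setters,
    // so plain config updates may not override them.
    static final Set<String> RESERVED_KEYS = Set.of("key.serializer", "value.serializer");

    static Map<String, Object> mergeProducerConfig(
            Map<String, Object> defaults, Map<String, Object> updates) {
        for (String key : updates.keySet()) {
            if (RESERVED_KEYS.contains(key)) {
                throw new IllegalArgumentException(
                        "'" + key + "' must be configured through the sink's serializer setters");
            }
        }
        // Copy so the caller's defaults map is left unchanged
        Map<String, Object> merged = new HashMap<>(defaults);
        merged.putAll(updates); // user updates win, e.g. compression.type
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("bootstrap.servers", "broker_1:9092,broker_2:9092");
        defaults.put("compression.type", "none");
        Map<String, Object> merged =
                mergeProducerConfig(defaults, Map.of("compression.type", "gzip"));
        System.out.println(merged.get("compression.type"));
    }
}
```

Rejecting the reserved keys up front, instead of silently overwriting them, makes a conflicting configuration fail at pipeline construction time rather than producing records in an unexpected format.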