Table of Contents
- What we want to do
- Terminology
- Requirements
- Start a containerized Kafka cluster
- Run Confluent demo applications for Kafka Streams API
- Stop the containerized Kafka cluster
- Where to go from here
- Appendix
## What we want to do

The Apache Kafka project includes the Kafka Streams API, a Java library for building applications in Java/Scala/Clojure/... that process and analyze data stored in Kafka. The cool part about the Kafka Streams API is that it makes your applications highly scalable, elastic, stateful, and fault-tolerant -- all without requiring any processing cluster. This document helps you develop Kafka Streams applications more quickly and conveniently on your laptop, build server, etc. by running them against a containerized Kafka cluster.
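To give you a feel for the API before we begin, here is a minimal word-count sketch written against the Kafka 0.10.2 Streams API. It is an illustrative outline, not the actual WordCountLambdaExample code used later in this walkthrough; the application id and broker endpoint are placeholder assumptions.

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch"); // placeholder
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // see endpoints below
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        // Read text lines, split them into words, and maintain a running count per word.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count("Counts"); // name of the local state store backing the counts
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        new KafkaStreams(builder, config).start();
    }
}
```

Note how the topology is plain Java: no processing cluster is involved, and scaling out simply means starting more instances of the same application.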
With the code and instructions in this repository, we will:
- Start a containerized, 3-node Kafka cluster on your host machine, e.g. your Mac laptop, using Docker Compose and Confluent's Docker images for Confluent Platform; more precisely, we use Confluent Open Source version 3.2.0 with Apache Kafka 0.10.2.0, i.e. the latest versions as of March 2017.
- Build and package the Confluent demo applications for the Kafka Streams API on your host machine.
- Run one of the demo applications (here: WordCount) on your host machine against the containerized Kafka cluster. You can interactively enter the input data for the WordCount application.
"Why should I do this?" A local development setup such as the above is very useful when doing iterative development, when you don't have access to a Kafka cluster from your laptop, when you would like to experiment with failure scenarios, and for many more reasons. Did we already mention that it's also a heck of a lot of fun?
"How much time do I need?" The expected time to complete this walkthrough is 10 minutes, excluding the time to install prerequisites such as Docker.
## Terminology

- The host machine is the machine that is running Docker; for example, your Mac laptop.
- On Mac OS and Windows OS, the Docker Machine is the VM in which your Docker containers run.
## Requirements

Your host machine must have the following software installed:

- Docker 17.03.1-ce (2017-03-27) or later, including Docker Compose and, on Mac and Windows, Docker Machine. Verify your installation:

  $ docker --version
  Docker version 17.03.1-ce, build c6d412e

  $ docker-compose --version
  docker-compose version 1.11.2, build dfed245

  # Mac and Windows users only
  $ docker-machine --version
  docker-machine version 0.10.0, build 76ed2a6

- The following software is required to build and run the Confluent demo applications for the Kafka Streams API. There's no container provided for this part because, arguably, most users develop Kafka Streams applications directly on their host machines, e.g. via an IDE on their Mac laptops.
  - git
  - Maven 3
  - Java JDK 8+
## Start a containerized Kafka cluster

Here, we essentially follow the Confluent Docker Quickstart in the Confluent documentation. If you run into problems, Confluent's Docker Quickstart has troubleshooting tips available.
If you just want to sit back and see what we will be doing in the subsequent sections, take a look at the following recording:
Screencast: Start a containerized Kafka cluster, using Confluent's Docker images.
Clone this repository:
# Clone this repository to `$HOME/kafka-streams-docker` aka `~/kafka-streams-docker`.
$ git clone https://github.com/miguno/kafka-streams-docker.git ~/kafka-streams-docker
Mac and Windows users only: Create a VM with 6GB of memory as our Docker Machine:
# Create a VirtualBox VM with ~6GB of memory to serve as our Docker Machine.
$ docker-machine create --driver virtualbox --virtualbox-memory 6000 confluent
Run `docker-machine ls` to verify that the Docker Machine is running correctly. The command's output should be similar to:
$ docker-machine ls
NAME ACTIVE DRIVER STATE URL SWARM DOCKER ERRORS
confluent * virtualbox Running tcp://192.168.99.100:2376 v17.03.1-ce
Now configure your terminal to attach it to the new Docker Machine named `confluent`:
$ eval $(docker-machine env confluent)
Important: Whenever you run Docker commands from a terminal window, that terminal must first be attached to the Docker Machine via `eval $(docker-machine env confluent)`. Keep this in mind when opening new terminal windows!
Next, we start a containerized Kafka cluster (3 brokers) with a ZooKeeper ensemble (1 node) in the background.
# Change into the `kafka-streams-docker` directory that you cloned earlier,
# which is where `docker-compose.yml` resides.
$ cd ~/kafka-streams-docker
Additional command for Mac and Windows users: Make the Docker Machine's IP address available via the `DOCKER_MACHINE_IP` environment variable, which is used by `docker-compose.yml`:

$ export DOCKER_MACHINE_IP=`docker-machine ip confluent`
# Start the cluster
$ docker-compose up -d
If you want to, you can verify the health of the Kafka cluster you just deployed; see the appendix for the relevant commands.
At this point, the Kafka cluster is up and running. To recap, we now have available to us (cf. `docker-compose.yml`):

| Service | Container name | Endpoint on Mac/Windows hosts | Endpoint on Linux hosts |
|---------|----------------|-------------------------------|-------------------------|
| Kafka broker (id 1) | confluent-kafka-1 | `$DOCKER_MACHINE_IP:29092` | `localhost:29092` |
| Kafka broker (id 2) | confluent-kafka-2 | `$DOCKER_MACHINE_IP:39092` | `localhost:39092` |
| Kafka broker (id 3) | confluent-kafka-3 | `$DOCKER_MACHINE_IP:49092` | `localhost:49092` |
| ZooKeeper node | confluent-zookeeper | `$DOCKER_MACHINE_IP:32181` | `localhost:32181` |

Note: The Kafka brokers and the ZooKeeper node are accessible from other containers via the `localhost:PORT` settings in the column "Endpoint on Linux hosts" above.
## Run Confluent demo applications for Kafka Streams API

If you just want to sit back and see what we will be doing in the subsequent sections, take a look at the following recording:
Screencast: Run the WordCount demo application against the containerized Kafka cluster.
Clone the repository that contains the Confluent demo applications:
# Clone the examples repository to `$HOME/examples` aka `~/examples`.
$ git clone https://github.com/confluentinc/examples.git ~/examples
Next, we must build and package the examples into a so-called "fat" jar:
# Change into the directory that contains the examples for the Kafka Streams API
$ cd ~/examples/kafka-streams
# We want to use examples that work with Confluent 3.2.x
$ git checkout 3.2.x
# Build and package the examples.
$ mvn -DskipTests=true clean package
>>> Creates ~/examples/kafka-streams/target/streams-examples-3.2.0-standalone.jar
Now we can run any of the Kafka Streams API examples. Each example, such as the WordCountLambdaExample, ships with instructions on how to use it. The only part of an example's instructions that we need to modify is where to find the Kafka brokers, aka Kafka's `bootstrap.servers` parameter, because the instructions assume `localhost:9092` by default:

- Mac and Windows users: `bootstrap.servers` is `$DOCKER_MACHINE_IP:29092` (e.g. `192.168.99.100:29092`)
- Linux users: `bootstrap.servers` is `localhost:29092`

Tip: You can also specify multiple brokers as `bootstrap.servers`. Mac and Windows users, for example, could also set `bootstrap.servers` to `$DOCKER_MACHINE_IP:29092,$DOCKER_MACHINE_IP:39092,$DOCKER_MACHINE_IP:49092`.

All examples allow us to override the `bootstrap.servers` parameter via a CLI argument, and for most examples you do so by providing the `bootstrap.servers` parameter as the first CLI argument (for examples demonstrating Kafka's Interactive Queries feature, it is the second CLI argument).
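As an illustration, here is a minimal, hypothetical sketch of how such a CLI override is typically wired into a Streams application's configuration; the class name, application id, and fallback value are placeholders, not the examples' actual code:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CliBootstrapServers {

    public static void main(String[] args) {
        // Use the first CLI argument if present, else fall back to the examples' default.
        String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // ... define the topology and start KafkaStreams as usual ...
    }
}
```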
Let's test-drive the aforementioned WordCountLambdaExample application. The "steps" below refer to the steps in the example's instructions.
Step 1: We can skip step 1 in the example's instructions because Kafka and ZooKeeper are already running (see above).
Step 2: Create the input and output topics used by the WordCount application.
# If you haven't done so, change into the `kafka-streams-docker` directory,
# which is where `docker-compose.yml` resides.
$ cd ~/kafka-streams-docker
# Create the application's input topic "TextLinesTopic".
$ docker-compose exec confluent-kafka-1 kafka-topics \
--create --topic TextLinesTopic \
--zookeeper localhost:32181 --partitions 1 --replication-factor 3
# Create the application's output topic "WordsWithCountsTopic".
$ docker-compose exec confluent-kafka-1 kafka-topics \
--create --topic WordsWithCountsTopic \
--zookeeper localhost:32181 --partitions 1 --replication-factor 3
Tip: If you have Confluent Open Source installed locally on your host machine, then you can also run the Kafka CLI commands such as `kafka-topics` and `kafka-console-producer` in this section directly from your host machine, rather than indirectly via `docker-compose exec ...` from inside the `confluent-kafka-1` container. For example, with Confluent Open Source available locally, the first `docker-compose exec` command above that executes `kafka-topics` could also be run directly on the host machine as:

$ /path/to/confluent-3.2.0/bin/kafka-topics \
    --create --topic TextLinesTopic \
    --zookeeper localhost:32181 --partitions 1 --replication-factor 3
This direct approach is particularly helpful if, for example, you want to ingest some local data into Kafka during iterative development or for testing and debugging.
You can verify that the topics were created successfully with `kafka-topics --list` or `kafka-topics --describe`:
$ docker-compose exec confluent-kafka-1 kafka-topics --describe --topic TextLinesTopic --zookeeper localhost:32181
Topic:TextLinesTopic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: TextLinesTopic    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
Step 3: Start the WordCount application either in your IDE or on the command line. In this document, we use the command line.
Mac and Windows users:
# Start the WordCount application
$ export DOCKER_MACHINE_IP=`docker-machine ip confluent`
$ java -cp ~/examples/kafka-streams/target/streams-examples-3.2.0-standalone.jar \
io.confluent.examples.streams.WordCountLambdaExample $DOCKER_MACHINE_IP:29092
Linux users:
# Start the WordCount application
$ java -cp ~/examples/kafka-streams/target/streams-examples-3.2.0-standalone.jar \
io.confluent.examples.streams.WordCountLambdaExample localhost:29092
The application will continue to run in the terminal until you stop it via `Ctrl-C` -- but don't stop it just yet, because we are not yet done with this example!
Step 4: Write some input data to the source topic "TextLinesTopic", e.g. via `kafka-console-producer`. The already running WordCount application (step 3) will automatically process this input data and write the results to the output topic "WordsWithCountsTopic".
# Tip: Use a new terminal for the following commands!
# (Mac and Windows users: ensure the new terminal is attached to Docker Machine, see above)
# Start the console producer.
$ docker-compose exec confluent-kafka-1 kafka-console-producer \
--broker-list localhost:29092 \
--topic TextLinesTopic
The console producer will start up and wait for your input (unfortunately it does not show a proper prompt to indicate this). Any text lines you enter now will be turned into Kafka messages and sent to the input topic "TextLinesTopic". Let's enter some input data:
hello kafka streams<ENTER>
all streams lead to kafka<ENTER>
join kafka summit<ENTER>
This will send 3 messages to the input topic: the message keys are `null`, and the message values are the text lines, e.g. "hello kafka streams". If you want to, you can terminate the console producer now via `Ctrl-C`. Alternatively, you can keep it running and enter more input data later.
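As an aside, if you'd rather ingest the input programmatically, e.g. from a test harness, a minimal sketch using Kafka's Java producer could look like the following; the endpoint follows the Linux column above, and Mac and Windows users would substitute `$DOCKER_MACHINE_IP:29092`:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TextLinesProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092"); // Linux hosts (see table above)
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same input data as in the console-producer session above; keys are null.
            producer.send(new ProducerRecord<>("TextLinesTopic", "hello kafka streams"));
            producer.send(new ProducerRecord<>("TextLinesTopic", "all streams lead to kafka"));
            producer.send(new ProducerRecord<>("TextLinesTopic", "join kafka summit"));
        } // close() flushes any pending records before returning
    }
}
```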
Step 5: Inspect the resulting data in the output topic, e.g. via `kafka-console-consumer`.
# Tip: Use a new terminal for the following commands!
# (Mac and Windows users: ensure the new terminal is attached to Docker Machine, see above)
# Start the console consumer.
$ docker-compose exec confluent-kafka-1 kafka-console-consumer \
--bootstrap-server localhost:29092 --new-consumer \
--topic WordsWithCountsTopic --from-beginning \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
You should see output data similar to the following:
hello 1
kafka 1
streams 1
all 1
streams 2
lead 1
to 1
join 1
kafka 3
summit 1
Note: The exact output sequence will depend on how fast you type the above sentences. If you type them slowly, you are likely to get each count update, e.g., "kafka 1", "kafka 2", "kafka 3". If you type them quickly, you are likely to get fewer count updates, e.g., just "kafka 3". This is because the commit interval is set to 10 seconds, and anything typed within that interval will be "compacted" in memory (cf. record caches in the DSL).
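Both knobs are ordinary Kafka Streams settings. If you want more (or fewer) intermediate updates, here is a minimal sketch of how an application could tune them; the concrete values are illustrative, not what the WordCount example actually uses:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheTuningConfig {

    public static Properties tunedConfig() {
        Properties config = new Properties();
        // Commit -- and thus emit downstream updates -- more often than the example's 10 seconds ...
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // ... and/or disable the record caches entirely (size 0), so that every
        // single count update is forwarded downstream instead of being compacted.
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return config;
    }
}
```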
You can stop the console consumer at any time with `Ctrl-C`. Alternatively, you can keep it running and enter more input data via the console producer that runs in your other terminal.

Once you are done with your experiments, you can stop the WordCount application as well as the console producer and consumer via `Ctrl-C`.
If by now you want to write your own Kafka Streams application, head over to the Confluent documentation and read the chapter on the Kafka Streams API.
## Stop the containerized Kafka cluster

To shut down the Kafka cluster:
# Stop and remove containers.
# Careful: THIS STEP WILL RESULT IN LOSING ALL THE DATA THAT IS STORED IN KAFKA AND ZOOKEEPER.
$ docker-compose down
Tip: If you want to preserve the containers, including any of their local data such as Kafka topics, you must use `docker-compose stop` (rather than `down`), and subsequently `docker-compose start` (rather than `up`) to re-start the same cluster again.
Mac and Windows users only: If you also want to shut down the Docker Machine:
# Option 1: Gracefully stop it but don't throw it away.
# Re-use by restarting with `docker-machine start confluent`.
$ docker-machine stop confluent
# Option 2: Stop it and throw it away.
# Start from scratch via `docker-machine create ...` (see above).
$ docker-machine rm -f confluent
## Where to go from here

Hopefully you enjoyed this quick walkthrough!
As next steps, you may want to:
- Download Confluent Open Source
- Want to write your own Kafka Streams application now? Read the Confluent documentation for the Kafka Streams API to get started!
- Further information on using Docker with Confluent
- Browse through further Confluent demo applications for the Kafka Streams API
- Join our Confluent Community Slack at https://confluentcommunity.slack.com/, notably the `#streams` channel (you need to register a free account first)
## Appendix

Verify that the containers are running:
$ docker-compose ps
# You should see the following:
Name Command State Ports
-------------------------------------------------------------------------------
streamsdocker_confluent-kafka-1_1 /etc/confluent/docker/run Up
streamsdocker_confluent-kafka-2_1 /etc/confluent/docker/run Up
streamsdocker_confluent-kafka-3_1 /etc/confluent/docker/run Up
streamsdocker_confluent-zookeeper_1 /etc/confluent/docker/run Up
Verify that the ZooKeeper node is healthy:
$ docker-compose logs confluent-zookeeper | grep -i "binding to port"
# You should see a line similar to:
confluent-zookeeper_1 | [2017-04-03 19:26:47,764] INFO binding to port 0.0.0.0/0.0.0.0:32181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
You can also use ZooKeeper's Four Letter Words to perform additional checks (here: `stat`):
$ docker-compose exec confluent-kafka-1 bash -c "echo stat | nc localhost 32181"
# You should see a line similar to:
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
/127.0.0.1:53908[1](queued=0,recved=42,sent=42)
/127.0.0.1:53906[1](queued=0,recved=78,sent=80)
/127.0.0.1:53920[0](queued=0,recved=1,sent=0)
/127.0.0.1:53904[1](queued=0,recved=55,sent=55)
Latency min/avg/max: 0/2/27
Received: 184
Sent: 185
Connections: 4
Outstanding: 0
Zxid: 0x37
Mode: standalone
Node count: 23
Verify that the first Kafka broker (with `broker.id == 1`) is healthy:
$ docker-compose logs confluent-kafka-1 | grep -i "started (kafka.server.KafkaServer)"
# You should see a line similar to:
confluent-kafka-1_1 | [2017-04-03 19:45:18,476] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
You can similarly verify the other Kafka brokers.
IMPORTANT: `docker-compose` commands must be run from the directory in which `docker-compose.yml` resides.
Show running containers:
$ docker-compose ps
$ docker ps
# Example:
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------------
streamsdocker_confluent-kafka-1_1 /etc/confluent/docker/run Up
streamsdocker_confluent-kafka-2_1 /etc/confluent/docker/run Up
streamsdocker_confluent-kafka-3_1 /etc/confluent/docker/run Up
streamsdocker_confluent-zookeeper_1 /etc/confluent/docker/run Up
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1c88e5d3c24b confluentinc/cp-kafka:3.2.0 "/etc/confluent/do..." 9 minutes ago Up 9 minutes streamsdocker_confluent-kafka-2_1
ed549872edfb confluentinc/cp-kafka:3.2.0 "/etc/confluent/do..." 9 minutes ago Up 9 minutes streamsdocker_confluent-kafka-3_1
e6e914c12c41 confluentinc/cp-kafka:3.2.0 "/etc/confluent/do..." 9 minutes ago Up 9 minutes streamsdocker_confluent-kafka-1_1
51a481408420 confluentinc/cp-zookeeper:3.2.0 "/etc/confluent/do..." 9 minutes ago Up 9 minutes streamsdocker_confluent-zookeeper_1
Show ALL containers, including those that are not running:
$ docker-compose ps -a
$ docker ps -a
Log into the Docker Machine named "confluent":
$ docker-machine ssh confluent
Log into a running container by opening a shell:
$ docker-compose exec <service name> /bin/bash
$ docker exec -ti <container id or name> /bin/bash
# Example:
$ docker-compose exec confluent-kafka-1 /bin/bash
Show the logs of a running container:
# Print logs and exit
$ docker-compose logs <service name>
$ docker logs <container id or name>
# Print logs continuously until you stop the command via `Ctrl-C` (think: `tail -f`)
$ docker-compose logs -f <service name>
$ docker logs -f <container id or name>
# Example:
$ docker-compose logs -f confluent-kafka-1