Description
Many thanks for the work on this project. Everything seems to be there for solid and scalable operations, but at the entry level I think an end-to-end example WITH security options is missing.
Even the initial setup is quite complex, with many variables. It always helps to start from a demo, so that you know which versions work together.
Without security it does not feel real, so at least SASL_PLAINTEXT should be demoable.
- Kafka DockerHub shows 3/3 setup without SASL
- Browsing the examples in this repo, I did not find such an example
- The packaging done by Confluent and Bitnami feels a little over-engineered and did not work for me.
Getting a Strimzi example to work looks more feasible, but it is still a struggle.
I can get the Kafka service to spin up, but wiring in the configuration to validate the JWT token either simply does not work (no change) or makes the broker crash. I think I have not yet understood how to make OAuth work for both internal broker communication and client communication without mixing these settings up.
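When token validation has no visible effect or fails, one thing that may be worth checking is whether the claims in the token match what the broker is configured to expect. The following is a minimal sketch, not part of the test project: it decodes a JWT payload without verifying the signature and compares the `iss` claim with an expected issuer. The names `jwt_claims`, `issuer_matches` and `sample_token` are hypothetical, and the sample token is hand-built and unsigned, purely for illustration.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT (no signature verification)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected header.payload.signature")
    payload_b64 = parts[1]
    # urlsafe base64 needs padding to a multiple of 4
    padding = "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))


def issuer_matches(token: str, expected_issuer: str) -> bool:
    """Return True if the token's `iss` claim equals the expected issuer."""
    return jwt_claims(token).get("iss") == expected_issuer


def _b64(obj: dict) -> str:
    """Encode a dict as an unpadded urlsafe base64 JSON segment."""
    raw = json.dumps(obj).encode()
    return base64.urlsafe_b64encode(raw).decode().rstrip("=")


# Hand-built, unsigned sample token (assumption: illustration only)
sample_token = ".".join([
    _b64({"alg": "none"}),
    _b64({"iss": "http://localhost:8080/realms/kafka", "sub": "abc"}),
    "sig",
])

if __name__ == "__main__":
    # Compare against the issuer the brokers are configured with
    print(issuer_matches(sample_token, "http://keycloak:8080/realms/kafka"))
    print(issuer_matches(sample_token, "http://localhost:8080/realms/kafka"))
```

In this setup the brokers are given `http://keycloak:8080/realms/kafka` as the valid issuer, so a token whose `iss` carries a different host name would be expected to fail validation.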
Here is the file structure of the test project (you need to download 3 Strimzi jar files: client, server and common).
.
├── client.py
├── docker-compose.yml
├── kafka_broker_jaas.conf
├── keycloak
│   └── realm-kafka.json
├── libs
│   ├── kafka-oauth-common-0.16.2.jar
│   ├── kafka-oauth-client-0.16.2.jar
│   └── kafka-oauth-server-0.16.2.jar
├── log4j2.properties
└── requirements.txt
And here are the contents of the files.
# docker-compose.yml
---
networks:
  kafka-net:
volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:
services:
  # ───── Keycloak ─────
  keycloak:
    image: quay.io/keycloak/keycloak:26.2.5
    command: ["start-dev", "--http-port=8080", "--import-realm"]
    environment:
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
      KC_HTTP_ENABLED: "true"
      KC_HOSTNAME_URL: http://keycloak:8080
      KC_HOSTNAME_STRICT: "false"
    volumes:
      - ./keycloak/realm-kafka.json:/opt/keycloak/data/import/realm-kafka.json:ro
    ports: ["8080:8080"]
    networks: [kafka-net]
  # ───── Kafka 1 (Broker + Controller) ─────
  kafka1:
    image: apache/kafka:4.0.0 # native Apache image
    container_name: kafka1
    hostname: kafka1
    networks: [kafka-net]
    ports: ["19092:19092"] # OAUTH listener
    volumes:
      - kafka1-data:/var/lib/kafka
      - ./libs/kafka-oauth-common-0.16.2.jar:/opt/kafka/libs/kafka-oauth-common-0.16.2.jar:ro
      - ./libs/kafka-oauth-server-0.16.2.jar:/opt/kafka/libs/kafka-oauth-server-0.16.2.jar:ro
      - ./libs/kafka-oauth-client-0.16.2.jar:/opt/kafka/libs/kafka-oauth-client-0.16.2.jar:ro
      - ./kafka_broker_jaas.conf:/opt/kafka/config/kafka_broker_jaas.conf:ro
      - ./log4j2.properties:/opt/kafka/config/log4j2.properties:ro
    environment:
      CLUSTER_ID: "ac12e3f4-5678-91ab-cdef-1334567890ab"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "OAUTH://0.0.0.0:19092,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "OAUTH://kafka1:19092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "OAUTH:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "OAUTH"
      KAFKA_PRINCIPAL_BUILDER_CLASS: "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"
      KAFKA_SASL_ENABLED_MECHANISMS: "OAUTHBEARER"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "OAUTHBEARER"
      KAFKA_AUTHORIZER_CLASS_NAME: ""
      KAFKA_LOG4J_OPTS: "-Dlog4j.configurationFile=/opt/kafka/config/log4j2.properties"
      KAFKA_OPTS: |
        -Xlog:class+path=info
        -Djava.security.auth.login.config=/opt/kafka/config/kafka_broker_jaas.conf
        -Dlistener.name.oauth.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.OAuthValidatorCallbackHandler
        -Dlistener.name.oauth.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
        -Doauth.jwks.endpoint.uri=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Doauth.valid.issuer.uri=http://keycloak:8080/realms/kafka
        -Doauth.username.claim=sub
        -Dlistener.name.oauth.oauthbearer.token.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/token
        -Dlistener.name.oauth.oauthbearer.jwks.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Dlistener.name.oauth.oauthbearer.expected.issuer=http://keycloak:8080/realms/kafka
        -Dlistener.name.oauth.oauthbearer.username.claim=sub
  # ───── Kafka 2 ─────
  kafka2:
    image: apache/kafka:4.0.0
    container_name: kafka2
    hostname: kafka2
    networks: [kafka-net]
    volumes:
      - kafka2-data:/var/lib/kafka
      - ./libs/kafka-oauth-common-0.16.2.jar:/opt/kafka/libs/kafka-oauth-common-0.16.2.jar:ro
      - ./libs/kafka-oauth-server-0.16.2.jar:/opt/kafka/libs/kafka-oauth-server-0.16.2.jar:ro
      - ./libs/kafka-oauth-client-0.16.2.jar:/opt/kafka/libs/kafka-oauth-client-0.16.2.jar:ro
      - ./kafka_broker_jaas.conf:/opt/kafka/config/kafka_broker_jaas.conf:ro
    environment:
      CLUSTER_ID: "ac12e3f4-5678-91ab-cdef-1334567890ab"
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "OAUTH://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "OAUTH://kafka2:29092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "OAUTH:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "OAUTH"
      KAFKA_SASL_ENABLED_MECHANISMS: "OAUTHBEARER"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "OAUTHBEARER"
      KAFKA_AUTHORIZER_CLASS_NAME: ""
      KAFKA_PRINCIPAL_BUILDER_CLASS: "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"
      KAFKA_OPTS: |
        -Xlog:class+path=info
        -Dlog4j.rootLogger=DEBUG
        -Djava.security.auth.login.config=/opt/kafka/config/kafka_broker_jaas.conf
        -Dlistener.name.oauth.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.OAuthValidatorCallbackHandler
        -Dlistener.name.oauth.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
        -Doauth.jwks.endpoint.uri=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Doauth.valid.issuer.uri=http://keycloak:8080/realms/kafka
        -Doauth.username.claim=sub
        -Dlistener.name.oauth.oauthbearer.token.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/token
        -Dlistener.name.oauth.oauthbearer.jwks.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Dlistener.name.oauth.oauthbearer.expected.issuer=http://keycloak:8080/realms/kafka
        -Dlistener.name.oauth.oauthbearer.username.claim=sub
      KAFKA_SUPER_USERS: "User:admin"
      ALLOW_PLAINTEXT_LISTENER: "yes"
  # ───── Kafka 3 ─────
  kafka3:
    image: apache/kafka:4.0.0
    container_name: kafka3
    hostname: kafka3
    networks: [kafka-net]
    volumes:
      - kafka3-data:/var/lib/kafka
      - ./libs/kafka-oauth-common-0.16.2.jar:/opt/kafka/libs/kafka-oauth-common-0.16.2.jar:ro
      - ./libs/kafka-oauth-server-0.16.2.jar:/opt/kafka/libs/kafka-oauth-server-0.16.2.jar:ro
      - ./libs/kafka-oauth-client-0.16.2.jar:/opt/kafka/libs/kafka-oauth-client-0.16.2.jar:ro
      - ./kafka_broker_jaas.conf:/opt/kafka/config/kafka_broker_jaas.conf:ro
    environment:
      CLUSTER_ID: "ac12e3f4-5678-91ab-cdef-1334567890ab"
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "OAUTH://0.0.0.0:39092,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "OAUTH://kafka3:39092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "OAUTH:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_INTER_BROKER_LISTENER_NAME: "OAUTH"
      KAFKA_SASL_ENABLED_MECHANISMS: "OAUTHBEARER"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "OAUTHBEARER"
      KAFKA_AUTHORIZER_CLASS_NAME: ""
      KAFKA_PRINCIPAL_BUILDER_CLASS: "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder"
      KAFKA_OPTS: |
        -Xlog:class+path=info
        -Dlog4j.rootLogger=DEBUG
        -Djava.security.auth.login.config=/opt/kafka/config/kafka_broker_jaas.conf
        -Dlistener.name.oauth.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.OAuthValidatorCallbackHandler
        -Dlistener.name.oauth.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
        -Doauth.jwks.endpoint.uri=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Doauth.valid.issuer.uri=http://keycloak:8080/realms/kafka
        -Doauth.username.claim=sub
        -Dlistener.name.oauth.oauthbearer.token.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/token
        -Dlistener.name.oauth.oauthbearer.jwks.endpoint.url=http://keycloak:8080/realms/kafka/protocol/openid-connect/certs
        -Dlistener.name.oauth.oauthbearer.expected.issuer=http://keycloak:8080/realms/kafka
        -Dlistener.name.oauth.oauthbearer.username.claim=sub
      KAFKA_SUPER_USERS: "User:admin"
      ALLOW_PLAINTEXT_LISTENER: "yes"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8081:8080" # UI on localhost:8081
    environment:
      # Kafka cluster basics
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:19092
      # Optional: if you use Schema Registry
      # KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      # Authentication against the Kafka broker (if needed)
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: OAUTHBEARER
      # No client secret needed, since publicClient: true
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: >
        org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
        oauth.client.id="kafka-ui"
        oauth.token.endpoint.uri="http://keycloak:8080/realms/kafka/protocol/openid-connect/token";
      # === Kafka UI itself: login via Keycloak ===
      AUTH_TYPE: OAUTH2
      OAUTH2_CLIENT_ID: kafka-ui
      OAUTH2_CLIENT_SECRET: "" # empty for publicClient
      OAUTH2_AUTHORIZE_URL: http://keycloak:8080/realms/kafka/protocol/openid-connect/auth
      OAUTH2_TOKEN_URL: http://keycloak:8080/realms/kafka/protocol/openid-connect/token
      OAUTH2_USER_INFO_URL: http://keycloak:8080/realms/kafka/protocol/openid-connect/userinfo
      OAUTH2_REDIRECT_URI: http://localhost:8081/login/oauth2/code/
      OAUTH2_SCOPES: openid
      OAUTH2_USERNAME_CLAIM: preferred_username
    depends_on:
      - kafka1
      - keycloak
    networks:
      - kafka-net
Then the remaining files:
# JAAS configuration "stanza"
KafkaServer {
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
  oauth.client.id="kafka-broker"
  oauth.client.secret="kafka-broker-secret"
  oauth.token.endpoint.uri="http://keycloak:8080/realms/kafka/protocol/openid-connect/token"
  principalClaimName="sub";
};
# logging
status = WARN
name = KafkaLog4j2Config
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss.SSS} %-5level %c{1} - %msg%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT
logger.strimzi.name = io.strimzi
logger.strimzi.level = trace
logger.kafka.name = org.apache.kafka
logger.kafka.level = debug
# JWT parsing
logger.nimbus.name = com.nimbusds
logger.nimbus.level = trace
# Keycloak realm
{
  "realm": "kafka",
  "displayName": "Kafka Realm",
  "enabled": true,
  "clients": [
    {
      "clientId": "myapi",
      "name": "someapi",
      "protocol": "openid-connect",
      "enabled": true,
      "publicClient": false,
      "secret": "12345",
      "serviceAccountsEnabled": true,
      "directAccessGrantsEnabled": false,
      "standardFlowEnabled": false,
      "bearerOnly": false,
      "attributes": {
        "access.token.lifespan": "600"
      }
    },
    {
      "clientId": "kafka-broker",
      "name": "Kafka Broker (service-account)",
      "protocol": "openid-connect",
      "publicClient": false,
      "secret": "kafka-broker-secret",
      "serviceAccountsEnabled": true,
      "directAccessGrantsEnabled": false,
      "standardFlowEnabled": false,
      "bearerOnly": false,
      "attributes": {
        "access.token.lifespan": "600"
      }
    },
    {
      "clientId": "kafka-ui",
      "name": "Kafka UI (demo)",
      "protocol": "openid-connect",
      "publicClient": true,
      "redirectUris": [ "http://localhost:8080/*" ],
      "standardFlowEnabled": true,
      "directAccessGrantsEnabled": true
    }
  ],
  "roles": {
    "realm": [
      { "name": "kafka-user", "description": "read / write via OAuth" }
    ]
  },
  "users": [
    {
      "username": "alice",
      "email": "[email protected]",
      "enabled": true,
      "credentials": [
        { "type": "password", "value": "alice-secret", "temporary": false }
      ],
      "realmRoles": [ "kafka-user" ]
    },
    {
      "username": "bob",
      "email": "[email protected]",
      "enabled": true,
      "credentials": [
        { "type": "password", "value": "bob-secret", "temporary": false }
      ],
      "realmRoles": [ "kafka-user" ]
    }
  ]
}
And a test client
#!/usr/bin/env python3
"""
Verbose Kafka OAuth client (hard-fail on any error and print JWT token).
Run with Python ≥ 3.10. Requires confluent-kafka, requests, urllib3.
Example:
    python client.py --topic test --iterations 3
"""
from __future__ import annotations

import argparse
import base64
import logging
import os
import sys
import time
from typing import Any, Tuple

import requests
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def decode_jwt_payload(token: str) -> str:
    """
    Extracts and decodes the payload segment of a JWT into a JSON string.
    :param token: a JWT in the form header.payload.signature
    :return: the UTF-8–decoded JSON payload as a string
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Token must have exactly two dots (header.payload.signature)")
    payload_b64 = parts[1]
    # Add padding if necessary (base64 urlsafe requires padding to multiple of 4)
    padding = '=' * (-len(payload_b64) % 4)
    payload_bytes = base64.urlsafe_b64decode(payload_b64 + padding)
    # Return the raw JSON payload as string
    return payload_bytes.decode("utf-8")
# ---------------------------------------------------------------------------
# 0. CLI arguments
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="Verbose Kafka OAuth test client")
parser.add_argument("--topic", default="test", help="Topic to use")
parser.add_argument("--bootstrap", default="localhost:19092", help="Kafka bootstrap servers")
parser.add_argument("--iterations", type=int, default=1, help="How many test messages to produce/consume")
parser.add_argument("--group", default="demo-group", help="Consumer group.id")
args = parser.parse_args()
# ---------------------------------------------------------------------------
# 1. Logging configuration
# ---------------------------------------------------------------------------
LOG_FMT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(stream=sys.stdout, format=LOG_FMT, level=logging.DEBUG)
logger = logging.getLogger("kafka-client")
logging.getLogger("urllib3").setLevel(logging.INFO)
def fatal(msg: str, *fmt):
    """
    Log an error and terminate the program with exit status 1.
    """
    logger.error(msg, *fmt)
    print("Exiting due to fatal error.")
    sys.exit(1)
# ---------------------------------------------------------------------------
# 2. Token retrieval helpers (with retries)
# ---------------------------------------------------------------------------
TOKEN_URL = os.getenv("TOKEN_URL", "http://localhost:8080/realms/kafka/protocol/openid-connect/token")
CLIENT_ID = "myapi"
CLIENT_SECRET = "12345"
session = requests.Session()
session.mount(
    "http://",
    HTTPAdapter(max_retries=Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST"],
    )),
)
def fetch_token() -> Tuple[str, int]:
    logger.debug("POST %s", TOKEN_URL)
    resp = session.post(
        TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
        timeout=5,
    )
    if not resp.ok:
        fatal("Token fetch HTTP %d: %s", resp.status_code, resp.text)
    data: dict[str, Any] = resp.json()
    token, exp = data.get("access_token"), int(data.get("expires_in", 0))
    if not token:
        fatal("Token fetch did not return an access_token field: %s", data)
    logger.info("Obtained access_token, expires in %ds", exp)
    logger.info("JWT token: %s", token)  # prints the raw token (debugging only)
    print("Decoded JWT payload as JSON string:")
    try:
        payload_str = decode_jwt_payload(token)
        print(payload_str)  # Print the decoded payload
    except Exception as e:
        fatal("Failed to decode JWT token: %s", e)
    return token, exp
class OAuthTokenProvider:
    def __init__(self):
        self._token: str | None = None
        self._expiry: float = 0.0
    def __call__(self, _: str):
        now = time.time()
        # Refresh when there is no token yet or the cached one is about to expire
        if not self._token or now >= self._expiry:
            self._token, ttl = fetch_token()
            self._expiry = now + ttl - 10
            logger.debug("Token cached until %.0f", self._expiry)
        return self._token, self._expiry
# ---------------------------------------------------------------------------
# 3. Kafka config
# ---------------------------------------------------------------------------
def kafka_err_cb(err: KafkaError):
    """
    Any asynchronous Kafka error → immediate exit.
    """
    fatal("Kafka error callback: %s", err)
common_conf: dict[str, Any] = {
    "bootstrap.servers": args.bootstrap,
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanisms": "OAUTHBEARER",
    "oauth_cb": OAuthTokenProvider(),
    "debug": os.getenv("KAFKA_DEBUG", "security,broker,protocol,consumer,fetch"),
    "log_level": 7,
    "error_cb": kafka_err_cb,
}
# ---------------------------------------------------------------------------
# 4. Build clients
# ---------------------------------------------------------------------------
_delivery_errors: list[KafkaError] = [] # populated in on_delivery
def on_delivery(err, msg):
    if err:
        _delivery_errors.append(err)
        logger.error("Delivery failed key=%s: %s", msg.key(), err)
    else:
        logger.info("Delivered to %s[%d]@%d", msg.topic(), msg.partition(), msg.offset())
producer = Producer({**common_conf, "on_delivery": on_delivery}, logger=logger)
consumer = Consumer({**common_conf, "group.id": args.group, "auto.offset.reset": "earliest"}, logger=logger)
# ---------------------------------------------------------------------------
# 5. Produce
# ---------------------------------------------------------------------------
logger.info("Producing %d msg(s) to '%s'", args.iterations, args.topic)
for i in range(args.iterations):
    try:
        producer.produce(args.topic, key=f"k{i}".encode(), value=f"OAuth-msg {i}".encode())
    except BufferError as exc:
        fatal("Produce failed (BufferError): %s", exc)
producer.flush(10)
if _delivery_errors:
    fatal("One or more messages failed to deliver – aborting.")
# ---------------------------------------------------------------------------
# 6. Consume
# ---------------------------------------------------------------------------
consumer.subscribe([args.topic])
logger.info("Subscribed, polling for up to 15s…")
end = time.time() + 15
while time.time() < end:
    msg = consumer.poll(1)
    if msg is None:
        continue
    if msg.error():
        fatal("Consumer error: %s", msg.error())
    logger.info(
        "Received key=%s val=%s on %s[%d]@%d",
        msg.key(), msg.value(), msg.topic(), msg.partition(), msg.offset()
    )
    break
else:
    fatal("No message received within 15 seconds – aborting.")
consumer.close()
logger.info("Done – all operations completed successfully.")