Description
Describe your environment
OS: Ubuntu
Python version: (e.g., Python 3.13.1)
Package version: 0.51b0
Aiokafka[lz4] version: 0.12.0
What happened?
We see a memory leak that appears to originate in asyncio when using AIOKafkaConsumer in our FastAPI app, with the consumer set up exactly as shown in the aiokafka documentation.
We believe that calling getone() in a while loop, instead of iterating with async for (which goes through __anext__), avoids the issue, but we would prefer to follow aiokafka's documented best practices.
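For reference, the getone() workaround mentioned above could look roughly like the sketch below. It is not taken from the aiokafka documentation: consume_with_getone, handle, and stop_event are hypothetical names, and consumer is assumed to be an AIOKafkaConsumer that has already been started with await consumer.start().

```python
import asyncio
from typing import Any, Awaitable, Callable


async def consume_with_getone(
    consumer: Any,
    handle: Callable[[Any], Awaitable[None]],
    stop_event: asyncio.Event,
) -> None:
    """Poll an already-started consumer with getone() until stop_event is set.

    `consumer` is assumed to expose an awaitable getone() method, as
    AIOKafkaConsumer does. `handle` and `stop_event` are hypothetical
    helpers supplied by the caller.
    """
    while not stop_event.is_set():
        # getone() waits until one message is available and returns it.
        msg = await consumer.getone()
        await handle(msg)
```

Because getone() waits for the next message, stop_event is only checked between messages; cancelling the task that runs this coroutine is another way to stop it.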
Steps to Reproduce
Add the following code as part of the FastAPI app's startup lifespan:
from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
Then trigger the consumer once by producing a single message; memory usage will then grow exponentially.
You can inspect the heap with guppy3 and tracemalloc, but the simplest check is to measure the memory utilization of the process.
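As a rough illustration of the tracemalloc check, the sketch below takes a baseline snapshot and later lists the source lines whose allocations grew the most. start_tracing and report_growth are hypothetical helper names, not part of aiokafka or FastAPI, and the frame depth of 25 is an arbitrary choice.

```python
import tracemalloc
from typing import List


def start_tracing(frames: int = 25) -> tracemalloc.Snapshot:
    """Start tracemalloc if it is not already running and return a baseline snapshot."""
    if not tracemalloc.is_tracing():
        tracemalloc.start(frames)
    return tracemalloc.take_snapshot()


def report_growth(baseline: tracemalloc.Snapshot, limit: int = 10) -> List[str]:
    """Compare the current heap with the baseline, grouped by source line."""
    current = tracemalloc.take_snapshot()
    # compare_to() returns StatisticDiff entries, largest differences first.
    stats = current.compare_to(baseline, "lineno")
    return [str(stat) for stat in stats[:limit]]
```

One way to use it would be to call start_tracing() right after the consumer starts, then call report_growth(baseline) after the first message has been consumed and again a few minutes later, and compare which lines keep growing.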
Expected Result
Stable memory utilization when following aiokafka's documented best practices.
Actual Result
Exponentially increasing memory utilization.
Additional context
No response
Would you like to implement a fix?
No