
asyncio duplicated instrumentation cause memory leak #3383

@yonathan-wolloch-lendbuzz

Description

Describe your environment

OS: Ubuntu
Python version: (e.g., Python 3.13.1)
Package version: 0.51b0
Aiokafka[lz4] version: 0.12.0

What happened?

We have a memory leak, which appears to come from the asyncio instrumentation, when using AIOKafkaConsumer in our FastAPI app exactly as shown in the aiokafka documentation.
We think that calling getone() inside a while loop instead of using async iteration (`async for`, i.e. `__anext__`) avoids the issue, but we want to follow aiokafka's best practices.
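For comparison, the getone() alternative described above might look like the following minimal sketch. `consume_with_getone`, `handle` and `stop_event` are hypothetical names, and `consumer` is assumed to be an aiokafka `AIOKafkaConsumer` (or any object with the same `start()`, `stop()` and `getone()` coroutines). This sketch has not been verified to avoid the leak.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def consume_with_getone(
    consumer: Any,
    handle: Callable[[Any], Awaitable[None]],
    stop_event: asyncio.Event,
) -> None:
    """Poll one record at a time with getone() instead of `async for`.

    Assumption: `consumer` exposes aiokafka-style start(), stop() and
    getone() coroutines. `handle` and `stop_event` are hypothetical.
    """
    await consumer.start()
    try:
        # stop_event is only checked between records; cancelling the task
        # also exits the loop, and the finally block still stops the consumer.
        while not stop_event.is_set():
            msg = await consumer.getone()
            await handle(msg)
    finally:
        # Leaves the consumer group and performs autocommit if enabled.
        await consumer.stop()
```

With aiokafka this would be called as `await consume_with_getone(AIOKafkaConsumer('my_topic', bootstrap_servers='localhost:9092', group_id='my-group'), handle, stop_event)`.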

Steps to Reproduce

Add the following code to the FastAPI app's startup lifespan:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
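Note that `asyncio.run()` raises `RuntimeError` when it is called while an event loop is already running, which is the case inside a FastAPI lifespan. A minimal sketch of starting `consume()` as a background task from the lifespan instead, assuming FastAPI's `lifespan=` parameter; `make_lifespan` is a hypothetical helper:

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator, Awaitable, Callable


def make_lifespan(consume: Callable[[], Awaitable[None]]):
    """Hypothetical helper: run `consume()` for the lifetime of the app."""

    @contextlib.asynccontextmanager
    async def lifespan(app: Any) -> AsyncIterator[None]:
        # The server's event loop is already running here, so schedule the
        # consumer as a task rather than calling asyncio.run().
        task = asyncio.create_task(consume())
        try:
            yield
        finally:
            # Cancelling raises CancelledError inside `async for msg in consumer`,
            # so consume()'s own finally block still awaits consumer.stop().
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task

    return lifespan
```

In the app this would be wired up as `app = FastAPI(lifespan=make_lifespan(consume))`.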

Then send a single message to trigger consumption, and memory usage grows exponentially.

You can inspect the heap with guppy3 and tracemalloc, but the simplest check is to measure the process's memory usage.
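A minimal sketch of the tracemalloc approach: take one snapshot before sending the message and another after memory has grown, then list the source lines whose allocations increased the most. `report_growth` is a hypothetical helper; `tracemalloc.take_snapshot()`, `Snapshot.compare_to()` and `StatisticDiff.size_diff` are standard-library APIs.

```python
import tracemalloc
from typing import List


def report_growth(
    before: tracemalloc.Snapshot,
    after: tracemalloc.Snapshot,
    limit: int = 10,
) -> List[tracemalloc.StatisticDiff]:
    """Return up to `limit` source lines whose traced allocations grew."""
    # compare_to() groups allocations by file and line number and sorts the
    # differences by absolute size change, largest first; keep only growth.
    diffs = after.compare_to(before, "lineno")
    return [d for d in diffs if d.size_diff > 0][:limit]


# Usage sketch inside the app (assumed workflow):
# tracemalloc.start()
# before = tracemalloc.take_snapshot()
# ... send one message and let the consumer run for a while ...
# after = tracemalloc.take_snapshot()
# for diff in report_growth(before, after):
#     print(diff)
```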

Expected Result

Stable memory usage when following aiokafka's best practices.

Actual Result

Exponentially increasing memory usage.

Additional context

No response

Would you like to implement a fix?

No

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
