Conversation

zs-neo commented Dec 10, 2024

Attempt to address #1021

Thank you very much for your code, @auvipy; it helps us a lot.

We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py.
Celery works fine on our cluster with multiple producers and multiple consumers; when a node goes down, it automatically fails over.

celery==5.4.0
Django==4.1
django-celery-beat==2.7.0
django-celery-results==2.5.1
django-filter==24.3
django-redis==5.4.0
django-split-settings==0.3.0
django-timezone-field==5.1
djangorestframework==3.15.2
gunicorn==22.0.0
pyOpenSSL==21.0.0
redis==5.0.0
requests==2.22.0
uWSGI==2.0.18
app = Celery('celery_test', broker='rediscluster://:<password>@<host>:7024')
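
As a minimal smoke test of this setup (a sketch; the task and module names are illustrative, and the broker URL uses the placeholders from the config above):

    from celery import Celery

    app = Celery('celery_test', broker='rediscluster://:<password>@<host>:7024')

    @app.task
    def add(x, y):
        # Executed by any worker consuming from the cluster-backed queue.
        return x + y

    if __name__ == '__main__':
        # Enqueue a task; per the report above, delivery keeps working
        # even when a cluster node goes down and a replica takes over.
        add.delay(2, 2)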

@Nusnus self-requested a review December 10, 2024 13:52

codecov bot commented Dec 10, 2024

Codecov Report

Attention: Patch coverage is 86.76123% with 56 lines in your changes missing coverage. Please review.

Project coverage is 81.89%. Comparing base (13e6938) to head (543cc93).
Report is 12 commits behind head on main.

Files with missing lines | Patch % | Lines
kombu/transport/rediscluster.py | 86.76% | 34 Missing and 22 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2204      +/-   ##
==========================================
+ Coverage   81.60%   81.89%   +0.28%     
==========================================
  Files          77       78       +1     
  Lines        9540     9963     +423     
  Branches     1162     1238      +76     
==========================================
+ Hits         7785     8159     +374     
- Misses       1563     1591      +28     
- Partials      192      213      +21     

☔ View full report in Codecov by Sentry.

@zs-neo marked this pull request as draft December 13, 2024 09:18

@auvipy (Member) left a comment


Let's see if the CI passes.

@zs-neo force-pushed the feature/support_rediscluster_transport branch 4 times, most recently from dca3e75 to 9983557 on December 24, 2024 10:22
@zs-neo closed this Dec 24, 2024
@zs-neo force-pushed the feature/support_rediscluster_transport branch from f4d275a to f5c7356 on December 24, 2024 10:27
@zs-neo reopened this Dec 24, 2024
@zs-neo force-pushed the feature/support_rediscluster_transport branch 2 times, most recently from c87be51 to 6986510 on December 25, 2024 06:56
auvipy (Member) commented Dec 25, 2024

Only one test is failing, due to a connection failure.

@zs-neo marked this pull request as ready for review December 25, 2024 07:58
zs-neo (Author) commented Dec 25, 2024

@auvipy @Nusnus ping :)
I fixed the issues I found recently and now this PR is ready.
I'm not a Kombu expert, so any suggestions are welcome.

Nusnus (Member) commented Dec 25, 2024

Check the linter error :)

zs-neo (Author) commented Dec 25, 2024

Sorry for the failure; I will fix it and improve the test coverage.

auvipy (Member) commented Dec 25, 2024

I am following it and have already reviewed it twice. I will do an in-depth review again tomorrow. No worries. Thanks for picking up my work.

zs-neo (Author) commented Dec 26, 2024

https://github.com/celery/kombu/actions/runs/12492830091/job/34860983164?pr=2204 (screenshot of the passing CI run)

Added docs for kombu.transport.rediscluster; now I'm sure there won't be any more problems.

@Nusnus (Member) left a comment


Now that everything finally passes on the CI, we can properly review the code.
That being said, we're currently in a release phase, so we can't merge it until we complete the release after the new year holidays.

Good work and thank you for fixing everything.

lan17 commented Jan 18, 2025

Would this also support Redis Cluster as a result backend?

@auvipy added this to the 5.6.0 milestone Feb 16, 2025
@auvipy self-requested a review February 16, 2025 05:13

conn_params['connection_pool_class'] = ManagedConnectionPool

conn_params['url'] = f'redis://{conn_params["host"]}:{conn_params["port"]}'

@zs-neo thanks for all of your work on this! I'm testing out this PR right now and noticed that TLS support is broken. It looks as though it might be as simple as using the rediss scheme here, with the SSL logic above?
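
A minimal sketch of the suggested change (assuming the SSL logic above sets a flag in conn_params; the flag name here is illustrative):

    # Pick the URL scheme based on whether TLS was configured above.
    scheme = 'rediss' if conn_params.get('ssl') else 'redis'
    conn_params['url'] = f'{scheme}://{conn_params["host"]}:{conn_params["port"]}'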

Author


I'm very sorry for the late reply. Thank you for your attention to this PR! I agree with you and I will add support and testing for TLS.

Member


Did you add the test for TLS?

@auvipy modified the milestones: 5.6.0, 5.7.0 Mar 20, 2025
Copilot

This comment was marked as outdated.

@Nusnus force-pushed the feature/support_rediscluster_transport branch 2 times, most recently from 78a6be1 to 5ba298a on May 31, 2025 15:51
@auvipy (Member) left a comment


Also, can we think about a way of adding cluster support to the existing Redis transport instead of introducing a new one? What do you think?
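
For context, Kombu resolves transport names through the alias mapping in kombu/transport/__init__.py, so the separate-transport route this PR takes looks roughly like this (an illustrative sketch, not the exact diff):

    # kombu/transport/__init__.py (sketch)
    TRANSPORT_ALIASES = {
        # ...
        'redis': 'kombu.transport.redis:Transport',
        'rediscluster': 'kombu.transport.rediscluster:Transport',
        # ...
    }

Folding cluster support into the existing redis transport would instead mean branching on a URL scheme or a transport option inside kombu.transport.redis.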

class GlobalKeyPrefixMixin(RedisGlobalKeyPrefixMixin):
"""Mixin to provide common logic for global key prefixing.

copied from redis.cluster.RedisCluster.pipeline
Member


Do we really need a copy here? Is there a better way?


@dingxiong

Hey @zs-neo, I found an issue after using your patch in production for a few days.
The observation is that when a worker process dies, a new worker process is forked, and then the controller process gets stuck. Both the new worker process and the controller process are reading the same socket created by BRPOP.

How to reproduce the issue?

  1. Start the Celery worker: celery -A celery_app worker --concurrency=2 --loglevel=INFO --prefetch-multiplier=1 --queues=<queue-name> --without-mingle -Ofair --max-tasks-per-child=3
  2. Send a ping message: celery -A celery_app inspect ping -d celery@$HOSTNAME
  3. Enqueue a few messages.

Below is the stack trace I captured in the newly forked worker:

  File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 496, in _event_process_exit
    self.maintain_pool()
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1351, in maintain_pool
    self._maintain_pool()
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1343, in _maintain_pool
    self._repopulate_pool(joined)
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1328, in _repopulate_pool
    self._create_worker_process(self._avail_index())
  File "/usr/local/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 491, in _create_worker_process
    return super()._create_worker_process(i)
  File "/usr/local/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
    w.start()
  File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 120, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.11/site-packages/billiard/context.py", line 331, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 22, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.11/site-packages/billiard/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.11/site-packages/billiard/process.py", line 316, in _bootstrap
    util._run_after_forkers()
  File "/usr/local/lib/python3.11/multiprocessing/util.py", line 170, in _run_after_forkers
    func(obj)
  File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 21, in _after_fork_cleanup_resource
    resource.force_close_all()
  File "/usr/local/lib/python3.11/site-packages/kombu/resource.py", line 181, in force_close_all
    self.collect_resource(res)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 1082, in collect_resource
    return resource.collect(socket_timeout)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 384, in collect
    self._do_close_self()
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 354, in _do_close_self
    self.maybe_close_channel(self._default_channel)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 346, in maybe_close_channel
    channel.close()
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/rediscluster.py", line 597, in close
    self._brpop_read(**{"conn": conn})

You can see that the after-fork hook calls close() on the rediscluster channel, which has logic that ends up calling _brpop_read.

I think the problem is that the after-fork hook does not clean up _in_poll_connections. I propose the following fix. What do you think?

$ git --no-pager diff -U10
diff --git a/kombu/transport/rediscluster.py b/kombu/transport/rediscluster.py 
index d0653fcf..549edf32 100644
--- a/kombu/transport/rediscluster.py
+++ b/kombu/transport/rediscluster.py
@@ -454,20 +454,21 @@ class Channel(RedisChannel):

     def _after_fork(self):
         self._disconnect_pools()

     def _disconnect_pools(self):
         client = self._client
         if client is not None:
             client.disconnect_connection_pools()
             client.close()
         self._client = None
+        self._in_poll_connections.clear()

Copilot AI (Contributor) left a comment


Pull Request Overview

Adds native support for Redis Cluster as a transport backend in Kombu, including key prefixing and cluster-aware polling.

  • Introduces a new rediscluster.py transport module with cluster-aware clients, pipelining, pub/sub, and QoS.
  • Adds comprehensive unit tests for cluster behavior under failover, key prefixing, and polling.
  • Updates package exports and documentation to include the new rediscluster transport.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

File | Description
t/unit/transport/test_rediscluster.py | New unit tests covering cluster transport behaviors
kombu/transport/rediscluster.py | Cluster-aware transport implementation
kombu/transport/__init__.py | Registers rediscluster in the transport mapping
docs/reference/kombu.transport.rediscluster.rst | Documentation stub for the rediscluster transport
Comments suppressed due to low confidence (3)

t/unit/transport/test_rediscluster.py:1161

  • [nitpick] Using set as a variable name shadows the built-in set. Consider renaming this mock to avoid confusion (e.g., mock_set).
        set = client.set = Mock()

kombu/transport/rediscluster.py:651

  • Catching Exception may hide unexpected errors. Restrict the except clause to anticipated exception types (e.g., redis.exceptions.RedisError).
    def _do_restore_message(self, payload, exchange, routing_key,

kombu/transport/rediscluster.py:301

  • Parameter name type shadows the built-in type. Rename it to something like event_type for clarity.
    def _register(self, channel, client, conn, type):
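
Taken together, the three suggestions might look like this (a sketch; the surrounding code is abbreviated with ellipses):

    # 1. Rename the mock so it doesn't shadow the built-in `set`:
    mock_set = client.set = Mock()

    # 2. Catch only anticipated errors rather than a bare `Exception`:
    try:
        ...
    except redis.exceptions.RedisError:
        ...

    # 3. Rename the parameter so it doesn't shadow the built-in `type`:
    def _register(self, channel, client, conn, event_type):
        ...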

@artemvang

@zs-neo @auvipy Could you please provide a status update on this PR? We already use this code in production, but it would be great to see it in an upcoming release. Thanks for your work!

auvipy (Member) commented Aug 5, 2025

It is scheduled for the v5.7 release.

target_node.redis_connection = None
self.client.nodes_manager.initialize()
raise
except MovedError:


During a master-replica transition (for example, in a cluster of 6 nodes with 3 masters and 3 replicas), when a node undergoes a master-replica switch, an error occurs in _brpop_read:

[2025-08-06 11:38:40,268: CRITICAL/MainProcess] Unrecoverable error: ResponseError('UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)')
Traceback (most recent call last):
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\worker.py", line 203, in start
    self.blueprint.start(self)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 341, in start
    blueprint.start(self)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 772, in start
    c.loop(*c.loop_args())
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 143, in synloop
    _loop_cycle()
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 132, in _loop_cycle
    connection.drain_events(timeout=2.0)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\virtual\base.py", line 997, in drain_events
    get(self._deliver, timeout=timeout)
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\redis.py", line 598, in get
    ret = self.handle_event(fileno, event)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 379, in handle_event
    return self.on_readable(fileno), self
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 352, in on_readable 
    chan.handlers[type](**{'conn': conn})
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 512, in _brpop_read 
    dest__item = conn.read_response('BRPOP', **options)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\redis\connection.py", line 666, in read_response
    raise response
redis.exceptions.ResponseError: UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?) 
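
One hypothetical way to handle this, mirroring the MovedError recovery path quoted above (the names come from that snippet; an untested sketch of the _brpop_read error handling, not the PR's actual fix):

    from redis.exceptions import ResponseError

    try:
        dest__item = conn.read_response('BRPOP', **options)
    except ResponseError as exc:
        if 'UNBLOCKED' in str(exc):
            # A master -> replica transition forced the node out of the
            # blocking BRPOP; drop the stale connection and refresh the
            # cluster topology before surfacing the error.
            target_node.redis_connection = None
            self.client.nodes_manager.initialize()
        raise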

Member


@rfenner we need this to be addressed

rfenner commented Sep 11, 2025

It would be nice to see this land soon, as I'm trying to use AWS serverless Valkey, which runs in cluster mode, so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless offerings, since they tend to provide scalable storage rather than the fixed amount you get with a specific instance type.

I guess my workaround for now will be to spin up a normal instance just for Celery to use.

@auvipy force-pushed the feature/support_rediscluster_transport branch from 5e5de4c to 134a2bb on September 11, 2025 05:40
@auvipy requested a review from Copilot September 11, 2025 05:40
Copilot AI (Contributor) left a comment


Pull Request Overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

kombu/transport/rediscluster.py:1

  • Typo in variable name: 'disconect' should be 'disconnect'
"""Redis cluster transport module for Kombu.


@Ragnarow

Hello team, any update on this one? It looks like it's very close to being shipped in the next release?
