support redis cluster transport #2204
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2204      +/-   ##
==========================================
+ Coverage   81.60%   81.89%   +0.28%
==========================================
  Files          77       78       +1
  Lines        9540     9963     +423
  Branches     1162     1238      +76
==========================================
+ Hits         7785     8159     +374
- Misses       1563     1591      +28
- Partials      192      213      +21
Let's see if the CI passes.
dca3e75
to
9983557
Compare
f4d275a
to
f5c7356
Compare
c87be51
to
6986510
Compare
Only one test is failing, due to a connection failure.
Check the linter error :)
Sorry for the failure, I will fix it and improve the test coverage.
I am following it and have already reviewed it twice. I will do an in-depth review again tomorrow. No worries. Thanks for picking up my work.
Added docs for kombu.transport.rediscluster; now I'm sure there won't be any more problems.
Now that everything finally passes on the CI, we can properly review the code.
That being said, we’re currently in a release phase so we can’t merge it until we complete the release, after the new year holidays.
Good work and thank you for fixing everything.
Would this also support Redis cluster for backend?
conn_params['connection_pool_class'] = ManagedConnectionPool

conn_params['url'] = f'redis://{conn_params["host"]}:{conn_params["port"]}'
@zs-neo thanks for all of your work on this! I'm testing out this PR right now and noticed that TLS support is broken. It looks as though it might be as simple as using the `rediss` scheme here with the `ssl` logic above?
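A minimal sketch of the suggestion above, assuming `conn_params` carries a truthy `ssl` entry when TLS is configured. The `node_url` helper and the `ssl` key are illustrative assumptions, not the PR's actual code; the only fact relied on is that redis-py uses the `rediss://` URL scheme for TLS connections.

```python
def node_url(conn_params):
    """Build a node URL, switching to the TLS scheme when SSL is configured.

    Hypothetical helper: the 'ssl' key is an assumption about how the
    transport records its TLS settings.
    """
    scheme = 'rediss' if conn_params.get('ssl') else 'redis'
    return f'{scheme}://{conn_params["host"]}:{conn_params["port"]}'


plain = node_url({'host': 'localhost', 'port': 6379})
secure = node_url({'host': 'localhost', 'port': 6379, 'ssl': True})
print(plain)   # redis://localhost:6379
print(secure)  # rediss://localhost:6379
```

Whether the remaining SSL options also need to be forwarded to the connection pool is not something this sketch settles.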
I'm very sorry for the late reply. Thank you for your attention to this PR! I agree with you and I will add support and testing for TLS.
did you add the test for TLS?
78a6be1
to
5ba298a
Compare
also, can we think about a way of adding cluster support to the existing redis transport instead of introducing a new one? what do you think?
class GlobalKeyPrefixMixin(RedisGlobalKeyPrefixMixin):
    """Mixin to provide common logic for global key prefixing.

    copied from redis.cluster.RedisCluster.pipeline
do we really need copy here? is there any better way?
Hey @zs-neo, I found an issue after using your patch in production for a few days.
How to reproduce the issue?
Below is the stack trace I captured in the new forked worker
You can see the after-fork hook calls I think the problem is that the
Pull Request Overview
Adds native support for Redis Cluster as a transport backend in Kombu, including key prefixing and cluster-aware polling.
- Introduces a new `rediscluster.py` transport module with cluster-aware clients, pipelining, pub/sub, and QoS.
- Adds comprehensive unit tests for cluster behavior under failover, key prefixing, and polling.
- Updates package exports and documentation to include the new `rediscluster` transport.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
t/unit/transport/test_rediscluster.py | New unit tests covering cluster transport behaviors |
kombu/transport/rediscluster.py | Cluster-aware transport implementation |
kombu/transport/__init__.py | Registers rediscluster in the transport mapping |
docs/reference/kombu.transport.rediscluster.rst | Documentation stub for the rediscluster transport |
Comments suppressed due to low confidence (3)
t/unit/transport/test_rediscluster.py:1161
- [nitpick] Using `set` as a variable name shadows the built-in `set`. Consider renaming this mock to avoid confusion (e.g., `mock_set`).
set = client.set = Mock()
kombu/transport/rediscluster.py:651
- Catching `Exception` may hide unexpected errors. Restrict the except clause to anticipated exception types (e.g., `redis.exceptions.RedisError`).
def _do_restore_message(self, payload, exchange, routing_key,
kombu/transport/rediscluster.py:301
- Parameter name `type` shadows the built-in `type`. Rename it to something like `event_type` for clarity.
def _register(self, channel, client, conn, type):
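The two naming nitpicks above can be illustrated with a short sketch. The names `mock_set` and `event_type` follow the reviewer's suggestions; the `register` function and its body are hypothetical stand-ins, not the PR's code.

```python
from unittest.mock import Mock

client = Mock()
# Name the mock 'mock_set' rather than 'set' so the built-in stays usable.
mock_set = client.set = Mock()
client.set('key', 'value')
mock_set.assert_called_once_with('key', 'value')
assert set([1, 1, 2]) == {1, 2}  # the built-in set is not shadowed


def register(registry, channel, event_type):
    """Hypothetical stand-in: 'event_type' avoids shadowing built-in type()."""
    registry.setdefault(event_type, []).append(channel)
    # type() is still callable inside the function body.
    return type(channel).__name__
```

With the original parameter name `type`, the final `type(channel)` call would try to call the string passed in as the argument and raise a `TypeError`.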
It is scheduled for the v5.7 release.
target_node.redis_connection = None
self.client.nodes_manager.initialize()
raise
except MovedError:
During a master-replica transition, for example, in a cluster with a total of 6 nodes (3 master nodes and 3 replica nodes), when a node undergoes a master-replica switch, an error will occur in _brpop_read.
[2025-08-06 11:38:40,268: CRITICAL/MainProcess] Unrecoverable error: ResponseError('UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)')
Traceback (most recent call last):
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 341, in start
blueprint.start(self)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 772, in start
c.loop(*c.loop_args())
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 143, in synloop
_loop_cycle()
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 132, in _loop_cycle
connection.drain_events(timeout=2.0)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\virtual\base.py", line 997, in drain_events
get(self._deliver, timeout=timeout)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\redis.py", line 598, in get
ret = self.handle_event(fileno, event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 379, in handle_event
return self.on_readable(fileno), self
^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 352, in on_readable
chan.handlers[type](**{'conn': conn})
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 512, in _brpop_read
dest__item = conn.read_response('BRPOP', **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\redis\connection.py", line 666, in read_response
raise response
redis.exceptions.ResponseError: UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)
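One possible way to handle this, sketched under assumptions: treat an error whose message starts with `UNBLOCKED` as a retryable failover signal instead of letting it reach the worker as an unrecoverable error. The helper names (`is_failover_unblock`, `read_with_failover_retry`, `refresh_topology`) are hypothetical, and this is not the PR's fix. The sketch uses plain exceptions so that it runs without a Redis server; real code would check for `redis.exceptions.ResponseError` specifically.

```python
def is_failover_unblock(exc):
    """Return True if exc looks like Redis's forced-unblock reply.

    Assumption: the server message starts with 'UNBLOCKED', as in the
    traceback above.
    """
    return str(exc).startswith('UNBLOCKED')


def read_with_failover_retry(read, refresh_topology, retries=1):
    """Call read(); on a forced unblock, refresh the topology and retry."""
    for attempt in range(retries + 1):
        try:
            return read()
        except Exception as exc:  # real code: redis.exceptions.ResponseError
            # Re-raise anything that is not a forced unblock, and give up
            # once the retry budget is exhausted.
            if not is_failover_unblock(exc) or attempt == retries:
                raise
            refresh_topology()
```

In the transport, `refresh_topology` might correspond to something like the `self.client.nodes_manager.initialize()` call visible in the diff excerpt, but that mapping is an assumption.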
@rfenner we need this to be addressed
It would be nice to see this land soon, as I'm trying to use AWS serverless Valkey, which runs in cluster mode, so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless versions, since they tend to offer scalable storage rather than the fixed amount you get with a specific instance type. I guess my workaround for now will be to spin up a normal version just for Celery to use.
for more information, see https://pre-commit.ci
Co-authored-by: bashir-abdelwahed <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
5e5de4c
to
134a2bb
Compare
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
kombu/transport/rediscluster.py:1
- Typo in variable name: 'disconect' should be 'disconnect'
"""Redis cluster transport module for Kombu.
Co-authored-by: Copilot <[email protected]>
Hello team, any update on this one? It looks like it's very close to shipping in the next release?
Attempt to address #1021
Thank you very much for your code, it helps us a lot. @auvipy
We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py.
Celery works fine on our cluster with multiple producers and multiple consumers; when a node goes down, it switches over automatically.