Skip to content

Commit e24cd78

Browse files
authored
Support read from replica on clsuter pubsub subscribe (#25)
1 parent bf86319 commit e24cd78

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

redis/cluster.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,7 +1646,7 @@ class ClusterPubSub(PubSub):
16461646
https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
16471647
"""
16481648

1649-
def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
1649+
def __init__(self, redis_cluster, node=None, host=None, port=None, replica=False, **kwargs):
16501650
"""
16511651
When a pubsub instance is created without specifying a node, a single
16521652
node will be transparently chosen for the pubsub connection on the
@@ -1661,6 +1661,7 @@ def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
16611661
:type port: int
16621662
"""
16631663
self.node = None
1664+
self.replica = replica
16641665
self.set_pubsub_node(redis_cluster, node, host, port)
16651666
connection_pool = (
16661667
None
@@ -1794,7 +1795,7 @@ def get_sharded_message(
17941795
if message["channel"] in self.pending_unsubscribe_shard_channels:
17951796
self.pending_unsubscribe_shard_channels.remove(message["channel"])
17961797
self.shard_channels.pop(message["channel"], None)
1797-
node = self.cluster.get_node_from_key(message["channel"])
1798+
node = self.cluster.get_node_from_key(message["channel"], self.replica)
17981799
if self.node_pubsub_mapping[node.name].subscribed is False:
17991800
self.node_pubsub_mapping.pop(node.name)
18001801
if not self.channels and not self.patterns and not self.shard_channels:
@@ -1811,7 +1812,7 @@ def ssubscribe(self, *args, **kwargs):
18111812
s_channels = dict.fromkeys(args)
18121813
s_channels.update(kwargs)
18131814
for s_channel, handler in s_channels.items():
1814-
node = self.cluster.get_node_from_key(s_channel)
1815+
node = self.cluster.get_node_from_key(s_channel, self.replica)
18151816
pubsub = self._get_node_pubsub(node)
18161817
if handler:
18171818
pubsub.ssubscribe(**{s_channel: handler})
@@ -1832,7 +1833,7 @@ def sunsubscribe(self, *args):
18321833
args = self.shard_channels
18331834

18341835
for s_channel in args:
1835-
node = self.cluster.get_node_from_key(s_channel)
1836+
node = self.cluster.get_node_from_key(s_channel, self.replica)
18361837
p = self._get_node_pubsub(node)
18371838
p.sunsubscribe(s_channel)
18381839
self.pending_unsubscribe_shard_channels.update(

0 commit comments

Comments
 (0)