Skip to content

Looking for how to do pub sub with sharded pub/sub in redis>7.0.1 #3561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
prakashhiranandani opened this issue Mar 15, 2025 · 2 comments
Closed
Assignees

Comments

@prakashhiranandani
Copy link

I am doing like that but is there a way the library can do it for me ?
Also I am using AWS elastic cache



import redis
import threading
import hashlib

# Utility function to hash and select a Redis node
def get_shard_node(redis_nodes, channel):
    hash_value = int(hashlib.md5(channel.encode()).hexdigest(), 16)
    return redis_nodes[hash_value % len(redis_nodes)]

# Publisher Function
def sharded_publish(redis_nodes, channel, message):
    node = get_shard_node(redis_nodes, channel)
    node.publish(channel, message)

# Subscriber Function
def sharded_subscribe(redis_nodes, channel):
    node = get_shard_node(redis_nodes, channel)
    pubsub = node.pubsub()
    pubsub.subscribe(channel)

    print(f"Subscribed to {channel} on {node}")
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data'].decode()} on {channel}")

# Redis Nodes Configuration (Cluster Nodes)
redis_nodes = [
    redis.StrictRedis(host='localhost', port=6379),
    redis.StrictRedis(host='localhost', port=6380),
    redis.StrictRedis(host='localhost', port=6381)
]

# Example Usage
if __name__ == "__main__":
    # Start subscribers in separate threads
    threading.Thread(target=sharded_subscribe, args=(redis_nodes, 'channel1')).start()
    threading.Thread(target=sharded_subscribe, args=(redis_nodes, 'channel2')).start()

    # Publish messages
    sharded_publish(redis_nodes, 'channel1', 'Hello from Channel 1')
    sharded_publish(redis_nodes, 'channel2', 'Hello from Channel 2')

@petyaslavova petyaslavova self-assigned this Mar 17, 2025
@petyaslavova
Copy link
Collaborator

Hi @prakashhiranandani ! I'll try to provide the requested information by the end of the week.

@prakashhiranandani
Copy link
Author

@petyaslavova : Thank you but I got it to work,
Subscriber is below

from redis.cluster import RedisCluster, ClusterNode
import time
r = RedisCluster(startup_nodes=[ClusterNode('rediscache-0001-001.awsclusterurl.com', 6379), ClusterNode('rediscache-0002-001.awsclusterurl.com', 6379)], ssl=True)
jamna=r.get_node_from_key("jamnachikochiki")
p = r.pubsub(node=jamna)
p.ssubscribe("jamnachikochiki")

while True:
    message = p.get_sharded_message(ignore_subscribe_messages=False, timeout=1, target_node=jamna)
    print(message)
    time.sleep(1)
~                 

Publisher

from redis.cluster import RedisCluster, ClusterNode

# Connect to Redis Cluster
r = RedisCluster(
    startup_nodes=[
        ClusterNode('rediscache-0001-001.awsclusterurl.com', 6379),
        ClusterNode('rediscache-0002-001.awsclusterurl.com'', 6379)
    ],
    ssl=True
)



node = r.get_node_from_key("jamnachikochiki")
args = ("jamnachikochiki", "Test message for sharded pub/sub")
print(node)
response = r.execute_command("SPUBLISH", *args, target_nodes=node)
print(response)

Two changes I made

p.get_sharded_message(ignore_subscribe_messages=True, timeout=1, target_node=jamna) changed this to
p.get_sharded_message(ignore_subscribe_messages=False, timeout=1, target_node=jamna)

and I deleted the slave nodes for redis,
Not sure if it was related to slave nodes or the ignore_subscribe_messages=False,
Thanks I am unblocked for now spent hours but figured it out!! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants