|
| 1 | +# Subscriptions |
| 2 | + |
| 3 | +This guide explains how to use Redis pub/sub functionality with `async-redis` to publish and subscribe to messages. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +Redis actually has 3 mechanisms to support pub/sub - a general `SUBSCRIBE` command, a pattern-based `PSUBSCRIBE` command, and a sharded `SSUBSCRIBE` command for cluster environments. They mostly work the same way, but have different use cases. |
| 8 | + |
| 9 | +## Subscribe |
| 10 | + |
| 11 | +The `SUBSCRIBE` command is used to subscribe to one or more channels. When a message is published to a subscribed channel, the client receives the message in real-time. |
| 12 | + |
| 13 | +First, let's create a simple listener that subscribes to messages on a channel: |
| 14 | + |
| 15 | +``` ruby |
| 16 | +require 'async' |
| 17 | +require 'async/redis' |
| 18 | + |
| 19 | +client = Async::Redis::Client.new |
| 20 | + |
| 21 | +Async do |
| 22 | + client.subscribe 'status.frontend' do |context| |
| 23 | + puts "Listening for messages on 'status.frontend'..." |
| 24 | + |
| 25 | + type, name, message = context.listen |
| 26 | + |
| 27 | + puts "Received: #{message}" |
| 28 | + end |
| 29 | +end |
| 30 | +``` |
| 31 | + |
| 32 | +Now, let's create a publisher that sends messages to the same channel: |
| 33 | + |
| 34 | +``` ruby |
| 35 | +require 'async' |
| 36 | +require 'async/redis' |
| 37 | + |
| 38 | +client = Async::Redis::Client.new |
| 39 | + |
| 40 | +Async do |
| 41 | + puts "Publishing message..." |
| 42 | + client.publish 'status.frontend', 'good' |
| 43 | + puts "Message sent!" |
| 44 | +end |
| 45 | +``` |
| 46 | + |
| 47 | +To see pub/sub in action, you can run the listener in one terminal and the publisher in another. The listener will receive any messages sent by the publisher to the `status.frontend` channel: |
| 48 | + |
| 49 | +```bash |
| 50 | +$ ruby listener.rb |
| 51 | +Listening for messages on 'status.frontend'... |
| 52 | +Received: good |
| 53 | +``` |
| 54 | + |
| 55 | +### Error Handling |
| 56 | + |
| 57 | +Subscriptions are at-most-once delivery. In addition, subscriptions are stateful, meaning that they maintain their own internal state and can be affected by network issues or server restarts. In order to improve resilience, it's important to implement error handling and reconnection logic. |
| 58 | + |
| 59 | +```ruby |
| 60 | +require 'async' |
| 61 | +require 'async/redis' |
| 62 | + |
| 63 | +client = Async::Redis::Client.new |
| 64 | + |
| 65 | +Async do |
| 66 | + client.subscribe 'status.frontend' do |context| |
| 67 | + puts "Listening for messages on 'status.frontend'..." |
| 68 | + |
| 69 | + context.each do |type, name, message| |
| 70 | + puts "Received: #{message}" |
| 71 | + end |
| 72 | + end |
| 73 | +rescue => error |
| 74 | + Console.warn(self, "Subscription failed", error) |
| 75 | + sleep 1 |
| 76 | + retry |
| 77 | +end |
| 78 | +``` |
| 79 | + |
| 80 | +## Pattern Subscribe |
| 81 | + |
| 82 | +The `PSUBSCRIBE` command is used to subscribe to channels that match a given pattern. This allows clients to receive messages from multiple channels without subscribing to each one individually. |
| 83 | + |
| 84 | +Let's replace the receiver in the above example: |
| 85 | + |
| 86 | +``` ruby |
| 87 | +require 'async' |
| 88 | +require 'async/redis' |
| 89 | + |
| 90 | +endpoint = Async::Redis.local_endpoint |
| 91 | +client = Async::Redis::Client.new(endpoint) |
| 92 | + |
| 93 | +Async do |
| 94 | + client.psubscribe 'status.*' do |context| |
| 95 | + puts "Listening for messages on 'status.*'..." |
| 96 | + |
| 97 | + type, pattern, name, message = context.listen |
| 98 | + |
| 99 | + puts "Received: #{message}" |
| 100 | + end |
| 101 | +end |
| 102 | +``` |
| 103 | + |
| 104 | +Note that an extra field, `pattern` is returned when using `PSUBSCRIBE`. This field indicates the pattern that was matched for the incoming message. This can be useful for logging or debugging purposes, as it allows you to see which pattern triggered the message delivery. |
| 105 | + |
| 106 | +## Shard Subscribe |
| 107 | + |
| 108 | +If you are working with a clustered environment, you can improve performance by limiting the scope of your subscriptions to specific shards. This can help reduce the amount of data that needs to be sent between shards and improve overall throughput. |
| 109 | + |
| 110 | +To use sharded subscriptions, use a cluster client which supports sharded pub/sub: |
| 111 | + |
| 112 | +``` ruby |
| 113 | +require 'async' |
| 114 | +require 'async/redis' |
| 115 | + |
| 116 | +# endpoints = ... |
| 117 | +cluster_client = Async::Redis::ClusterClient.new(endpoints) |
| 118 | + |
| 119 | +Async do |
| 120 | + cluster_client.subscribe 'status.frontend' do |context| |
| 121 | + puts "Listening for messages on 'status.frontend'..." |
| 122 | + |
| 123 | + type, name, message = context.listen |
| 124 | + |
| 125 | + puts "Received: #{message}" |
| 126 | + end |
| 127 | +end |
| 128 | +``` |
| 129 | + |
| 130 | +``` ruby |
| 131 | +require 'async' |
| 132 | +require 'async/redis' |
| 133 | + |
| 134 | +# endpoints = ... |
| 135 | +cluster_client = Async::Redis::ClusterClient.new(endpoints) |
| 136 | + |
| 137 | +Async do |
| 138 | + puts "Publishing message..." |
| 139 | + cluster_client.publish('status.frontend', 'good') |
| 140 | + puts "Message sent!" |
| 141 | +end |
| 142 | +``` |
| 143 | + |
| 144 | +### Clustered Subscriptions |
| 145 | + |
| 146 | +While general `PUBLISH` and `SUBSCRIBE` will work on a cluster, they are less efficient as they require inter-shard communication. By default, the {ruby Async::Redis::ClusterClient} subscription mechanism defaults to `SSUBSCRIBE` and `SPUBLISH`, which are optimized for sharded environments. However, if using multiple subscriptions, internally, several connections will be made to the relevant shards, which increases the complexity. |
| 147 | + |
| 148 | +#### Cluster Topology Changes and Subscription Invalidation |
| 149 | + |
| 150 | +If the cluster is re-configured (e.g. adding or removing nodes, resharding), the subscription state may need to be re-established to account for the new topology. During this process, messages may be lost. This is expected as subscriptions are stateless. |
| 151 | + |
| 152 | +**Important**: When any individual shard subscription fails (due to resharding, node failures, or network issues), the entire cluster subscription is invalidated and will stop delivering messages. This design ensures consistency and prevents partial subscription states that could lead to missed messages on some shards. |
| 153 | + |
| 154 | +Common scenarios that trigger subscription invalidation: |
| 155 | + |
| 156 | +- **Resharding operations**: When slots are migrated between nodes (`MOVED` errors) |
| 157 | +- **Node failures**: When Redis nodes become unavailable |
| 158 | +- **Network partitions**: When connections to specific shards are lost |
| 159 | +- **Cluster reconfiguration**: When the cluster topology changes |
| 160 | + |
| 161 | +Applications should be prepared to handle subscription failures and implement appropriate retry strategies. |
0 commit comments