66using System . Collections . Generic ;
77using System . IO ;
88using System . Net ;
9+ using System . Reflection ;
10+ using System . Threading ;
911using System . Threading . Tasks ;
1012using StackExchange . Redis ;
1113using StackExchange . Redis . Profiling ;
@@ -73,11 +75,11 @@ public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved
7375 remove { }
7476 }
7577
76- private readonly ISubscriber _subscriber ;
78+ private readonly TestRedisServer _server ;
7779
7880 public TestConnectionMultiplexer ( TestRedisServer server )
7981 {
80- _subscriber = new TestSubscriber ( server ) ;
82+ _server = server ;
8183 }
8284
8385 public void BeginProfiling ( object forContext )
@@ -167,7 +169,7 @@ public string GetStormLog()
167169
168170 public ISubscriber GetSubscriber ( object asyncState = null )
169171 {
170- return _subscriber ;
172+ return new TestSubscriber ( _server ) ;
171173 }
172174
173175 public int HashSlot ( RedisKey key )
@@ -223,14 +225,14 @@ public void ExportConfiguration(Stream destination, ExportOptions options = (Exp
223225
224226 public class TestRedisServer
225227 {
226- private readonly ConcurrentDictionary < RedisChannel , List < Action < RedisChannel , RedisValue > > > _subscriptions =
227- new ConcurrentDictionary < RedisChannel , List < Action < RedisChannel , RedisValue > > > ( ) ;
228+ private readonly ConcurrentDictionary < RedisChannel , List < ( int , Action < RedisChannel , RedisValue > ) > > _subscriptions =
229+ new ConcurrentDictionary < RedisChannel , List < ( int , Action < RedisChannel , RedisValue > ) > > ( ) ;
228230
229231 public long Publish ( RedisChannel channel , RedisValue message , CommandFlags flags = CommandFlags . None )
230232 {
231233 if ( _subscriptions . TryGetValue ( channel , out var handlers ) )
232234 {
233- foreach ( var handler in handlers )
235+ foreach ( var ( _ , handler ) in handlers )
234236 {
235237 handler ( channel , message ) ;
236238 }
@@ -239,26 +241,37 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
239241 return handlers != null ? handlers . Count : 0 ;
240242 }
241243
242- public void Subscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
244+ public void Subscribe ( ChannelMessageQueue messageQueue , int subscriberId , CommandFlags flags = CommandFlags . None )
243245 {
244- _subscriptions . AddOrUpdate ( channel , _ => new List < Action < RedisChannel , RedisValue >> { handler } , ( _ , list ) =>
246+ Action < RedisChannel , RedisValue > handler = ( channel , value ) =>
247+ {
248+ // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
249+ // ChannelMessageQueue isn't mockable currently, this works around that by using private reflection
250+ typeof ( ChannelMessageQueue ) . GetMethod ( "Write" , BindingFlags . NonPublic | BindingFlags . Instance )
251+ . Invoke ( messageQueue , new object [ ] { channel , value } ) ;
252+ } ;
253+
254+ _subscriptions . AddOrUpdate ( messageQueue . Channel , _ => new List < ( int , Action < RedisChannel , RedisValue > ) > { ( subscriberId , handler ) } , ( _ , list ) =>
245255 {
246- list . Add ( handler ) ;
256+ list . Add ( ( subscriberId , handler ) ) ;
247257 return list ;
248258 } ) ;
249259 }
250260
251- public void Unsubscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler = null , CommandFlags flags = CommandFlags . None )
261+ public void Unsubscribe ( RedisChannel channel , int subscriberId , CommandFlags flags = CommandFlags . None )
252262 {
253263 if ( _subscriptions . TryGetValue ( channel , out var list ) )
254264 {
255- list . Remove ( handler ) ;
265+ list . RemoveAll ( ( item ) => item . Item1 == subscriberId ) ;
256266 }
257267 }
258268 }
259269
260270 public class TestSubscriber : ISubscriber
261271 {
272+ private static int StaticId ;
273+
274+ private readonly int _id ;
262275 private readonly TestRedisServer _server ;
263276 public ConnectionMultiplexer Multiplexer => throw new NotImplementedException ( ) ;
264277
@@ -267,6 +280,7 @@ public class TestSubscriber : ISubscriber
267280 public TestSubscriber ( TestRedisServer server )
268281 {
269282 _server = server ;
283+ _id = Interlocked . Increment ( ref StaticId ) ;
270284 }
271285
272286 public EndPoint IdentifyEndpoint ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
@@ -307,7 +321,7 @@ public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, C
307321
308322 public void Subscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
309323 {
310- _server . Subscribe ( channel , handler , flags ) ;
324+ throw new NotImplementedException ( ) ;
311325 }
312326
313327 public Task SubscribeAsync ( RedisChannel channel , Action < RedisChannel , RedisValue > handler , CommandFlags flags = CommandFlags . None )
@@ -328,7 +342,7 @@ public bool TryWait(Task task)
328342
329343 public void Unsubscribe ( RedisChannel channel , Action < RedisChannel , RedisValue > handler = null , CommandFlags flags = CommandFlags . None )
330344 {
331- _server . Unsubscribe ( channel , handler , flags ) ;
345+ _server . Unsubscribe ( channel , _id , flags ) ;
332346 }
333347
334348 public void UnsubscribeAll ( CommandFlags flags = CommandFlags . None )
@@ -364,7 +378,15 @@ public void WaitAll(params Task[] tasks)
364378
365379 public ChannelMessageQueue Subscribe ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
366380 {
367- throw new NotImplementedException ( ) ;
381+ // Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
382+ var redisSubscriberType = typeof ( RedisChannel ) . Assembly . GetType ( "StackExchange.Redis.RedisSubscriber" ) ;
383+ var ctor = typeof ( ChannelMessageQueue ) . GetConstructor ( BindingFlags . Instance | BindingFlags . NonPublic ,
384+ binder : null ,
385+ new Type [ ] { typeof ( RedisChannel ) . MakeByRefType ( ) , redisSubscriberType } , modifiers : null ) ;
386+
387+ var queue = ( ChannelMessageQueue ) ctor . Invoke ( new object [ ] { channel , null } ) ;
388+ _server . Subscribe ( queue , _id ) ;
389+ return queue ;
368390 }
369391
370392 public Task < ChannelMessageQueue > SubscribeAsync ( RedisChannel channel , CommandFlags flags = CommandFlags . None )
0 commit comments