Skip to content

Commit a2bc45c

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-830 - Release Pub/Sub connection when closing LettuceSubscription.
We now release the native connection back to the connection provider when LettuceSubscription is closed. Previously, we just closed the connection which interfered with pooling as pooling connection providers still had a reference on the connection. Original Pull Request: spring-projects#341
1 parent 03c607a commit a2bc45c

File tree

3 files changed

+19
-11
lines changed

3 files changed

+19
-11
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ public void pSubscribe(MessageListener listener, byte[]... patterns) {
752752
throw new UnsupportedOperationException();
753753
}
754754
try {
755-
subscription = new LettuceSubscription(listener, switchToPubSub());
755+
subscription = new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
756756
subscription.pSubscribe(patterns);
757757
} catch (Exception ex) {
758758
throw convertLettuceAccessException(ex);
@@ -767,7 +767,7 @@ public void subscribe(MessageListener listener, byte[]... channels) {
767767
throw new UnsupportedOperationException();
768768
}
769769
try {
770-
subscription = new LettuceSubscription(listener, switchToPubSub());
770+
subscription = new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
771771
subscription.subscribe(channels);
772772

773773
} catch (Exception ex) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
* Message subscription on top of Lettuce.
2626
*
2727
* @author Costin Leau
28+
* @author Mark Paluch
2829
*/
2930
class LettuceSubscription extends AbstractSubscription {
3031

3132
final StatefulRedisPubSubConnection<byte[], byte[]> pubsub;
3233
private LettuceMessageListener listener;
34+
private final LettuceConnectionProvider connectionProvider;
3335

34-
LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection) {
36+
LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection, LettuceConnectionProvider connectionProvider) {
3537
super(listener);
3638
this.pubsub = pubsubConnection;
3739
this.listener = new LettuceMessageListener(listener);
40+
this.connectionProvider = connectionProvider;
3841

3942
pubsub.addListener(this.listener);
4043
}
@@ -47,7 +50,8 @@ protected void doClose() {
4750
pubsub.sync().punsubscribe(new byte[0]);
4851
}
4952
pubsub.removeListener(this.listener);
50-
pubsub.close();
53+
54+
connectionProvider.release(pubsub);
5155
}
5256

5357
protected void doPsubscribe(byte[]... patterns) {

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSubscriptionTests.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
*
3535
* @author Jennifer Hickey
3636
* @author Christoph Strobl
37+
* @author Mark Paluch
3738
*/
3839
public class LettuceSubscriptionTests {
3940

@@ -45,16 +46,19 @@ public class LettuceSubscriptionTests {
4546

4647
private RedisPubSubCommands<byte[], byte[]> asyncCommands;
4748

49+
private LettuceConnectionProvider connectionProvider;
50+
4851
@SuppressWarnings("unchecked")
4952
@Before
5053
public void setUp() {
5154

5255
pubsub = Mockito.mock(StatefulRedisPubSubConnection.class);
5356
listener = Mockito.mock(MessageListener.class);
5457
asyncCommands = Mockito.mock(RedisPubSubCommands.class);
58+
connectionProvider = Mockito.mock(LettuceConnectionProvider.class);
5559

5660
Mockito.when(pubsub.sync()).thenReturn(asyncCommands);
57-
subscription = new LettuceSubscription(listener, pubsub);
61+
subscription = new LettuceSubscription(listener, pubsub, connectionProvider);
5862
}
5963

6064
@Test
@@ -64,7 +68,7 @@ public void testUnsubscribeAllAndClose() {
6468
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
6569
verify(asyncCommands, never()).unsubscribe(new byte[0]);
6670
verify(asyncCommands, never()).punsubscribe(new byte[0]);
67-
verify(pubsub).close();
71+
verify(connectionProvider).release(pubsub);
6872
verify(pubsub).removeListener(any(LettuceMessageListener.class));
6973
assertFalse(subscription.isAlive());
7074
assertTrue(subscription.getChannels().isEmpty());
@@ -94,7 +98,7 @@ public void testUnsubscribeChannelAndClose() {
9498
verify(asyncCommands, times(1)).unsubscribe(channel);
9599
verify(asyncCommands, never()).unsubscribe(new byte[0]);
96100
verify(asyncCommands, never()).punsubscribe(new byte[0]);
97-
verify(pubsub).close();
101+
verify(connectionProvider).release(pubsub);
98102
verify(pubsub).removeListener(any(LettuceMessageListener.class));
99103
assertFalse(subscription.isAlive());
100104
assertTrue(subscription.getChannels().isEmpty());
@@ -167,7 +171,7 @@ public void testUnsubscribeAllNoChannels() {
167171
public void testUnsubscribeNotAlive() {
168172
subscription.subscribe(new byte[][] { "a".getBytes() });
169173
subscription.unsubscribe();
170-
verify(pubsub, times(1)).close();
174+
verify(connectionProvider, times(1)).release(pubsub);
171175
verify(pubsub, times(1)).removeListener(any(LettuceMessageListener.class));
172176
assertFalse(subscription.isAlive());
173177
subscription.unsubscribe();
@@ -192,7 +196,7 @@ public void testPUnsubscribeAllAndClose() {
192196
verify(asyncCommands, never()).punsubscribe(new byte[0]);
193197
verify(asyncCommands, times(1)).punsubscribe(new byte[][] { "a*".getBytes() });
194198
assertFalse(subscription.isAlive());
195-
verify(pubsub).close();
199+
verify(connectionProvider).release(pubsub);
196200
verify(pubsub).removeListener(any(LettuceMessageListener.class));
197201
assertTrue(subscription.getChannels().isEmpty());
198202
assertTrue(subscription.getPatterns().isEmpty());
@@ -221,7 +225,7 @@ public void testPUnsubscribeAndClose() {
221225
verify(asyncCommands, never()).unsubscribe(new byte[0]);
222226
verify(asyncCommands, never()).punsubscribe(new byte[0]);
223227
verify(asyncCommands, times(1)).punsubscribe(pattern);
224-
verify(pubsub).close();
228+
verify(connectionProvider).release(pubsub);
225229
verify(pubsub).removeListener(any(LettuceMessageListener.class));
226230
assertFalse(subscription.isAlive());
227231
assertTrue(subscription.getChannels().isEmpty());
@@ -296,7 +300,7 @@ public void testPUnsubscribeNotAlive() {
296300
subscription.unsubscribe();
297301
assertFalse(subscription.isAlive());
298302
subscription.pUnsubscribe();
299-
verify(pubsub, times(1)).close();
303+
verify(connectionProvider, times(1)).release(pubsub);
300304
verify(pubsub, times(1)).removeListener(any(LettuceMessageListener.class));
301305
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
302306
verify(asyncCommands, never()).unsubscribe(new byte[0]);

0 commit comments

Comments
 (0)