Skip to content

Commit 1c91a52

Browse files
committed
Add STOMP subscribe/unscubscribe ApplicationContext events
Issue: SPR-11813
1 parent 9aa53ab commit 1c91a52

File tree

9 files changed

+178
-71
lines changed

9 files changed

+178
-71
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.socket.messaging;
18+
19+
20+
import org.springframework.context.ApplicationEvent;
21+
import org.springframework.messaging.Message;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* A base class for events for a message received from a WebSocket client and
26+
* parsed into a higher level sub-protocol (e.g. STOMP).
27+
*
28+
* @author Rossen Stoyanchev
29+
* @since 4.0.3
30+
*/
31+
@SuppressWarnings("serial")
32+
public abstract class AbstractSubProtocolEvent extends ApplicationEvent {
33+
34+
private final Message<byte[]> message;
35+
36+
37+
/**
38+
* Create a new SessionConnectEvent.
39+
*
40+
* @param source the component that published the event (never {@code null})
41+
* @param message the connect message
42+
*/
43+
protected AbstractSubProtocolEvent(Object source, Message<byte[]> message) {
44+
super(source);
45+
Assert.notNull(message, "'message' must not be null");
46+
this.message = message;
47+
}
48+
49+
/**
50+
* Return the Message associated with the event. Here is an example of
51+
* obtaining information about the session id or any headers in the
52+
* message:
53+
*
54+
* <pre class="code">
55+
* StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
56+
* headers.getSessionId();
57+
* headers.getSessionAttributes();
58+
* headers.getPrincipal();
59+
* </pre>
60+
*
61+
*/
62+
public Message<byte[]> getMessage() {
63+
return this.message;
64+
}
65+
66+
67+
@Override
68+
public String toString() {
69+
return getClass().getSimpleName() + "[" + this.message + "]";
70+
}
71+
}

spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package org.springframework.web.socket.messaging;
1818

1919

20-
import org.springframework.context.ApplicationEvent;
2120
import org.springframework.messaging.Message;
22-
import org.springframework.util.Assert;
2321

2422
/**
2523
* Event raised when a new WebSocket client using a Simple Messaging Protocol
@@ -29,50 +27,15 @@
2927
* but rather the client's first attempt to connect within the the sub-protocol,
3028
* for example sending the STOMP CONNECT frame.
3129
*
32-
* <p>The provided {@link #getMessage() message} can be examined to check
33-
* information about the connected user, The session id, and any headers
34-
* sent by the client, for STOMP check the class
35-
* {@link org.springframework.messaging.simp.stomp.StompHeaderAccessor}.
36-
* For example:
37-
*
38-
* <pre class="code">
39-
* StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
40-
* headers.getSessionId();
41-
* headers.getSessionAttributes();
42-
* headers.getPrincipal();
43-
* </pre>
44-
*
4530
* @author Rossen Stoyanchev
4631
* @since 4.0.3
4732
*/
4833
@SuppressWarnings("serial")
49-
public class SessionConnectEvent extends ApplicationEvent {
34+
public class SessionConnectEvent extends AbstractSubProtocolEvent {
5035

51-
private final Message<byte[]> message;
5236

53-
54-
/**
55-
* Create a new SessionConnectEvent.
56-
*
57-
* @param source the component that published the event (never {@code null})
58-
* @param message the connect message
59-
*/
6037
public SessionConnectEvent(Object source, Message<byte[]> message) {
61-
super(source);
62-
Assert.notNull(message, "'message' must not be null");
63-
this.message = message;
38+
super(source, message);
6439
}
6540

66-
/**
67-
* Return the connect message.
68-
*/
69-
public Message<byte[]> getMessage() {
70-
return this.message;
71-
}
72-
73-
74-
@Override
75-
public String toString() {
76-
return "SessionConnectEvent" + this.message;
77-
}
7841
}

spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,45 +17,22 @@
1717
package org.springframework.web.socket.messaging;
1818

1919

20-
import org.springframework.context.ApplicationEvent;
2120
import org.springframework.messaging.Message;
22-
import org.springframework.util.Assert;
2321

2422
/**
2523
* A connected event represents the server response to a client's connect request.
26-
* See {@link org.springframework.web.socket.messaging.SessionConnectEvent}.
24+
* See {@link org.springframework.web.socket.messaging.SessionConnectEvent
25+
* SessionConnectEvent}.
2726
*
2827
* @author Rossen Stoyanchev
2928
* @since 4.0.3
3029
*/
3130
@SuppressWarnings("serial")
32-
public class SessionConnectedEvent extends ApplicationEvent {
31+
public class SessionConnectedEvent extends AbstractSubProtocolEvent {
3332

34-
private final Message<byte[]> message;
3533

36-
37-
/**
38-
* Create a new event.
39-
*
40-
* @param source the component that published the event (never {@code null})
41-
* @param message the connected message
42-
*/
4334
public SessionConnectedEvent(Object source, Message<byte[]> message) {
44-
super(source);
45-
Assert.notNull(message, "'message' must not be null");
46-
this.message = message;
47-
}
48-
49-
/**
50-
* Return the connected message.
51-
*/
52-
public Message<byte[]> getMessage() {
53-
return this.message;
35+
super(source, message);
5436
}
5537

56-
57-
@Override
58-
public String toString() {
59-
return "SessionConnectedEvent" + this.message;
60-
}
6138
}

spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public CloseStatus getCloseStatus() {
6868

6969
@Override
7070
public String toString() {
71-
return "SessionDisconnectEvent[sessionId=" + this.sessionId +
71+
return "SessionDisconnectEvent[sessionId=" + this.sessionId + ", " +
7272
(this.status != null ? this.status.toString() : "closeStatus=null") + "]";
7373
}
7474
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.socket.messaging;
18+
19+
20+
import org.springframework.messaging.Message;
21+
22+
/**
23+
* Event raised when a new WebSocket client using a Simple Messaging Protocol
24+
* (e.g. STOMP) sends a subscription request.
25+
*
26+
* @author Rossen Stoyanchev
27+
* @since 4.0.3
28+
*/
29+
@SuppressWarnings("serial")
30+
public class SessionSubscribeEvent extends AbstractSubProtocolEvent {
31+
32+
33+
public SessionSubscribeEvent(Object source, Message<byte[]> message) {
34+
super(source, message);
35+
}
36+
37+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.socket.messaging;
18+
19+
20+
import org.springframework.messaging.Message;
21+
22+
/**
23+
* Event raised when a new WebSocket client using a Simple Messaging Protocol
24+
* (e.g. STOMP) sends a request to remove a subscription.
25+
*
26+
* @author Rossen Stoyanchev
27+
* @since 4.0.3
28+
*/
29+
@SuppressWarnings("serial")
30+
public class SessionUnsubscribeEvent extends AbstractSubProtocolEvent {
31+
32+
33+
public SessionUnsubscribeEvent(Object source, Message<byte[]> message) {
34+
super(source, message);
35+
}
36+
37+
}

spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,16 @@ public void handleMessageFromClient(WebSocketSession session,
219219
headerAccessor.setUser(session.getPrincipal());
220220
headerAccessor.setImmutable();
221221

222-
if (this.eventPublisher != null && StompCommand.CONNECT.equals(headerAccessor.getCommand())) {
223-
publishEvent(new SessionConnectEvent(this, message));
222+
if (this.eventPublisher != null) {
223+
if (StompCommand.CONNECT.equals(headerAccessor.getCommand())) {
224+
publishEvent(new SessionConnectEvent(this, message));
225+
}
226+
else if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) {
227+
publishEvent(new SessionSubscribeEvent(this, message));
228+
}
229+
else if (StompCommand.UNSUBSCRIBE.equals(headerAccessor.getCommand())) {
230+
publishEvent(new SessionUnsubscribeEvent(this, message));
231+
}
224232
}
225233

226234
try {

spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,24 @@ public void eventPublication() {
184184
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
185185
this.protocolHandler.handleMessageToClient(this.session, message);
186186

187+
headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
188+
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
189+
textMessage = new TextMessage(new StompEncoder().encode(message));
190+
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
191+
192+
headers = StompHeaderAccessor.create(StompCommand.UNSUBSCRIBE);
193+
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
194+
textMessage = new TextMessage(new StompEncoder().encode(message));
195+
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
196+
187197
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
188198

189-
assertEquals("Unexpected events " + publisher.events, 3, publisher.events.size());
199+
assertEquals("Unexpected events " + publisher.events, 5, publisher.events.size());
190200
assertEquals(SessionConnectEvent.class, publisher.events.get(0).getClass());
191201
assertEquals(SessionConnectedEvent.class, publisher.events.get(1).getClass());
192-
assertEquals(SessionDisconnectEvent.class, publisher.events.get(2).getClass());
202+
assertEquals(SessionSubscribeEvent.class, publisher.events.get(2).getClass());
203+
assertEquals(SessionUnsubscribeEvent.class, publisher.events.get(3).getClass());
204+
assertEquals(SessionDisconnectEvent.class, publisher.events.get(4).getClass());
193205
}
194206

195207
@Test

src/asciidoc/index.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38417,6 +38417,8 @@ to this event can wrap the contained message using `SimpMessageHeaderAccessor` o
3841738417
* `SessionConnectedEvent` -- published shortly after a `SessionConnectEvent` when the
3841838418
broker has sent a STOMP CONNECTED frame in response to the CONNECT. At this point the
3841938419
STOMP session can be considered fully established.
38420+
* `SessionSubscribeEvent` -- published when a new STOMP SUBSCRIBE is received.
38421+
* `SessionUnsubscribeEvent` -- published when a new STOMP UNSUBSCRIBE is received.
3842038422
* `SessionDisconnectEvent` -- published when a STOMP session ends. The DISCONNECT may
3842138423
have been sent from the client or it may also be automatically generated when the
3842238424
WebSocket session is closed. In some cases this event may be published more than once

0 commit comments

Comments
 (0)