Skip to content

Commit ae18c9e

Browse files
committed
Fixed under-read of incoming handshake data with an accumulator
1 parent 5d3cd58 commit ae18c9e

File tree

8 files changed

+165
-55
lines changed

8 files changed

+165
-55
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<artifactId>websocket</artifactId>
1010
<packaging>jar</packaging>
1111
<name>websocket</name>
12-
<version>1.10</version>
12+
<version>1.11</version>
1313
<description>Red5 WebSocket plugin</description>
1414
<url>https://github.com/Red5/red5-websocket</url>
1515
<organization>
@@ -224,7 +224,7 @@
224224
<dependency>
225225
<groupId>org.glassfish.tyrus.bundles</groupId>
226226
<artifactId>tyrus-standalone-client-jdk</artifactId>
227-
<version>1.12</version>
227+
<version>1.13.1</version>
228228
<scope>test</scope>
229229
</dependency>
230230
</dependencies>

src/main/java/org/red5/net/websocket/Constants.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class Constants {
3333

3434
public final static String SESSION = "session";
3535

36+
public static final Object WS_HANDSHAKE = "ws.handshake";
37+
3638
public final static String WS_HEADER_KEY = "Sec-WebSocket-Key";
3739

3840
public final static String WS_HEADER_VERSION = "Sec-WebSocket-Version";
@@ -58,5 +60,8 @@ public class Constants {
5860
// magic string for websockets
5961
public static final String WEBSOCKET_MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
6062

61-
public static final byte[] CRLF = { 0x0D, 0x0A };
63+
public static final byte[] CRLF = { 0x0d, 0x0a };
64+
65+
public static final byte[] END_OF_REQ = { 0x0d, 0x0a, 0x0d, 0x0a };
66+
6267
}

src/main/java/org/red5/net/websocket/WebSocketConnection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ public void send(String data) throws UnsupportedEncodingException {
132132
* @param buf
133133
*/
134134
public void send(byte[] buf) {
135-
log.trace("send binary: {}", Arrays.toString(buf));
135+
if (log.isTraceEnabled()) {
136+
log.trace("send binary: {}", Arrays.toString(buf));
137+
}
136138
Packet packet = Packet.build(buf);
137139
session.write(packet);
138140
}
@@ -143,7 +145,9 @@ public void send(byte[] buf) {
143145
* @param buf
144146
*/
145147
public void sendPong(byte[] buf) {
146-
log.trace("send pong: {}", buf);
148+
if (log.isTraceEnabled()) {
149+
log.trace("send pong: {}", buf);
150+
}
147151
Packet packet = Packet.build(buf, MessageType.PONG);
148152
session.write(packet);
149153
}

src/main/java/org/red5/net/websocket/WebSocketPlugin.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,12 @@ public WebSocketScopeManager getManager(String path) {
111111
if (log.isTraceEnabled()) {
112112
log.trace("Path parts: {}", Arrays.toString(parts));
113113
}
114-
if (parts.length > 0) {
114+
if (parts.length > 1) {
115115
// skip default in a path if it exists in slot #1
116-
String name = !"default".equals(parts[1]) ? parts[1] : parts[2];
116+
String name = !"default".equals(parts[1]) ? parts[1] : ((parts.length >= 3) ? parts[2] : parts[1]);
117+
if (log.isDebugEnabled()) {
118+
log.debug("Managers: {}", managerMap.entrySet());
119+
}
117120
for (Entry<IScope, WebSocketScopeManager> entry : managerMap.entrySet()) {
118121
IScope appScope = entry.getKey();
119122
if (appScope.getName().equals(name)) {

src/main/java/org/red5/net/websocket/WebSocketTransport.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ public class WebSocketTransport implements InitializingBean, DisposableBean {
8181
public void afterPropertiesSet() throws Exception {
8282
// create the nio acceptor
8383
acceptor = new NioSocketAcceptor(ioThreads);
84+
// configure the acceptor
85+
SocketSessionConfig sessionConf = acceptor.getSessionConfig();
86+
sessionConf.setReuseAddress(true);
87+
sessionConf.setTcpNoDelay(true);
88+
sessionConf.setSendBufferSize(sendBufferSize);
89+
sessionConf.setReadBufferSize(receiveBufferSize);
90+
// close sessions when the acceptor is stopped
91+
acceptor.setCloseOnDeactivation(true);
92+
acceptor.setHandler(ioHandler);
93+
// requested maximum length of the queue of incoming connections
94+
acceptor.setBacklog(64);
95+
acceptor.setReuseAddress(true);
8496
// instance the websocket handler
8597
if (ioHandler == null) {
8698
ioHandler = new WebSocketHandler();
@@ -102,17 +114,6 @@ public void afterPropertiesSet() throws Exception {
102114
}
103115
// add the websocket codec factory
104116
chain.addLast("protocol", new ProtocolCodecFilter(new WebSocketCodecFactory()));
105-
// close sessions when the acceptor is stopped
106-
acceptor.setCloseOnDeactivation(true);
107-
acceptor.setHandler(ioHandler);
108-
// requested maximum length of the queue of incoming connections
109-
acceptor.setBacklog(64);
110-
SocketSessionConfig sessionConf = acceptor.getSessionConfig();
111-
sessionConf.setReuseAddress(true);
112-
sessionConf.setTcpNoDelay(true);
113-
sessionConf.setReceiveBufferSize(receiveBufferSize);
114-
sessionConf.setSendBufferSize(sendBufferSize);
115-
acceptor.setReuseAddress(true);
116117
if (addresses.isEmpty()) {
117118
if (sslFilter != null) {
118119
log.info("WebSocket (wss) will be bound to port {}", port);
@@ -144,6 +145,9 @@ public void afterPropertiesSet() throws Exception {
144145
}
145146
}
146147
log.info("started {} websocket transport", (isSecure() ? "secure" : ""));
148+
if (log.isDebugEnabled()) {
149+
log.debug("Acceptor sizes - send: {} recv: {}", acceptor.getSessionConfig().getSendBufferSize(), acceptor.getSessionConfig().getReadBufferSize());
150+
}
147151
}
148152

149153
/**

src/main/java/org/red5/net/websocket/codec/WebSocketDecoder.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,18 @@ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput
9595
IoBuffer resultBuffer;
9696
WebSocketConnection conn = (WebSocketConnection) session.getAttribute(Constants.CONNECTION);
9797
if (conn == null) {
98+
log.debug("Decode start pos: {}", in.position());
9899
// first message on a new connection, check if its from a websocket or a native socket
99100
if (doHandShake(session, in)) {
101+
log.debug("Decode end pos: {} limit: {}", in.position(), in.limit());
100102
// websocket handshake was successful. Don't write anything to output as we want to abstract the handshake request message from the handler
101-
in.position(in.limit());
103+
if (in.position() != in.limit()) {
104+
in.position(in.limit());
105+
}
102106
return true;
107+
} else if (session.containsAttribute(Constants.WS_HANDSHAKE)) {
108+
// more still expected to come in before HS is completed
109+
return false;
103110
} else {
104111
// message is from a native socket. Simply wrap and pass through
105112
resultBuffer = IoBuffer.wrap(in.array(), 0, in.limit());
@@ -145,14 +152,40 @@ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput
145152
*/
146153
@SuppressWarnings("unchecked")
147154
private boolean doHandShake(IoSession session, IoBuffer in) {
155+
if (log.isDebugEnabled()) {
156+
log.debug("Handshake: {}", in);
157+
}
158+
// incoming data
159+
byte[] data = null;
160+
// check for existing HS data
161+
if (session.containsAttribute(Constants.WS_HANDSHAKE)) {
162+
byte[] tmp = (byte[]) session.getAttribute(Constants.WS_HANDSHAKE);
163+
// size to hold existing and incoming
164+
data = new byte[tmp.length + in.remaining()];
165+
System.arraycopy(tmp, 0, data, 0, tmp.length);
166+
// get incoming bytes
167+
in.get(data, tmp.length, in.remaining());
168+
} else {
169+
// size for incoming bytes
170+
data = new byte[in.remaining()];
171+
// get incoming bytes
172+
in.get(data, 0, data.length);
173+
}
174+
// ensure the incoming data is complete (ends with crlfcrlf)
175+
byte[] tail = Arrays.copyOfRange(data, data.length - 4, data.length);
176+
if (!Arrays.equals(tail, Constants.END_OF_REQ)) {
177+
// accumulate the HS data
178+
session.setAttribute(Constants.WS_HANDSHAKE, data);
179+
return false;
180+
}
148181
// create the connection obj
149182
WebSocketConnection conn = new WebSocketConnection(session);
150183
// mark as secure if using ssl
151184
if (session.getFilterChain().contains("sslFilter")) {
152185
conn.setSecure(true);
153186
}
154187
try {
155-
Map<String, Object> headers = parseClientRequest(conn, new String(in.array()));
188+
Map<String, Object> headers = parseClientRequest(conn, new String(data));
156189
if (log.isTraceEnabled()) {
157190
log.trace("Header map: {}", headers);
158191
}
@@ -206,6 +239,8 @@ private boolean doHandShake(IoSession session, IoBuffer in) {
206239
// prepare response and write it to the directly to the session
207240
HandshakeResponse wsResponse = buildHandshakeResponse(conn, (String) headers.get(Constants.WS_HEADER_KEY));
208241
session.write(wsResponse);
242+
// remove handshake acculator
243+
session.removeAttribute(Constants.WS_HANDSHAKE);
209244
log.debug("Handshake complete");
210245
return true;
211246
}
@@ -231,7 +266,7 @@ private Map<String, Object> parseClientRequest(WebSocketConnection conn, String
231266
if (log.isTraceEnabled()) {
232267
log.trace("Request: {}", Arrays.toString(request));
233268
}
234-
Map<String, Object> map = new HashMap<String, Object>();
269+
Map<String, Object> map = new HashMap<>();
235270
for (int i = 0; i < request.length; i++) {
236271
log.trace("Request {}: {}", i, request[i]);
237272
if (request[i].startsWith("GET ") || request[i].startsWith("POST ") || request[i].startsWith("PUT ")) {
@@ -291,7 +326,7 @@ public void operationComplete(IoFuture future) {
291326
});
292327
throw new WebSocketException("Handshake failed, missing plugin");
293328
}
294-
}else if (request[i].contains(Constants.WS_HEADER_KEY)) {
329+
} else if (request[i].contains(Constants.WS_HEADER_KEY)) {
295330
map.put(Constants.WS_HEADER_KEY, extractHeaderValue(request[i]));
296331
} else if (request[i].contains(Constants.WS_HEADER_VERSION)) {
297332
map.put(Constants.WS_HEADER_VERSION, extractHeaderValue(request[i]));
@@ -307,26 +342,22 @@ public void operationComplete(IoFuture future) {
307342
conn.setOrigin(extractHeaderValue(request[i]));
308343
} else if (request[i].contains(Constants.HTTP_HEADER_USERAGENT)) {
309344
map.put(Constants.HTTP_HEADER_USERAGENT, extractHeaderValue(request[i]));
310-
}else if (request[i].startsWith(Constants.WS_HEADER_GENERIC_PREFIX)) {
311-
map.put(getHeaderName(request[i]), extractHeaderValue(request[i]));
345+
} else if (request[i].startsWith(Constants.WS_HEADER_GENERIC_PREFIX)) {
346+
map.put(getHeaderName(request[i]), extractHeaderValue(request[i]));
312347
}
313348
}
314349
return map;
315350
}
316-
317-
318-
351+
319352
/**
320353
* Returns the trimmed header name.
321354
*
322355
* @param requestHeader
323356
* @return value
324357
*/
325-
private String getHeaderName(String requestHeader){
326-
return requestHeader.substring(0, requestHeader.indexOf(':')).trim();
358+
private String getHeaderName(String requestHeader) {
359+
return requestHeader.substring(0, requestHeader.indexOf(':')).trim();
327360
}
328-
329-
330361

331362
/**
332363
* Returns the trimmed header value.

0 commit comments

Comments
 (0)