Skip to content

Commit 8ce06b9

Browse files
committed
Removed separate thread for handshaking
1 parent 9c70c82 commit 8ce06b9

File tree

2 files changed

+30
-38
lines changed

2 files changed

+30
-38
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<artifactId>websocket</artifactId>
1010
<packaging>jar</packaging>
1111
<name>websocket</name>
12-
<version>1.16.13</version>
12+
<version>1.16.14</version>
1313
<description>Red5 WebSocket plugin</description>
1414
<url>https://github.com/Red5/red5-websocket</url>
1515
<organization>

src/main/java/org/red5/net/websocket/WebSocketConnection.java

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ public class WebSocketConnection {
116116
// temporary send queue
117117
private ConcurrentLinkedQueue<Packet> queue = new ConcurrentLinkedQueue<>();
118118

119-
private WriteFuture handshakeWriteFuture;
120-
119+
private WriteFuture handshakeWriteFuture;
120+
121121
/**
122122
* constructor
123123
*/
@@ -163,45 +163,37 @@ public void receive(WSMessage message) {
163163
* @param wsResponse
164164
*/
165165
public void sendHandshakeResponse(HandshakeResponse wsResponse) {
166-
// we'll do this in a separate thread so it wont violate the mina io rules
167-
Thread hs = new Thread(new Runnable() {
166+
log.debug("Writing handshake on session: {}", session.getId());
167+
// create write future
168+
handshakeWriteFuture = session.write(wsResponse);
169+
handshakeWriteFuture.addListener(new IoFutureListener<WriteFuture>() {
168170

169171
@Override
170-
public void run() {
171-
log.debug("Writing handshake on session: {}", session.getId());
172-
// create write future
173-
handshakeWriteFuture = session.write(wsResponse);
174-
handshakeWriteFuture.addListener(new IoFutureListener<WriteFuture>() {
175-
176-
@Override
177-
public void operationComplete(WriteFuture future) {
178-
IoSession sess = future.getSession();
179-
if (future.isWritten()) {
180-
// handshake is finished
181-
log.debug("Handshake write success! {}", sess.getId());
182-
// set completed flag
183-
sess.setAttribute(Constants.HANDSHAKE_COMPLETE);
184-
// set connected state on ws connection
185-
WebSocketConnection conn = (WebSocketConnection) sess.getAttribute(Constants.CONNECTION);
186-
conn.setConnected();
187-
try {
188-
// send queued packets
189-
queue.forEach(entry -> {
190-
sess.write(entry);
191-
queue.remove(entry);
192-
});
193-
} catch (Exception e) {
194-
log.warn("Exception draining queued packets on session: {}", sess.getId(), e);
195-
}
196-
} else {
197-
log.debug("Handshake write failed from: {} to: {}", sess.getLocalAddress(), sess.getRemoteAddress());
172+
public void operationComplete(WriteFuture future) {
173+
IoSession sess = future.getSession();
174+
if (future.isWritten()) {
175+
// handshake is finished
176+
log.debug("Handshake write success! {}", sess.getId());
177+
// set completed flag
178+
sess.setAttribute(Constants.HANDSHAKE_COMPLETE);
179+
// set connected state on ws connection
180+
if (connected.compareAndSet(false, true)) {
181+
try {
182+
// send queued packets
183+
queue.forEach(entry -> {
184+
sess.write(entry);
185+
queue.remove(entry);
186+
});
187+
} catch (Exception e) {
188+
log.warn("Exception draining queued packets on session: {}", sess.getId(), e);
198189
}
199190
}
200-
201-
});
191+
} else {
192+
log.warn("Handshake write failed from: {} to: {}", sess.getLocalAddress(), sess.getRemoteAddress());
193+
}
202194
}
203-
}, String.format("WSHandshakeResponse@%d", session.getId()));
204-
hs.start();
195+
196+
});
205197
}
206198

207199
/**
@@ -420,7 +412,7 @@ public boolean isConnected() {
420412
}
421413

422414
/**
423-
* on connected, put flg and clear keys.
415+
* On connected, set flag.
424416
*/
425417
public void setConnected() {
426418
connected.compareAndSet(false, true);

0 commit comments

Comments
 (0)