Skip to content

fix: ensure thread-safe handling of streams in AWSIoT components #5520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions AWSIoT/Internal/AWSIoTMQTTClient.m
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,15 @@ - (void)invalidateReconnectTimer {
- (void)cleanUpWebsocketOutputStream {
@synchronized(self) {
if (self.websocketOutputStream) {
self.websocketOutputStream.delegate = nil;
[self.websocketOutputStream close];
[self.websocketOutputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
self.websocketOutputStream = nil;
// This `websocketOutputStream` object here is possible to be accessed by other threads
@synchronized(self.websocketOutputStream) {
if (self.websocketOutputStream) { // We'd better double check its existence after obtained the lock
self.websocketOutputStream.delegate = nil;
[self.websocketOutputStream close];
[self.websocketOutputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
self.websocketOutputStream = nil;
}
}
}
}
}
Expand Down Expand Up @@ -1262,7 +1267,10 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didFailWithError:(NSError *)error
// Also, the webSocket can be set to nil
[self cleanUpWebsocketOutputStream];

[self.encoderOutputStream close];
// This `encoderOutputStream` object here is possible to be accessed by other threads
@synchronized (self.encoderOutputStream) {
[self.encoderOutputStream close];
}
[self.webSocket close];
self.webSocket = nil;

Expand Down Expand Up @@ -1300,7 +1308,10 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didCloseWithCode:(NSInteger)code r
// The WebSocket has closed. The input/output streams can be closed here.
[self cleanUpWebsocketOutputStream];

[self.encoderOutputStream close];
// This `encoderOutputStream` object here is possible to be accessed by other threads
@synchronized (self.encoderOutputStream) {
[self.encoderOutputStream close];
}
[self.webSocket close];
self.webSocket = nil;

Expand Down
36 changes: 22 additions & 14 deletions AWSIoT/Internal/AWSIoTStreamThread.m
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,33 @@ - (void)cleanUp {
}

// Make sure we handle the streams in a thread-safe way
if (self.outputStream) {
// Remove from runLoop first before closing
if (self.runLoopForStreamsThread) {
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
@synchronized(self.outputStream) {
if (self.outputStream) {
// Remove from runLoop first before closing
if (self.runLoopForStreamsThread) {
[self.outputStream removeFromRunLoop:self.runLoopForStreamsThread
forMode:NSDefaultRunLoopMode];
}
self.outputStream.delegate = nil;
[self.outputStream close];
self.outputStream = nil;
}
self.outputStream.delegate = nil;
[self.outputStream close];
self.outputStream = nil;
}

if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
// Make sure we handle the streams in a thread-safe way
@synchronized(self.decoderInputStream) {
if (self.decoderInputStream) {
[self.decoderInputStream close];
self.decoderInputStream = nil;
}
}

if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
// Make sure we handle the streams in a thread-safe way
@synchronized(self.encoderOutputStream) {
if (self.encoderOutputStream) {
[self.encoderOutputStream close];
self.encoderOutputStream = nil;
}
}
} else {
AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self);
Expand Down
13 changes: 9 additions & 4 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ - (void)open {
}

- (void)close {
AWSDDLogDebug(@"closing encoder stream.");
[self.stream close];
[self.stream setDelegate:nil];
self.stream = nil;
// Make sure we handle the streams in a thread-safe way
@synchronized (self.stream) {
if (self.stream) { // We'd better double check that the stream is not nil
AWSDDLogDebug(@"closing encoder stream.");
[self.stream close];
[self.stream setDelegate:nil];
self.stream = nil;
}
}
}

//This is executed in the runLoop.
Expand Down