Skip to content

Commit 8ffd07f

Browse files
author
Ian Craggs
committed
Merge branch 'develop'
Conflicts: src/MQTTAsync.c
2 parents 69d9be4 + 6d4cf04 commit 8ffd07f

File tree

5 files changed

+123
-18
lines changed

5 files changed

+123
-18
lines changed

build.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
</taskdef>
2525

2626
<property name="output.folder" value="build/output" />
27-
<property name="release.version" value="1.0.1" />
27+
<property name="release.version" value="1.0.2" />
2828

2929
<property name="libname" value="mqttv3c" />
3030
<property name="libname.ssl" value="mqttv3cs" />

src/MQTTAsync.c

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* Ian Craggs - MQTT 3.1.1 support
2323
* Rong Xiang, Ian Craggs - C++ compatibility
2424
* Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
25+
* Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
26+
* Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
2527
*******************************************************************************/
2628

2729
/**
@@ -75,6 +77,8 @@ enum MQTTAsync_threadStates
7577

7678
enum MQTTAsync_threadStates sendThread_state = STOPPED;
7779
enum MQTTAsync_threadStates receiveThread_state = STOPPED;
80+
static thread_id_type sendThread_id = 0,
81+
receiveThread_id = 0;
7882

7983
#if defined(WIN32) || defined(WIN64)
8084
static mutex_type mqttasync_mutex = NULL;
@@ -325,15 +329,15 @@ void MQTTAsync_lock_mutex(mutex_type amutex)
325329
{
326330
int rc = Thread_lock_mutex(amutex);
327331
if (rc != 0)
328-
Log(LOG_ERROR, 0, "Error %d locking mutex", rc);
332+
Log(LOG_ERROR, 0, "Error %s locking mutex", strerror(rc));
329333
}
330334

331335

332336
void MQTTAsync_unlock_mutex(mutex_type amutex)
333337
{
334338
int rc = Thread_unlock_mutex(amutex);
335339
if (rc != 0)
336-
Log(LOG_ERROR, 0, "Error %d unlocking mutex", rc);
340+
Log(LOG_ERROR, 0, "Error %s unlocking mutex", strerror(rc));
337341
}
338342

339343

@@ -840,21 +844,19 @@ void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
840844
int i;
841845

842846
for (i = 0; i < command->command.details.sub.count; i++)
843-
{
844847
free(command->command.details.sub.topics[i]);
845-
free(command->command.details.sub.topics);
846-
free(command->command.details.sub.qoss);
847-
}
848+
849+
free(command->command.details.sub.topics);
850+
free(command->command.details.sub.qoss);
848851
}
849852
else if (command->command.type == UNSUBSCRIBE)
850853
{
851854
int i;
852855

853856
for (i = 0; i < command->command.details.unsub.count; i++)
854-
{
855857
free(command->command.details.unsub.topics[i]);
856-
free(command->command.details.unsub.topics);
857-
}
858+
859+
free(command->command.details.unsub.topics);
858860
}
859861
else if (command->command.type == PUBLISH)
860862
{
@@ -1255,6 +1257,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
12551257
FUNC_ENTRY;
12561258
MQTTAsync_lock_mutex(mqttasync_mutex);
12571259
sendThread_state = RUNNING;
1260+
sendThread_id = Thread_getid();
12581261
MQTTAsync_unlock_mutex(mqttasync_mutex);
12591262
while (!tostop)
12601263
{
@@ -1281,6 +1284,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
12811284
sendThread_state = STOPPING;
12821285
MQTTAsync_lock_mutex(mqttasync_mutex);
12831286
sendThread_state = STOPPED;
1287+
sendThread_id = 0;
12841288
MQTTAsync_unlock_mutex(mqttasync_mutex);
12851289
FUNC_EXIT;
12861290
return 0;
@@ -1455,6 +1459,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
14551459
FUNC_ENTRY;
14561460
MQTTAsync_lock_mutex(mqttasync_mutex);
14571461
receiveThread_state = RUNNING;
1462+
receiveThread_id = Thread_getid();
14581463
while (!tostop)
14591464
{
14601465
int rc = SOCKET_ERROR;
@@ -1671,6 +1676,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
16711676
}
16721677
}
16731678
receiveThread_state = STOPPED;
1679+
receiveThread_id = 0;
16741680
MQTTAsync_unlock_mutex(mqttasync_mutex);
16751681
#if !defined(WIN32) && !defined(WIN64)
16761682
if (sendThread_state != STOPPED)
@@ -2143,6 +2149,56 @@ int MQTTAsync_isConnected(MQTTAsync handle)
21432149
}
21442150

21452151

2152+
int cmdMessageIDCompare(void* a, void* b)
2153+
{
2154+
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)a;
2155+
return cmd->command.token == *(int*)b;
2156+
}
2157+
2158+
2159+
/**
2160+
* Assign a new message id for a client. Make sure it isn't already being used and does
2161+
* not exceed the maximum.
2162+
* @param m a client structure
2163+
* @return the next message id to use, or 0 if none available
2164+
*/
2165+
int MQTTAsync_assignMsgId(MQTTAsyncs* m)
2166+
{
2167+
int start_msgid = m->c->msgID;
2168+
int msgid = start_msgid;
2169+
thread_id_type thread_id = 0;
2170+
int locked = 0;
2171+
2172+
/* need to check: commands list and response list for a client */
2173+
FUNC_ENTRY;
2174+
/* We might be called in a callback. In which case, this mutex will be already locked. */
2175+
thread_id = Thread_getid();
2176+
if (thread_id != sendThread_id && thread_id != receiveThread_id)
2177+
{
2178+
MQTTAsync_lock_mutex(mqttasync_mutex);
2179+
locked = 1;
2180+
}
2181+
2182+
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
2183+
while (ListFindItem(commands, &msgid, cmdMessageIDCompare) ||
2184+
ListFindItem(m->responses, &msgid, cmdMessageIDCompare))
2185+
{
2186+
msgid = (msgid == MAX_MSG_ID) ? 1 : msgid + 1;
2187+
if (msgid == start_msgid)
2188+
{ /* we've tried them all - none free */
2189+
msgid = 0;
2190+
break;
2191+
}
2192+
}
2193+
if (msgid != 0)
2194+
m->c->msgID = msgid;
2195+
if (locked)
2196+
MQTTAsync_unlock_mutex(mqttasync_mutex);
2197+
FUNC_EXIT_RC(msgid);
2198+
return msgid;
2199+
}
2200+
2201+
21462202
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int* qos, MQTTAsync_responseOptions* response)
21472203
{
21482204
MQTTAsyncs* m = handle;
@@ -2175,7 +2231,7 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, int
21752231
goto exit;
21762232
}
21772233
}
2178-
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2234+
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
21792235
{
21802236
rc = MQTTASYNC_NO_MORE_MSGIDS;
21812237
goto exit;
@@ -2248,7 +2304,7 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
22482304
goto exit;
22492305
}
22502306
}
2251-
if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2307+
if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
22522308
{
22532309
rc = MQTTASYNC_NO_MORE_MSGIDS;
22542310
goto exit;
@@ -2307,7 +2363,7 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
23072363
rc = MQTTASYNC_BAD_UTF8_STRING;
23082364
else if (qos < 0 || qos > 2)
23092365
rc = MQTTASYNC_BAD_QOS;
2310-
else if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
2366+
else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
23112367
rc = MQTTASYNC_NO_MORE_MSGIDS;
23122368

23132369
if (rc != MQTTASYNC_SUCCESS)

src/MQTTProtocolClient.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Ian Craggs - fix for bug 413429 - connectionLost not called
1717
* Ian Craggs - fix for bug 421103 - trying to write to same socket, in retry
1818
* Rong Xiang, Ian Craggs - C++ compatibility
19+
* Ian Craggs - turn off DUP flag for PUBREL - MQTT 3.1.1
1920
*******************************************************************************/
2021

2122
/**
@@ -596,7 +597,7 @@ void MQTTProtocol_retries(time_t now, Clients* client, int regardless)
596597
else if (m->qos && m->nextMessageType == PUBCOMP)
597598
{
598599
Log(TRACE_MIN, 7, NULL, "PUBREL", client->clientID, client->net.socket, m->msgid);
599-
if (MQTTPacket_send_pubrel(m->msgid, 1, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
600+
if (MQTTPacket_send_pubrel(m->msgid, 0, &client->net, client->clientID) != TCPSOCKET_COMPLETE)
600601
{
601602
client->good = 0;
602603
Log(TRACE_PROTOCOL, 29, NULL, client->clientID, client->net.socket,

src/Socket.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ int Socket_error(char* aString, int sock)
9898
if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
9999
{
100100
if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
101-
Log(LOG_ERROR, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock);
101+
Log(TRACE_MINIMUM, -1, "Socket error %s in %s for socket %d", strerror(errno), aString, sock);
102102
}
103103
FUNC_EXIT_RC(errno);
104104
return errno;

test/test4.c

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,6 +1143,16 @@ Test7: Persistence
11431143
char* test7_topic = "C client test7";
11441144
int test7_messageCount = 0;
11451145

1146+
void test7_onDisconnectFailure(void* context, MQTTAsync_failureData* response)
1147+
{
1148+
MQTTAsync c = (MQTTAsync)context;
1149+
MyLog(LOGA_DEBUG, "In onDisconnect failure callback %p", c);
1150+
1151+
assert("Successful disconnect", 0, "disconnect failed", 0);
1152+
1153+
test_finished = 1;
1154+
}
1155+
11461156
void test7_onDisconnect(void* context, MQTTAsync_successData* response)
11471157
{
11481158
MQTTAsync c = (MQTTAsync)context;
@@ -1170,7 +1180,6 @@ int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync
11701180
{
11711181
MQTTAsync c = (MQTTAsync)context;
11721182
static int message_count = 0;
1173-
int rc;
11741183

11751184
MyLog(LOGA_DEBUG, "Test7: received message id %d", message->msgid);
11761185

@@ -1212,6 +1221,24 @@ void test7_onConnect(void* context, MQTTAsync_successData* response)
12121221
}
12131222

12141223

1224+
void test7_onConnectOnly(void* context, MQTTAsync_successData* response)
1225+
{
1226+
MQTTAsync c = (MQTTAsync)context;
1227+
MQTTAsync_disconnectOptions dopts = MQTTAsync_disconnectOptions_initializer;
1228+
int rc;
1229+
1230+
MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
1231+
dopts.context = context;
1232+
dopts.timeout = 1000;
1233+
dopts.onSuccess = test7_onDisconnect;
1234+
rc = MQTTAsync_disconnect(c, &dopts);
1235+
1236+
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1237+
if (rc != MQTTASYNC_SUCCESS)
1238+
test_finished = 1;
1239+
}
1240+
1241+
12151242
/*********************************************************************
12161243
12171244
Test7: Pending tokens
@@ -1248,7 +1275,6 @@ int test7(struct Options options)
12481275
assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
12491276

12501277
opts.keepAliveInterval = 20;
1251-
opts.cleansession = 0;
12521278
opts.username = "testuser";
12531279
opts.password = "testpassword";
12541280
opts.MQTTVersion = options.MQTTVersion;
@@ -1259,11 +1285,30 @@ int test7(struct Options options)
12591285
opts.will->retained = 0;
12601286
opts.will->topicName = "will topic";
12611287
opts.will = NULL;
1262-
opts.onSuccess = test7_onConnect;
1288+
12631289
opts.onFailure = NULL;
12641290
opts.context = c;
12651291

1292+
opts.cleansession = 1;
1293+
opts.onSuccess = test7_onConnectOnly;
1294+
MyLog(LOGA_DEBUG, "Connecting to clean up");
1295+
rc = MQTTAsync_connect(c, &opts);
1296+
rc = 0;
1297+
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
1298+
if (rc != MQTTASYNC_SUCCESS)
1299+
goto exit;
1300+
1301+
while (!test_finished)
1302+
#if defined(WIN32)
1303+
Sleep(100);
1304+
#else
1305+
usleep(10000L);
1306+
#endif
1307+
1308+
test_finished = 0;
12661309
MyLog(LOGA_DEBUG, "Connecting");
1310+
opts.cleansession = 0;
1311+
opts.onSuccess = test7_onConnect;
12671312
rc = MQTTAsync_connect(c, &opts);
12681313
rc = 0;
12691314
assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
@@ -1304,6 +1349,7 @@ int test7(struct Options options)
13041349
/* disconnect immediately without receiving the incoming messages */
13051350
dopts.timeout = 0;
13061351
dopts.onSuccess = test7_onDisconnect;
1352+
dopts.context = c;
13071353
MQTTAsync_disconnect(c, &dopts); /* now there should be "orphaned" publications */
13081354

13091355
while (!test_finished)
@@ -1371,6 +1417,8 @@ int test7(struct Options options)
13711417
13721418
assertions fail against Mosquitto - needs testing */
13731419

1420+
dopts.onFailure = test7_onDisconnectFailure;
1421+
dopts.onSuccess = test7_onDisconnect;
13741422
dopts.timeout = 1000;
13751423
MQTTAsync_disconnect(c, &dopts);
13761424

0 commit comments

Comments
 (0)