Skip to content

Commit 9ad3311

Browse files
bretambroseBret Ambrose
andauthored
Flaky test pass1 (awslabs#693)
* Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages) * Increase socket timeout on request-response 311 tests * Update 311 will test to work around broker race condition wrt which connection gets terminated on duplicate client id Co-authored-by: Bret Ambrose <[email protected]>
1 parent a947367 commit 9ad3311

File tree

3 files changed

+49
-67
lines changed

3 files changed

+49
-67
lines changed

tests/IotServiceTest.cpp

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <condition_variable>
1616
#include <fstream>
1717
#include <mutex>
18+
#include <thread>
1819
#include <utility>
1920

2021
#include <aws/io/logging.h>
@@ -599,13 +600,19 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
599600
subscriberCv.wait(lock, [&]() { return subscriberSubscribed; });
600601
}
601602

603+
// there appears to be a race condition broker-side in IoT Core such that the interrupter is occasionally
604+
// rejected rather than the existing connection. We try and work around this in two ways:
605+
// (1) Wait a couple seconds to let things settle in terms of eventual consistency.
606+
// (2) Have the interrupter make connection attempts in a loop until a successful interruption occurs.
607+
std::this_thread::sleep_for(std::chrono::seconds(2));
608+
602609
// Disconnect the client by interrupting it with another client with the same ID
603610
// which will cause the will to be sent
604611
auto interruptConnection = mqttClient.NewConnection(envVars.inputHost.c_str(), 8883, socketOptions, tlsContext);
605-
interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload);
606612
std::mutex interruptMutex;
607613
std::condition_variable interruptCv;
608614
bool interruptConnected = false;
615+
bool interruptConnectionAttemptComplete = false;
609616
auto interruptOnConnectionCompleted =
610617
[&](MqttConnection &, int errorCode, ReturnCode returnCode, bool sessionPresent)
611618
{
@@ -614,7 +621,11 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
614621
(void)sessionPresent;
615622
{
616623
std::lock_guard<std::mutex> lock(interruptMutex);
617-
interruptConnected = true;
624+
interruptConnectionAttemptComplete = true;
625+
if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED)
626+
{
627+
interruptConnected = true;
628+
}
618629
}
619630
interruptCv.notify_one();
620631
};
@@ -628,10 +639,16 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
628639
};
629640
interruptConnection->OnConnectionCompleted = interruptOnConnectionCompleted;
630641
interruptConnection->OnDisconnect = interruptOnDisconnect;
631-
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);
642+
643+
bool continueConnecting = true;
644+
while (continueConnecting)
632645
{
633-
std::unique_lock<std::mutex> lock(interruptMutex);
634-
interruptCv.wait(lock, [&]() { return interruptConnected; });
646+
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);
647+
{
648+
std::unique_lock<std::mutex> lock(interruptMutex);
649+
interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; });
650+
continueConnecting = !interruptConnected;
651+
}
635652
}
636653

637654
// wait for message received callback - meaning the will was sent

tests/Mqtt5ClientTest.cpp

Lines changed: 26 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
19741974
const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
19751975
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;
19761976

1977-
const int MESSAGE_NUMBER = 10;
1978-
std::atomic<int> client_messages(0);
1979-
bool client1_received = false;
1980-
bool client2_received = false;
1981-
1982-
std::vector<int> receivedMessages(MESSAGE_NUMBER);
1983-
for (int i = 0; i < MESSAGE_NUMBER; i++)
1984-
{
1985-
receivedMessages.push_back(0);
1986-
}
1977+
static const int MESSAGE_COUNT = 10;
1978+
std::mutex receivedMessagesLock;
1979+
std::condition_variable receivedMessagesSignal;
1980+
std::vector<int> receivedMessages;
19871981

19881982
Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
19891983
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
@@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
19931987
allocator);
19941988
ASSERT_TRUE(subscribe_builder);
19951989

1996-
std::promise<void> client_received;
1997-
1998-
auto get_on_message_callback = [&](bool &received)
1990+
auto on_message_callback1 = [&](const PublishReceivedEventData &eventData)
19991991
{
2000-
return [&](const PublishReceivedEventData &eventData) -> int
1992+
String topic = eventData.publishPacket->getTopic();
1993+
if (topic == TEST_TOPIC)
20011994
{
2002-
String topic = eventData.publishPacket->getTopic();
2003-
if (topic == TEST_TOPIC)
1995+
ByteCursor payload = eventData.publishPacket->getPayload();
1996+
String message_string = String((const char *)payload.ptr, payload.len);
1997+
int message_int = atoi(message_string.c_str());
1998+
20041999
{
2005-
ByteCursor payload = eventData.publishPacket->getPayload();
2006-
String message_string = String((const char *)payload.ptr, payload.len);
2007-
int message_int = atoi(message_string.c_str());
2008-
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
2009-
++receivedMessages[message_int];
2010-
received = true; // this line has changed
2011-
2012-
bool exchanged = false;
2013-
int desired = 11;
2014-
int tested = 10;
2015-
client_messages++;
2016-
exchanged = client_messages.compare_exchange_strong(tested, desired);
2017-
if (exchanged == true)
2000+
std::lock_guard<std::mutex> guard(receivedMessagesLock);
2001+
receivedMessages.push_back(message_int);
2002+
2003+
if (receivedMessages.size() == MESSAGE_COUNT)
20182004
{
2019-
client_received.set_value();
2005+
receivedMessagesSignal.notify_all();
20202006
}
20212007
}
2022-
return 0;
2023-
};
2008+
}
20242009
};
2025-
auto onMessage_client1 = get_on_message_callback(client1_received);
2026-
auto onMessage_client2 = get_on_message_callback(client2_received);
2010+
auto on_message_callback2 = on_message_callback1;
20272011

2028-
subscribe_builder->WithPublishReceivedCallback(onMessage_client1);
2012+
subscribe_builder->WithPublishReceivedCallback(on_message_callback1);
20292013

20302014
Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
20312015
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
@@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
20342018
mqtt5TestVars.m_private_key_path_string.c_str(),
20352019
allocator);
20362020
ASSERT_TRUE(subscribe_builder2);
2037-
2038-
subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);
2021+
subscribe_builder2->WithPublishReceivedCallback(on_message_callback2);
20392022

20402023
Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
20412024
mqtt5TestVars.m_hostname_string,
@@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
21132096
suback.get_future().wait();
21142097

21152098
/* Publish message 10 to test topic */
2116-
for (int i = 0; i < MESSAGE_NUMBER; i++)
2099+
for (int i = 0; i < MESSAGE_COUNT; i++)
21172100
{
21182101
std::string payload = std::to_string(i);
21192102
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
@@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
21222105
}
21232106

21242107
/* Wait for all packets to be received on both clients */
2125-
client_received.get_future().wait();
2126-
2127-
/* Unsubscribe from the topic from both clients*/
2128-
Vector<String> unsubList;
2129-
unsubList.push_back(TEST_TOPIC);
2130-
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
2131-
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
2132-
unsubscribe_client1->WithTopicFilters(unsubList);
2133-
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1));
2134-
2135-
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 =
2136-
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
2137-
unsubscribe_client2->WithTopicFilters(unsubList);
2138-
ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2));
2139-
2140-
/* make sure all messages are received */
2141-
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */
2142-
2143-
/* makes sure both clients received at least one message */
2144-
ASSERT_TRUE(client1_received);
2145-
ASSERT_TRUE(client2_received);
2108+
std::unique_lock<std::mutex> receivedLock(receivedMessagesLock);
2109+
receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; });
21462110

2111+
std::sort(receivedMessages.begin(), receivedMessages.end());
21472112
/* make sure all messages are received with no duplicates*/
2148-
for (int i = 0; i < MESSAGE_NUMBER; i++)
2113+
for (int i = 0; i < MESSAGE_COUNT; i++)
21492114
{
2150-
ASSERT_TRUE(receivedMessages[i] > 0);
2115+
ASSERT_INT_EQUALS(i, receivedMessages[i]);
21512116
}
21522117
/* Stop all clients */
21532118
ASSERT_TRUE(mqtt5Client->Stop());

tests/MqttRequestResponse.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ static TestContext s_CreateClient(
287287
Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator);
288288

289289
Aws::Crt::Io::SocketOptions socketOptions;
290-
socketOptions.SetConnectTimeoutMs(3000);
290+
socketOptions.SetConnectTimeoutMs(10000);
291291

292292
Aws::Crt::Mqtt::MqttClient client;
293293
context.protocolClient311 =

0 commit comments

Comments
 (0)