Skip to content

Commit 30aafe9

Browse files
Add Subscribe and Unsubscribe test (#61)
* Add Subscribe and Unsubscribe test * Fix formatting * Fix indentation
1 parent b5f0df5 commit 30aafe9

File tree

1 file changed

+184
-0
lines changed

1 file changed

+184
-0
lines changed

src/mqtt/mqtt_test.c

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,26 @@
9191
*/
9292
#define TEST_MQTT_TOPIC MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test"
9393

94+
/**
95+
* @brief Sample topic filter 2 to use in tests.
96+
*/
97+
#define TEST_MQTT_TOPIC_2 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/test2"
98+
99+
/**
100+
* @brief Sample topic filter 3 to use in tests.
101+
*/
102+
#define TEST_MQTT_TOPIC_3 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopic3"
103+
104+
/**
105+
* @brief Sample topic filter 4 to use in tests.
106+
*/
107+
#define TEST_MQTT_TOPIC_4 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testFour"
108+
109+
/**
110+
* @brief Sample topic filter 5 to use in tests.
111+
*/
112+
#define TEST_MQTT_TOPIC_5 MQTT_TEST_CLIENT_IDENTIFIER "/iot/integration/testTopicName5"
113+
94114
/**
95115
* @brief Sample topic filter to test MQTT retainted message.
96116
*/
@@ -1593,6 +1613,169 @@ TEST( MqttTest, MQTT_Publish_With_Retain_Flag )
15931613

15941614
/*-----------------------------------------------------------*/
15951615

1616+
/**
1617+
* @brief Tests Subscribe and Unsubscribe operations to multiple topic filters
1618+
* in a single API call.
1619+
* The test subscribes to 5 topics, and then publishes to the same topics one
1620+
* at a time. The broker is expected to route the publish message back to the
1621+
* test for all topics.
1622+
* The test then unsubscribes from the 5 topics which should also succeed.
1623+
*/
1624+
TEST( MqttTest, MQTT_SubUnsub_Multiple_Topics )
1625+
1626+
{
1627+
MQTTSubscribeInfo_t subscribeParams[ 5 ];
1628+
char * topicList[ 5 ];
1629+
size_t i;
1630+
const size_t topicCount = 5U;
1631+
MQTTQoS_t qos;
1632+
MQTTStatus_t xMQTTStatus;
1633+
uint32_t entryTime;
1634+
1635+
topicList[ 0 ] = TEST_MQTT_TOPIC;
1636+
topicList[ 1 ] = TEST_MQTT_TOPIC_2;
1637+
topicList[ 2 ] = TEST_MQTT_TOPIC_3;
1638+
topicList[ 3 ] = TEST_MQTT_TOPIC_4;
1639+
topicList[ 4 ] = TEST_MQTT_TOPIC_5;
1640+
1641+
for( i = 0; i < topicCount; i++ )
1642+
{
1643+
subscribeParams[ i ].pTopicFilter = topicList[ i ];
1644+
subscribeParams[ i ].topicFilterLength = strlen( topicList[ i ] );
1645+
subscribeParams[ i ].qos = ( i % 2 );
1646+
}
1647+
1648+
globalSubscribePacketIdentifier = MQTT_GetPacketId( &context );
1649+
/* Check that the packet ID is valid according to the MQTT spec. */
1650+
TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalSubscribePacketIdentifier );
1651+
TEST_ASSERT_NOT_EQUAL( 0U, globalSubscribePacketIdentifier );
1652+
1653+
/* Subscribe to all topics. */
1654+
TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Subscribe( &context,
1655+
subscribeParams,
1656+
topicCount,
1657+
globalSubscribePacketIdentifier ) );
1658+
1659+
/* Expect a SUBACK from the broker for the subscribe operation. */
1660+
TEST_ASSERT_FALSE( receivedSubAck );
1661+
entryTime = FRTest_GetTimeMs();
1662+
do
1663+
{
1664+
xMQTTStatus = MQTT_ProcessLoop( &context );
1665+
1666+
if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
1667+
{
1668+
/* Timeout. */
1669+
break;
1670+
}
1671+
else if( receivedSubAck != 0 )
1672+
{
1673+
/* No need to loop anymore since we received the SUBACK. */
1674+
break;
1675+
}
1676+
else
1677+
{
1678+
/* Do nothing. */
1679+
}
1680+
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1681+
1682+
TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1683+
TEST_ASSERT_TRUE( receivedSubAck );
1684+
1685+
/* Publish to the same topic, that we subscribed to. */
1686+
for( i = 0; i < topicCount; i++ )
1687+
{
1688+
/* Set Qos to be either 1 or 0. */
1689+
qos = ( i % 2 );
1690+
1691+
TEST_ASSERT_EQUAL( MQTTSuccess, publishToTopic(
1692+
&context,
1693+
topicList[ i ],
1694+
false, /* setRetainFlag */
1695+
false, /* isDuplicate */
1696+
qos, /* QoS */
1697+
MQTT_GetPacketId( &context ) ) );
1698+
1699+
/* Reset the PUBACK flag. */
1700+
receivedPubAck = false;
1701+
1702+
configPRINTF( ( "%u Entered1", xTaskGetTickCount() ) );
1703+
entryTime = FRTest_GetTimeMs();
1704+
do
1705+
{
1706+
xMQTTStatus = MQTT_ProcessLoop( &context );
1707+
1708+
if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
1709+
{
1710+
/* Timeout. */
1711+
break;
1712+
}
1713+
else
1714+
{
1715+
/* Do nothing. */
1716+
}
1717+
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1718+
1719+
TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1720+
1721+
/* Only wait for PUBACK if QoS is not QoS0. */
1722+
if( qos != MQTTQoS0 )
1723+
{
1724+
/* Make sure we have received PUBACK response. */
1725+
TEST_ASSERT_TRUE( receivedPubAck );
1726+
}
1727+
1728+
/* Make sure that we have received the same message from the server,
1729+
* that was published (as we have subscribed to the same topic). */
1730+
TEST_ASSERT_EQUAL( qos, incomingInfo.qos );
1731+
TEST_ASSERT_EQUAL( strlen( topicList[ i ] ), incomingInfo.topicNameLength );
1732+
TEST_ASSERT_EQUAL_MEMORY( topicList[ i ],
1733+
incomingInfo.pTopicName,
1734+
strlen( topicList[ i ] ) );
1735+
TEST_ASSERT_EQUAL( strlen( MQTT_EXAMPLE_MESSAGE ), incomingInfo.payloadLength );
1736+
TEST_ASSERT_EQUAL_MEMORY( MQTT_EXAMPLE_MESSAGE,
1737+
incomingInfo.pPayload,
1738+
incomingInfo.payloadLength );
1739+
}
1740+
1741+
globalUnsubscribePacketIdentifier = MQTT_GetPacketId( &context );
1742+
/* Check that the packet ID is valid according to the MQTT spec. */
1743+
TEST_ASSERT_NOT_EQUAL( MQTT_PACKET_ID_INVALID, globalUnsubscribePacketIdentifier );
1744+
TEST_ASSERT_NOT_EQUAL( 0U, globalUnsubscribePacketIdentifier );
1745+
1746+
/* Un-subscribe from all the topics. */
1747+
TEST_ASSERT_EQUAL( MQTTSuccess, MQTT_Unsubscribe(
1748+
&context, subscribeParams, topicCount, globalUnsubscribePacketIdentifier ) );
1749+
1750+
receivedUnsubAck = false;
1751+
1752+
/* Expect an UNSUBACK from the broker for the unsubscribe operation. */
1753+
entryTime = FRTest_GetTimeMs();
1754+
do
1755+
{
1756+
xMQTTStatus = MQTT_ProcessLoop( &context );
1757+
1758+
if( FRTest_GetTimeMs() > ( entryTime + MQTT_PROCESS_LOOP_TIMEOUT_MS ) )
1759+
{
1760+
/* Timeout. */
1761+
break;
1762+
}
1763+
else if( receivedUnsubAck != 0 )
1764+
{
1765+
break;
1766+
}
1767+
else
1768+
{
1769+
/* Do nothing. */
1770+
}
1771+
}while( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1772+
1773+
TEST_ASSERT_TRUE( ( xMQTTStatus == MQTTSuccess ) || ( xMQTTStatus == MQTTNeedMoreBytes ) );
1774+
TEST_ASSERT_TRUE( receivedUnsubAck );
1775+
}
1776+
1777+
/*-----------------------------------------------------------*/
1778+
15961779
/**
15971780
* @brief Test group runner for MQTT test against MQTT broker.
15981781
*/
@@ -1605,6 +1788,7 @@ TEST_GROUP_RUNNER( MqttTest )
16051788
RUN_TEST_CASE( MqttTest, MQTT_Resend_Unacked_Publish_QoS1 );
16061789
RUN_TEST_CASE( MqttTest, MQTT_Restore_Session_Duplicate_Incoming_Publish_Qos1 );
16071790
RUN_TEST_CASE( MqttTest, MQTT_Publish_With_Retain_Flag );
1791+
RUN_TEST_CASE( MqttTest, MQTT_SubUnsub_Multiple_Topics );
16081792
}
16091793

16101794
/*-----------------------------------------------------------*/

0 commit comments

Comments
 (0)