1111
1212#include < algorithm>
1313#include < aws/crt/UUID.h>
14+ #include < chrono>
1415#include < condition_variable>
1516#include < iostream>
1617#include < mutex>
@@ -23,7 +24,8 @@ static void s_printHelp()
2324 fprintf (
2425 stdout,
2526 " basic-pub-sub --endpoint <endpoint> --cert <path to cert>"
26- " --key <path to key> --topic <topic> --ca_file <optional: path to custom ca>"
27+ " --key <path to key> --topic <topic> --message <message> --count <count>"
28+ " --client_id <client id> --ca_file <optional: path to custom ca>"
2729 " --use_websocket --signing_region <region> --proxy_host <host> --proxy_port <port>"
2830 " --x509 --x509_role_alias <role_alias> --x509_endpoint <endpoint> --x509_thing <thing_name>"
2931 " --x509_cert <path to cert> --x509_key <path to key> --x509_rootca <path to root ca>\n\n " );
@@ -33,6 +35,8 @@ static void s_printHelp()
3335 " cert: path to your client certificate in PEM format. If this is not set you must specify use_websocket\n " );
3436 fprintf (stdout, " key: path to your key in PEM format. If this is not set you must specify use_websocket\n " );
3537 fprintf (stdout, " topic: topic to publish, subscribe to. (optional)\n " );
38+ fprintf (stdout, " message: override payload of published messages. (optional, defaults to \" Hello world!\" ))\n " );
39+ fprintf (stdout, " count: number of messages to publish. (optional, defaults to 10)\n " );
3640 fprintf (stdout, " client_id: client id to use (optional)\n " );
3741 fprintf (
3842 stdout,
@@ -106,6 +110,9 @@ int main(int argc, char *argv[])
106110 bool useWebSocket = false ;
107111 bool useX509 = false ;
108112
113+ uint32_t messageCount = 10 ;
114+ String messagePayload (" Hello world!" );
115+
109116 /* ********************** Parse Arguments ***************************/
110117 if (!s_cmdOptionExists (argv, argv + argc, " --endpoint" ))
111118 {
@@ -246,6 +253,20 @@ int main(int argc, char *argv[])
246253 useX509 = true ;
247254 }
248255
256+ if (s_cmdOptionExists (argv, argv + argc, " --message" ))
257+ {
258+ messagePayload = s_getCmdOption (argv, argv + argc, " --message" );
259+ }
260+
261+ if (s_cmdOptionExists (argv, argv + argc, " --count" ))
262+ {
263+ int count = atoi (s_getCmdOption (argv, argv + argc, " --count" ));
264+ if (count > 0 )
265+ {
266+ messageCount = count;
267+ }
268+ }
269+
249270 /* ********************* Now Setup an Mqtt Client ******************/
250271 /*
251272 * You need an event loop group to process IO events.
@@ -458,17 +479,6 @@ int main(int argc, char *argv[])
458479 connection->OnConnectionInterrupted = std::move (onInterrupted);
459480 connection->OnConnectionResumed = std::move (onResumed);
460481
461- connection->SetOnMessageHandler ([](Mqtt::MqttConnection &,
462- const String &topic,
463- const ByteBuf &payload,
464- bool /* dup*/ ,
465- Mqtt::QOS /* qos*/ ,
466- bool /* retain*/ ) {
467- fprintf (stdout, " Generic Publish received on topic %s, payload:\n " , topic.c_str ());
468- fwrite (payload.buffer , 1 , payload.len , stdout);
469- fprintf (stdout, " \n " );
470- });
471-
472482 /*
473483 * Actually perform the connect dance.
474484 * This will use default ping behavior of 1 hour and 3 second timeouts.
@@ -483,6 +493,10 @@ int main(int argc, char *argv[])
483493
484494 if (connectionCompletedPromise.get_future ().get ())
485495 {
496+ std::mutex receiveMutex;
497+ std::condition_variable receiveSignal;
498+ uint32_t receivedCount = 0 ;
499+
486500 /*
487501 * This is invoked upon the receipt of a Publish on a subscribed topic.
488502 */
@@ -492,10 +506,16 @@ int main(int argc, char *argv[])
492506 bool /* dup*/ ,
493507 Mqtt::QOS /* qos*/ ,
494508 bool /* retain*/ ) {
495- fprintf (stdout, " Publish received on topic %s\n " , topic.c_str ());
496- fprintf (stdout, " \n Message:\n " );
497- fwrite (byteBuf.buffer , 1 , byteBuf.len , stdout);
498- fprintf (stdout, " \n " );
509+ {
510+ std::lock_guard<std::mutex> lock (receiveMutex);
511+ ++receivedCount;
512+ fprintf (stdout, " Publish #%d received on topic %s\n " , receivedCount, topic.c_str ());
513+ fprintf (stdout, " Message: " );
514+ fwrite (byteBuf.buffer , 1 , byteBuf.len , stdout);
515+ fprintf (stdout, " \n " );
516+ }
517+
518+ receiveSignal.notify_all ();
499519 };
500520
501521 /*
@@ -527,34 +547,21 @@ int main(int argc, char *argv[])
527547 connection->Subscribe (topic.c_str (), AWS_MQTT_QOS_AT_LEAST_ONCE, onMessage, onSubAck);
528548 subscribeFinishedPromise.get_future ().wait ();
529549
530- while (true )
550+ uint32_t publishedCount = 0 ;
551+ while (publishedCount < messageCount)
531552 {
532- String input;
533- fprintf (
534- stdout,
535- " Enter the message you want to publish to topic %s and press enter. Enter 'exit' to exit this "
536- " program.\n " ,
537- topic.c_str ());
538- std::getline (std::cin, input);
539-
540- if (input == " exit" )
541- {
542- break ;
543- }
544-
545- ByteBuf payload = ByteBufFromArray ((const uint8_t *)input.data (), input.length ());
553+ ByteBuf payload = ByteBufFromArray ((const uint8_t *)messagePayload.data (), messagePayload.length ());
546554
547- auto onPublishComplete = [](Mqtt::MqttConnection &, uint16_t packetId, int errorCode) {
548- if (packetId)
549- {
550- fprintf (stdout, " Operation on packetId %d Succeeded\n " , packetId);
551- }
552- else
553- {
554- fprintf (stdout, " Operation failed with error %s\n " , aws_error_debug_str (errorCode));
555- }
556- };
555+ auto onPublishComplete = [](Mqtt::MqttConnection &, uint16_t , int ) {};
557556 connection->Publish (topic.c_str (), AWS_MQTT_QOS_AT_LEAST_ONCE, false , payload, onPublishComplete);
557+ ++publishedCount;
558+
559+ std::this_thread::sleep_for (std::chrono::milliseconds (1000 ));
560+ }
561+
562+ {
563+ std::unique_lock<std::mutex> receivedLock (receiveMutex);
564+ receiveSignal.wait (receivedLock, [&] { return receivedCount >= messageCount; });
558565 }
559566
560567 /*
@@ -564,12 +571,17 @@ int main(int argc, char *argv[])
564571 connection->Unsubscribe (
565572 topic.c_str (), [&](Mqtt::MqttConnection &, uint16_t , int ) { unsubscribeFinishedPromise.set_value (); });
566573 unsubscribeFinishedPromise.get_future ().wait ();
567- }
568574
569- /* Disconnect */
570- if (connection->Disconnect ())
575+ /* Disconnect */
576+ if (connection->Disconnect ())
577+ {
578+ connectionClosedPromise.get_future ().wait ();
579+ }
580+ }
581+ else
571582 {
572- connectionClosedPromise. get_future (). wait ( );
583+ exit (- 1 );
573584 }
585+
574586 return 0 ;
575587}
0 commit comments