Enhancements to the pub sub client reconnect unsubscribe publish with time out #128
…unit tests for the new function
Hi moamenvx,
thx for your 1st contribution. Looks meaningful.
However, there are some findings to be fixed.
Regards, Björn
sdk/include/sdk/AsyncResult.h
Outdated
/**
 * @brief Status of an MQTT publish operation
 */
enum PublishStatus {
    Success, // Message was published successfully
    Timeout, // Publish operation timed out
    Failure  // Publish operation failed (e.g., exception thrown)
};
Not used here. Should be moved to IPubSubClient.h
/**
 * @brief Reconnect the client to the broker.
 * @param timeout_ms maximum time to wait for the reconnection attempt to complete, in
 * milliseconds.
 */
virtual void reconnect(int timeout_ms) = 0;
This should not be visible to the app "user" code. Instead, reconnection should be done "under the hood" within the PubSubClient after calling connect() and until calling disconnect().
It would be cool if you could achieve that. It has also been on our minds for a while that the app currently terminates if no MQTT connection is available at startup. Better behaviour would be for the app to wait silently and connect to the MQTT broker once it becomes available.
You wrote back:
When testing the original component, I ran into a problem where, after shutting down the connection and bringing it back up, the client didn’t detect the restored connection and remained down. So, I needed a way to explicitly attempt a reconnect.
Ok, but how do you detect that situation (to trigger the reconnect)? A (maybe naive) idea would be to detect it during a publish call, e.g. because the call fails, and then trigger the reconnect. But of course this wouldn't help with previously established subscriptions, because missing notifications cannot be detected ...
My problem here is, that I'd like to avoid inserting this function, because removing it later (when finding a "better" solution) would be an incompatible change.
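A minimal sketch of the "under the hood" idea discussed above, assuming the client can express one connection attempt as a callable that returns true on success. `BackgroundConnector` and its members are hypothetical names for illustration only; they are neither part of the SDK nor of Paho. Paho's `connect_options` also has an automatic-reconnect setting that may already cover the reconnect-after-loss case; whether it fits here has not been established in this thread.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical helper (not SDK or Paho API): keeps calling a caller-supplied
// connect attempt until it succeeds or stop() is called.
class BackgroundConnector {
public:
    using ConnectAttempt = std::function<bool()>;  // returns true once connected

    BackgroundConnector(ConnectAttempt attempt, std::chrono::milliseconds retryInterval)
        : m_attempt(std::move(attempt)), m_retryInterval(retryInterval) {}

    ~BackgroundConnector() { stop(); }

    // Would be called from connect(): returns immediately, retries in the background.
    void start() {
        if (m_worker.joinable()) {
            return;  // already started
        }
        m_worker = std::thread([this] {
            while (!m_stopRequested && !m_connected) {
                if (m_attempt()) {
                    m_connected = true;
                } else {
                    std::this_thread::sleep_for(m_retryInterval);
                }
            }
        });
    }

    // Would be called from disconnect(): ends the retry loop and joins the worker.
    void stop() {
        m_stopRequested = true;
        if (m_worker.joinable()) {
            m_worker.join();
        }
    }

    bool isConnected() const { return m_connected; }

private:
    ConnectAttempt m_attempt;
    std::chrono::milliseconds m_retryInterval;
    std::atomic<bool> m_stopRequested{false};
    std::atomic<bool> m_connected{false};
    std::thread m_worker;
};
```

With this shape the app would not terminate when the broker is unavailable at startup; it would simply stay in the retry loop. Re-establishing subscriptions after a lost connection is not covered by this sketch.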
NOTICE-3RD-PARTY-CONTENT.md
Outdated
<<<<<<< HEAD
|lxml|5.3.2|New BSD|
=======
|lxml|5.3.1|New BSD|
>>>>>>> ad99c59d1940332b4c003f40072511903b40cacd
Please fix this merge conflict
You can get the required content from the output of the failing workflow. There is a section you can copy the content from. Just paste it over the existing content of this file. Make sure there is exactly one single FF after the last license entry!
 *
 * @param topic the MQTT topic to publish the message to
 * @param data the payload to send as the message
 * @param timeout_ms maximum time to wait for the publish to complete, in milliseconds
Please document the corner cases: What does a timeout of 0 ms mean? What about negative numbers?
Hello BjoernAtBosch,
Thank you for the thorough review.
I will try to clarify each of the review points:
1. The enum was created in AsyncResult.h rather than in IPubSubClient.h because IPubSubClient.h is not included in VehicleApp.h, although the IPubSubClient class is forward declared there. Initially, I created the enum class as part of the IPubSubClient class, but the compiler failed to build because it couldn't access the internal members of a forward-declared class.
I didn't want to include IPubSubClient.h in VehicleApp.h since it hadn't been included before, so placing it in AsyncResult.h was the only option. If including IPubSubClient.h in VehicleApp.h is acceptable, please let me know.
2. When testing the original component, I ran into a problem where, after shutting down the connection and bringing it back up, the client didn’t detect the restored connection and remained down. So, I needed a way to explicitly attempt a reconnect.
3. 4. Ok. I will work on these points.
5. I was using the waitFor() method at first, but I found that if the topic name is incorrect (e.g., contains a character like $ or #), the publish function gets stuck and doesn't return the token at all. As a result, I couldn't wait for a timeout using the token. The purpose of using a future was to enforce a timeout even when publish gets stuck. I didn’t consider using the callback in this case because I needed blocking behavior, not a fully async one.
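The future-based timeout from point 5 could look roughly like the sketch below. It assumes the blocking publish can be wrapped in a callable; `publishWithDeadline` and `blockingPublish` are hypothetical names, and the enum is repeated only to keep the example self-contained. One detail worth noting: a future obtained from `std::async` blocks in its destructor until the task finishes, so it cannot enforce a timeout on a call that never returns. The sketch therefore uses a `std::packaged_task` on a detached thread.

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <thread>
#include <utility>

enum PublishStatus { Success, Timeout, Failure };

// Hypothetical helper: runs a blocking publish on a detached thread and waits
// at most `timeout` for it. `blockingPublish` stands in for something like
// m_client.publish(topic, data)->wait().
inline PublishStatus publishWithDeadline(std::function<void()> blockingPublish,
                                         std::chrono::milliseconds timeout) {
    std::packaged_task<void()> task(std::move(blockingPublish));
    std::future<void> done = task.get_future();

    // Detached on purpose: the caller must be able to return even if the
    // publish call never does.
    std::thread(std::move(task)).detach();

    if (done.wait_for(timeout) != std::future_status::ready) {
        return Timeout;  // the worker thread may still be stuck in the call
    }
    try {
        done.get();  // rethrows any exception thrown by blockingPublish
        return Success;
    } catch (...) {
        return Failure;
    }
}
```

The trade-off is that a stuck publish leaves its worker thread behind, so anything the callable captures has to outlive the caller's stack frame (capture by value or via shared ownership).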
In sdk/include/sdk/IPubSubClient.h:
@@ -126,6 +133,20 @@ class IPubSubClient {
*/
virtual void publishOnTopic(const std::string& topic, const std::string& data) = 0;
+ /**
+ * @brief Publishes a message to the specified MQTT topic with a timeout in milliseconds for the
+ * publish to complete. Returns a status indicating whether the publish was successful, timed
+ * out, or failed.
+ *
+ * @param topic the MQTT topic to publish the message to
+ * @param data the payload to send as the message
+ * @param timeout_ms maximum time to wait for the publish to complete, in milliseconds
Please document the corner cases: What does a timeout of 0 ms mean? What about negative numbers?
________________________________
In sdk/src/sdk/pubsub/MqttPubSubClient.cpp:
@@ -93,6 +109,36 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
m_client.publish(topic, data)->wait();
}
+ PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
+ int timeout_ms) override {
+ try {
I know the existing function is block-waiting, but for this new function it could make sense to return a future or an AsyncResult, so the user can decide whether to do a blocking wait, use callbacks for async notification, or ignore the outcome altogether.
In case of keeping the blocking variant, you should think about using Paho mqtt's waitFor() ...
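As an illustration of the non-blocking variant suggested here, the sketch below bridges a callback-style publish to a `std::future`, so the caller can block with a deadline, poll, or drop the result. It assumes the underlying client reports completion through a callback; `publishAsync`, `Completion` and `startPublish` are hypothetical names, not SDK or Paho API, and the enum is repeated only for self-containment.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

enum PublishStatus { Success, Timeout, Failure };

// Completion callback the (hypothetical) transport invokes exactly once.
using Completion = std::function<void(bool ok)>;

// Starts a publish and returns a future for its outcome. `startPublish` stands
// in for the library call that kicks off the operation and later reports back.
inline std::future<PublishStatus> publishAsync(
    const std::function<void(Completion)>& startPublish) {
    auto promise = std::make_shared<std::promise<PublishStatus>>();
    std::future<PublishStatus> result = promise->get_future();

    // The shared_ptr keeps the promise alive for as long as the transport
    // holds on to the completion callback.
    startPublish([promise](bool ok) { promise->set_value(ok ? Success : Failure); });
    return result;
}
```

A caller that wants the blocking behaviour can use `result.wait_for(...)` and treat a non-ready status as a timeout; a caller that does not care about the outcome can simply discard the future.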
Hi moamenvx,
please revert your changes regarding indentation and include order. This is handled by our tooling.
Furthermore, it makes it hard to understand the real code changes. (In general, those kinds of changes should not be intermixed with real behavioral changes.)
Will continue reviewing once those changes are reverted.
I quickly fixed the licenses again. Sorry for the annoying workflow; let's see if we can improve this in the future.
Sorry for the long silence; we were busy releasing the Conan 2 migration (and other things), which is now available.
After rebasing your branch you will be "on Conan 2". This might require rebuilding your devcontainer.
If you already have an app repo based on our app template, you'll also need to migrate it to Conan 2. We've put a migration guide at the bottom of the template's readme.
/**
 * @brief Status of a publish operation
 */
enum PublishStatus {
    Success, // Message was published successfully
    Timeout, // Publish operation timed out
    Failure  // Publish operation failed (e.g., exception thrown)
};
Yes, please move this to IPubSubClient.h and include that header in VehicleApp.h.
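For background on the forward-declaration problem described earlier in the thread: an enum nested in a class that is only forward-declared cannot be named, but a namespace-scope enum with a fixed underlying type can itself be declared without its enumerators. The sketch below shows that alternative in a single file. It assumes the enum is changed to a scoped enum with an explicit underlying type, which is not what the PR currently has, and `describe` is a hypothetical function used only to show usage. Including the header as requested above remains the simpler route.

```cpp
#include <string>

// --- what a header like VehicleApp.h could contain (assumption) ---
enum class PublishStatus : int;              // opaque declaration, no enumerators needed
class IPubSubClient;                         // forward declaration, as before
std::string describe(PublishStatus status);  // the enum can already be used in signatures

// --- what IPubSubClient.h would then define ---
enum class PublishStatus : int { Success, Timeout, Failure };

// --- implementation file, where the full definition is visible ---
std::string describe(PublishStatus status) {
    switch (status) {
        case PublishStatus::Success: return "success";
        case PublishStatus::Timeout: return "timeout";
        case PublishStatus::Failure: return "failure";
    }
    return "unknown";
}
```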
    auto range = m_subscriberMap.equal_range(topic);
    m_subscriberMap.erase(range.first, range.second);
}
See finding above
void unsubscribeTopic(const AsyncSubscriptionPtr_t<std::string>& subscription) override {
    for (auto it = m_subscriberMap.begin(); it != m_subscriberMap.end(); ++it) {
        if (it->second == subscription) {
            auto topic = it->first;
            m_subscriberMap.erase(it);
            if (m_subscriberMap.count(topic) == 0) {
                m_client.unsubscribe(topic)->wait();
            }
            return;
        }
    }
}
 * @param topic The topic to unsubscribe from.
 */
virtual void unsubscribeTopic(const std::string& topic) = 0;
There could be different subscriptions to the same topic. Therefore we should also offer the option to recall just a specific subscription:
/**
 * @brief Recall a specific subscription
 *
 * @param subscription The subscription to recall; this is an object earlier returned by subscribeTopic.
 */
virtual void unsubscribeTopic(const AsyncSubscriptionPtr_t<std::string>& subscription = {}) = 0;
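To make the intended semantics concrete, here is a self-contained sketch of per-subscription recall, assuming subscriptions are kept in a `std::multimap` keyed by topic. `SubscriptionRegistry`, `Subscription` and the injected `brokerUnsubscribe` callable are hypothetical stand-ins for the real client, its subscription type and `m_client.unsubscribe(topic)`.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct Subscription {};  // hypothetical stand-in for the SDK's subscription object
using SubscriptionPtr = std::shared_ptr<Subscription>;

class SubscriptionRegistry {
public:
    explicit SubscriptionRegistry(std::function<void(const std::string&)> brokerUnsubscribe)
        : m_brokerUnsubscribe(std::move(brokerUnsubscribe)) {}

    // Registers one more subscriber for the topic and returns its handle.
    SubscriptionPtr subscribe(const std::string& topic) {
        auto subscription = std::make_shared<Subscription>();
        m_subscriberMap.emplace(topic, subscription);
        return subscription;
    }

    // Recalls one subscription; the broker is only told to unsubscribe when
    // no other subscriber is left for the same topic.
    void unsubscribe(const SubscriptionPtr& subscription) {
        for (auto it = m_subscriberMap.begin(); it != m_subscriberMap.end(); ++it) {
            if (it->second == subscription) {
                const std::string topic = it->first;  // copy before erase invalidates `it`
                m_subscriberMap.erase(it);
                if (m_subscriberMap.count(topic) == 0) {
                    m_brokerUnsubscribe(topic);
                }
                return;
            }
        }
    }

private:
    std::multimap<std::string, SubscriptionPtr> m_subscriberMap;
    std::function<void(const std::string&)> m_brokerUnsubscribe;
};
```

Recalling a handle that is not (or no longer) registered is a no-op in this sketch; whether the real API should report that case is a separate design decision.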
 * Failure
 */
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
                                     int timeout_ms) = 0;
I'd prefer using some unsigned int to make clear that negative numbers are not useful.
- int timeout_ms) = 0;
+ unsigned int timeout_ms) = 0;
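One point that may be worth covering when documenting the corner cases: an unsigned parameter signals intent, but it does not reject negative arguments. A caller that still passes -1 gets the value converted to the largest `unsigned int`, i.e. an extremely long wait. The sketch below illustrates this; `effectiveTimeout` and `timeoutForMinusOne` are hypothetical helpers, not part of the PR.

```cpp
#include <chrono>
#include <limits>

// Hypothetical stand-in for the timeout handling inside
// publishOnTopic(..., unsigned int timeout_ms).
inline std::chrono::milliseconds effectiveTimeout(unsigned int timeout_ms) {
    return std::chrono::milliseconds{timeout_ms};
}

// What a caller written against the old `int` signature would get for -1.
inline std::chrono::milliseconds timeoutForMinusOne() {
    const int legacyValue = -1;
    // The explicit cast yields the same value as the implicit conversion at a call site.
    return effectiveTimeout(static_cast<unsigned int>(legacyValue));
}
```

The meaning of 0 ms (return immediately, or wait without limit) still has to be stated in the doc comment either way.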
 * Success, Timeout, or Failure
 */
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
                                     int timeout_ms);
- int timeout_ms);
+ unsigned int timeout_ms);
if (timeout_ms > MAX_TIMEOUT_MS) {
    logger().warn("Timeout capped to {} ms (requested: {} ms)",
                  MAX_TIMEOUT_MS, timeout_ms);
    timeout_ms = MAX_TIMEOUT_MS;
}
Don't know if this limitation is required. I would leave it to the user to decide about this.
if (timeout_ms > MAX_TIMEOUT_MS) {
    logger().warn("Timeout capped to {} ms (requested: {} ms)",
                  MAX_TIMEOUT_MS, timeout_ms);
    timeout_ms = MAX_TIMEOUT_MS;
}
See above
0d1ea52 to 087f193
Describe your changes
Issue ticket number and link
Checklist - Manual tasks
Examples are executing successfully
Created/updated unit tests. Code Coverage percentage on new code shall be >= 80%.
Created/updated integration tests.
Devcontainer can be opened successfully
Devcontainer can be opened successfully behind a corporate proxy
Devcontainer can be re-built successfully
Extended the documentation (e.g. README.md, CONTRIBUTING.md, Velocitas Docs)