Enhancements to the pub sub client reconnect unsubscribe publish with time out #128

Open
wants to merge 15 commits into
base: main
73 changes: 73 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,73 @@
{
"files.associations": {
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"compare": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"exception": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"memory": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"ratio": "cpp",
"semaphore": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"thread": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"utility": "cpp",
"any": "cpp",
"codecvt": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"forward_list": "cpp",
"set": "cpp",
"unordered_set": "cpp",
"algorithm": "cpp",
"iterator": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"fstream": "cpp",
"iomanip": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"cinttypes": "cpp",
"variant": "cpp",
"future": "cpp"
}
}
9 changes: 9 additions & 0 deletions sdk/include/sdk/AsyncResult.h
@@ -59,6 +59,15 @@ struct VoidResult {};

enum class CallState { ONGOING, CANCELING, COMPLETED, FAILED };

/**
* @brief Status of a publish operation
*/
enum PublishStatus {
Success, // Message was published successfully
Timeout, // Publish operation timed out
Failure // Publish operation failed (e.g., exception thrown)
};

Comment on lines +62 to +70
Member

Yes, please move this to IPubSubClient.h and include that header in VehicleApp.h

/**
* @brief Single result of an asynchronous operation which provides
* an item of type TResultType.
32 changes: 30 additions & 2 deletions sdk/include/sdk/IPubSubClient.h
@@ -56,8 +56,8 @@ class IPubSubClient {
*
* @param brokerUri address of the MQTT broker to connect to
* @param clientId used to identify the client at the MQTT broker
* @param username username to get access to the MQTT broker
* @param password password to get access to the MQTT broker
* @param username to get access to the MQTT broker
* @param password to get access to the MQTT broker
* @return std::shared_ptr<IPubSubClient> reference to the created MQTT client
*/
static std::shared_ptr<IPubSubClient> createInstance(const std::string& brokerUri,
@@ -104,6 +104,13 @@ class IPubSubClient {
*/
virtual void connect() = 0;

/**
* @brief Reconnect the client to the broker.
* @param timeout_ms maximum time to wait for the reconnection attempt to complete, in
* milliseconds.
*/
virtual void reconnect(int timeout_ms) = 0;

Comment on lines +107 to +113
Member

This should not be visible to the app "user" code. Instead, reconnection should happen "under the hood" within the PubSubClient, from the call to connect() until the call to disconnect().
It would be great if you could achieve that. Something else has been on our minds for a while: currently the app terminates if no MQTT connection is available at startup. It would be better if the app simply waited and connected to the MQTT broker once it becomes available.

Member

You wrote back:

When testing the original component, I ran into a problem where, after shutting down the connection and bringing it back up, the client didn’t detect the restored connection and remained down. So, I needed a way to explicitly attempt a reconnect.

OK, but how do you detect that situation (to trigger the reconnect)? A (maybe naive) idea would be to detect it during a publish call, e.g. because the call fails, and then trigger the reconnect. But of course this wouldn't help with previously established subscriptions, because missing notifications cannot be detected ...

My problem here is that I'd like to avoid adding this function, because removing it later (once we find a "better" solution) would be an incompatible change.
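
As a rough illustration of the "under the hood" idea discussed above, the following library-independent sketch retries a connection attempt on a background thread, starting with connect() and ending with disconnect(). `AutoReconnector`, `tryConnect`, and the fixed retry interval are hypothetical names and choices for illustration only; they are not part of the SDK or of the MQTT library, and a real implementation would also need to react to connection loss after a successful connect.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper (not part of the SDK): retries a connect attempt in the
// background until it succeeds or stop() is called.
class AutoReconnector {
public:
    AutoReconnector(std::function<bool()> tryConnect, std::chrono::milliseconds retryInterval)
        : m_tryConnect(std::move(tryConnect)), m_retryInterval(retryInterval) {}

    ~AutoReconnector() { stop(); }

    AutoReconnector(const AutoReconnector&) = delete;
    AutoReconnector& operator=(const AutoReconnector&) = delete;

    // Would be called from connect(); must be called at most once.
    void start() {
        m_worker = std::thread([this] { run(); });
    }

    // Would be called from disconnect(); stops retrying and joins the worker.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopRequested = true;
        }
        m_wakeup.notify_all();
        if (m_worker.joinable()) {
            m_worker.join();
        }
    }

    bool isConnected() const { return m_connected.load(); }

private:
    void run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (!m_stopRequested) {
            lock.unlock();  // do not hold the lock during the (possibly slow) attempt
            const bool connected = m_tryConnect();
            lock.lock();
            if (connected) {
                m_connected.store(true);
                return;
            }
            // Wait for the next attempt, but wake up early if stop() is called.
            m_wakeup.wait_for(lock, m_retryInterval, [this] { return m_stopRequested; });
        }
    }

    std::function<bool()>     m_tryConnect;
    std::chrono::milliseconds m_retryInterval;
    std::mutex                m_mutex;
    std::condition_variable   m_wakeup;
    bool                      m_stopRequested{false};
    std::atomic<bool>         m_connected{false};
    std::thread               m_worker;
};
```

With something along these lines, an app started before the broker is reachable would keep running and pick up the connection later, and no reconnect() would need to appear in IPubSubClient.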

/**
* @brief Disconnect the client from the broker.
*
@@ -126,6 +133,20 @@ class IPubSubClient {
*/
virtual void publishOnTopic(const std::string& topic, const std::string& data) = 0;

/**
* @brief Publishes a message to the specified MQTT topic with a timeout in milliseconds for the
* publish to complete. Returns a status indicating whether the publish was successful, timed
* out, or failed.
*
* @param topic the MQTT topic to publish the message to
* @param data the payload to send as the message
* @param timeout_ms maximum time to wait for the publish to complete, in milliseconds
Member

Please document the corner cases: What does a timeout of 0 ms mean? What about negative numbers?

* @return PublishStatus indicating the result of the publish operation: Success, Timeout,
* Failure
*/
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) = 0;
Member

I'd prefer using some unsigned int to make clear that negative numbers are not useful.

Suggested change
int timeout_ms) = 0;
unsigned int timeout_ms) = 0;
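
Another option for the two points raised above (corner cases and signedness) would be to take a `std::chrono::milliseconds` parameter and normalize it in one place. The sketch below is only an illustration: `normalizeTimeout` is a hypothetical helper, and treating zero as "do not block" is an assumption, not a decision made in this PR (the current diff rejects values ≤ 0).

```cpp
#include <chrono>
#include <optional>

// Hypothetical helper (not part of the SDK): maps a caller-supplied timeout to
// the value actually used for waiting.
//   negative -> std::nullopt (rejected as invalid)
//   zero     -> zero (assumed here to mean "check once, do not block")
//   positive -> unchanged (no upper cap; that is left to the caller)
inline std::optional<std::chrono::milliseconds>
normalizeTimeout(std::chrono::milliseconds requested) {
    if (requested < std::chrono::milliseconds::zero()) {
        return std::nullopt;
    }
    return requested;
}
```

A call site would then read, for example, `publishOnTopic(topic, data, std::chrono::milliseconds(500))`, which also makes the unit explicit.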


/**
* @brief Subscribe to a topic.
*
@@ -134,6 +155,13 @@ class IPubSubClient {
*/
virtual AsyncSubscriptionPtr_t<std::string> subscribeTopic(const std::string& topic) = 0;

/**
* @brief Unsubscribe from a topic.
*
* @param topic The topic to unsubscribe from.
*/
virtual void unsubscribeTopic(const std::string& topic) = 0;

Member

There could be different subscriptions to the same topic. Therefore we should also offer the option to recall just a specific subscription:

Suggested change
/**
* @brief Recall a specific subscription
*
* @param subscription The subscription to recall; this is an object returned earlier by subscribeTopic.
*/
virtual void unsubscribeTopic(const AsyncSubscriptionPtr_t<std::string>& subscription = {}) = 0;

IPubSubClient(const IPubSubClient&) = delete;
IPubSubClient(IPubSubClient&&) = delete;
IPubSubClient& operator=(const IPubSubClient&) = delete;
33 changes: 33 additions & 0 deletions sdk/include/sdk/VehicleApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ class VehicleApp {
*/
AsyncSubscriptionPtr_t<std::string> subscribeToTopic(const std::string& topic);

/**
* @brief Unsubscribe from a topic.
*
* @param topic The topic to unsubscribe from.
*/
void unsubscribeTopic(const std::string& topic);

/**
* @brief Publish a PubSub message to the given topic.
*
@@ -96,6 +103,32 @@ class VehicleApp {
*/
void publishToTopic(const std::string& topic, const std::string& data);

/**
* @brief Publishes a message to the specified MQTT topic with a timeout (in
* milliseconds) for the operation to complete. Returns a status indicating
* whether the publish was successful, timed out, or failed.
*
* @param topic the MQTT topic to publish the message to
* @param data the payload to send as the message
* @param timeout_ms maximum time (in milliseconds) to wait for the publish to
* complete. Values ≤ 0 are treated as an instant timeout. Timeout value is
* capped at a maximum of 30000 ms.
* @return PublishStatus indicating the result of the publish operation:
* Success, Timeout, or Failure
*/
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms);
Member

Suggested change
int timeout_ms);
unsigned int timeout_ms);


/**
* @brief Attempts to reconnect to the MQTT broker within a specified timeout
* period.
*
* @param timeout_ms The maximum time to wait for reconnection, in
* milliseconds. Values ≤ 0 are treated as an error. Timeout value
* is capped at a maximum of 30000 ms.
*/
virtual void reconnect(int timeout_ms);

/**
* @brief Get values for all provided data points from the data broker.
*
11 changes: 11 additions & 0 deletions sdk/src/sdk/VehicleApp.cpp
@@ -82,6 +82,10 @@ AsyncSubscriptionPtr_t<std::string> VehicleApp::subscribeToTopic(const std::stri
return {};
}

void VehicleApp::unsubscribeTopic(const std::string& topic) {
return m_pubSubClient->unsubscribeTopic(topic);
}

AsyncResultPtr_t<DataPointReply>
VehicleApp::getDataPoints(const std::vector<std::reference_wrapper<DataPoint>>& dataPoints) {
std::vector<std::string> dataPointPaths;
@@ -113,6 +117,13 @@ void VehicleApp::publishToTopic(const std::string& topic, const std::string& dat
}
}

PublishStatus VehicleApp::publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) {
return m_pubSubClient->publishOnTopic(topic, data, timeout_ms);
}

void VehicleApp::reconnect(int timeout_ms) { m_pubSubClient->reconnect(timeout_ms); }

std::shared_ptr<IVehicleDataBrokerClient> VehicleApp::getVehicleDataBrokerClient() {
return m_vdbClient;
}
93 changes: 85 additions & 8 deletions sdk/src/sdk/pubsub/MqttPubSubClient.cpp
@@ -22,6 +22,7 @@
#include "sdk/middleware/Middleware.h"

#include <mqtt/async_client.h>
#include <future>
#include <mqtt/connect_options.h>
#include <unordered_map>

@@ -60,14 +61,13 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
const std::string& privateKeyPath)
: m_client{brokerUri, clientId} {
m_client.set_callback(*this);
auto sslopts = mqtt::ssl_options_builder()
.trust_store(trustStorePath)
.key_store(keyStorePath)
.private_key(privateKeyPath)
.error_handler([](const std::string& msg) {
logger().error("SSL Error: {}", msg);
})
.finalize();
auto sslopts =
mqtt::ssl_options_builder()
.trust_store(trustStorePath)
.key_store(keyStorePath)
.private_key(privateKeyPath)
.error_handler([](const std::string& msg) { logger().error("SSL Error: {}", msg); })
.finalize();
m_connectOptions = mqtt::connect_options_builder().ssl(std::move(sslopts)).finalize();
}

@@ -85,6 +85,34 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {

m_client.connect(m_connectOptions)->wait();
}

void reconnect(int timeout_ms) override {
constexpr int MAX_TIMEOUT_MS = 30000;
logger().info("Attempting to reconnect to MQTT broker");
if (timeout_ms <= 0) {
logger().error("Invalid timeout value: {} ms. Must be positive.",
timeout_ms);
return;
}

if (timeout_ms > MAX_TIMEOUT_MS) {
logger().warn("Timeout capped to {} ms (requested: {} ms)",
MAX_TIMEOUT_MS, timeout_ms);
timeout_ms = MAX_TIMEOUT_MS;
}

Comment on lines +98 to +103
Member

I'm not sure this limit is needed. I would leave that decision to the user.

try {
auto token = m_client.reconnect();
if (!token->wait_for(std::chrono::milliseconds(timeout_ms))) {
logger().error("MQTT reconnect timed out after {} ms", timeout_ms);
} else {
logger().info("Successfully reconnected to MQTT broker.");
}
} catch (const mqtt::exception& ex) {
logger().error("MQTT reconnect failed: {}", ex.what());
}
}

void disconnect() override { m_client.disconnect()->wait(); }
[[nodiscard]] bool isConnected() const override { return m_client.is_connected(); }

@@ -93,6 +121,48 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
m_client.publish(topic, data)->wait();
}

PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) override {
constexpr int MAX_TIMEOUT_MS = 30000;
// Validate timeout range
if (timeout_ms <= 0) {
logger().warn("Invalid timeout value ({} ms); must be > 0", timeout_ms);
return PublishStatus::Timeout;
}

if (timeout_ms > MAX_TIMEOUT_MS) {
logger().warn("Timeout capped to {} ms (requested: {} ms)",
MAX_TIMEOUT_MS, timeout_ms);
timeout_ms = MAX_TIMEOUT_MS;
}
Comment on lines +133 to +137
Member

See above

try {
logger().debug(R"(Publish on topic "{}": "{}")", topic, data);

auto future = std::async(std::launch::async, [this, &topic, &data]() {
auto tok = m_client.publish(topic, data);
if (!tok) {
throw mqtt::exception(MQTTASYNC_FAILURE);
}
tok->wait();
return PublishStatus::Success;
});

if (future.wait_for(std::chrono::milliseconds(timeout_ms)) ==
std::future_status::ready) {
return future.get(); // Success
} else {
logger().warn("Publish timed out after {} ms", timeout_ms);
return PublishStatus::Timeout;
}
} catch (const mqtt::exception& ex) {
logger().error("MQTT publish failed: {}", ex.what());
return PublishStatus::Failure;
} catch (const std::exception& ex) {
logger().error("Unexpected exception during publish: {}", ex.what());
return PublishStatus::Failure;
}
}
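
One detail in the implementation above may be worth checking: a `std::future` obtained from `std::async` with `std::launch::async` blocks in its destructor until the task has finished. If that applies here, returning `PublishStatus::Timeout` would still wait for the publish to complete when `future` goes out of scope. The sketch below reproduces the effect with a sleep standing in for the publish; `slowTask` and `callWithTimeout` are hypothetical names used only for this demonstration.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Stand-in for a slow publish: simply sleeps for the given duration.
inline void slowTask(std::chrono::milliseconds duration) {
    std::this_thread::sleep_for(duration);
}

// Mirrors the structure used above: launch via std::async, wait with a timeout.
// Returns true if the task finished within the timeout.
inline bool callWithTimeout(std::chrono::milliseconds taskDuration,
                            std::chrono::milliseconds timeout) {
    auto future = std::async(std::launch::async, slowTask, taskDuration);
    const bool finished = future.wait_for(timeout) == std::future_status::ready;
    // On return, `future` is destroyed. Its destructor blocks until slowTask
    // has returned, so the caller waits roughly taskDuration even when
    // `finished` is false.
    return finished;
}
```

Waiting on the MQTT token directly with `wait_for`, as the reconnect() implementation in this diff already does, would avoid both the extra thread and the blocking destructor.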

AsyncSubscriptionPtr_t<std::string> subscribeTopic(const std::string& topic) override {
logger().debug("Subscribing to {}", topic);
auto subscription = std::make_shared<AsyncSubscription<std::string>>();
@@ -101,6 +171,13 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
return subscription;
}

void unsubscribeTopic(const std::string& topic) override {
logger().debug("Unsubscribing from {}", topic);
m_client.unsubscribe(topic)->wait();
auto range = m_subscriberMap.equal_range(topic);
m_subscriberMap.erase(range.first, range.second);
}

Member

See finding above

Suggested change
void unsubscribeTopic(const AsyncSubscriptionPtr_t<std::string>& subscription) override {
for (auto it = m_subscriberMap.begin(); it != m_subscriberMap.end(); ++it) {
if (it->second == subscription) {
auto topic = it->first;
m_subscriberMap.erase(it);
if (m_subscriberMap.count(topic) == 0) {
m_client.unsubscribe(topic)->wait();
}
return;
}
}
}
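
The bookkeeping in the suggestion above can be tried out without an MQTT client. The following sketch assumes the subscriber map behaves like a `std::unordered_multimap` keyed by topic; `Subscription` and `removeSubscription` are hypothetical stand-ins, and the returned topic signals that the broker-side unsubscribe would now be due.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>

struct Subscription {};  // hypothetical stand-in for AsyncSubscription<std::string>
using SubscriptionPtr = std::shared_ptr<Subscription>;
using SubscriberMap   = std::unordered_multimap<std::string, SubscriptionPtr>;

// Removes exactly one subscription. Returns the topic if this was the last
// subscription on that topic (the broker-side unsubscribe is then due),
// otherwise std::nullopt.
inline std::optional<std::string> removeSubscription(SubscriberMap&         subscribers,
                                                     const SubscriptionPtr& subscription) {
    for (auto it = subscribers.begin(); it != subscribers.end(); ++it) {
        if (it->second == subscription) {
            std::string topic = it->first;  // copy before erase invalidates `it`
            subscribers.erase(it);
            if (subscribers.count(topic) == 0) {
                return topic;
            }
            return std::nullopt;
        }
    }
    return std::nullopt;  // unknown subscription: nothing to do
}
```

Other subscriptions on the same topic stay registered, which is the difference to the topic-based unsubscribeTopic that erases the whole range.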

private:
void message_arrived(mqtt::const_message_ptr msg) override {
const std::string& topic = msg->get_topic();
46 changes: 46 additions & 0 deletions sdk/tests/mocks/MockIPubSubClient.h
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2022-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef MOCK_IPUBSUBCLIENT_H
#define MOCK_IPUBSUBCLIENT_H

#include "sdk/IPubSubClient.h"
#include <gmock/gmock.h>

namespace velocitas {

class MockIPubSubClient : public IPubSubClient {
public:
MOCK_METHOD(void, connect, (), (override));
MOCK_METHOD(void, disconnect, (), (override));
MOCK_METHOD(void, reconnect, (int timeout_ms), (override));
MOCK_METHOD(bool, isConnected, (), (const, override));

MOCK_METHOD(void, publishOnTopic, (const std::string& topic, const std::string& data),
(override));

MOCK_METHOD(PublishStatus, publishOnTopic,
(const std::string& topic, const std::string& data, int timeout_ms), (override));

MOCK_METHOD(AsyncSubscriptionPtr_t<std::string>, subscribeTopic, (const std::string& topic),
(override));

MOCK_METHOD(void, unsubscribeTopic, (const std::string& topic), (override));
};

} // namespace velocitas

#endif // MOCK_IPUBSUBCLIENT_H