Skip to content

Enhancements to the pub sub client reconnect unsubscribe publish with time out #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"files.associations": {
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"compare": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"unordered_map": "cpp",
"vector": "cpp",
"exception": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"memory": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"ratio": "cpp",
"semaphore": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"thread": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"utility": "cpp",
"any": "cpp",
"codecvt": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"forward_list": "cpp",
"set": "cpp",
"unordered_set": "cpp",
"algorithm": "cpp",
"iterator": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"fstream": "cpp",
"iomanip": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"cinttypes": "cpp",
"variant": "cpp",
"future": "cpp"
}
}
4 changes: 4 additions & 0 deletions NOTICE-3RD-PARTY-CONTENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
|identify|2.6.9|MIT|
|idna|3.10|BSD|
|jinja2|3.1.6|BSD|
<<<<<<< HEAD
|lxml|5.3.2|New BSD|
=======
|lxml|5.3.1|New BSD|
>>>>>>> ad99c59d1940332b4c003f40072511903b40cacd
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this merge conflict
You can get the required content just from the output of the failing workflow. There is a section where you can copy the content from. Just past it over the existing content of this file. Make sure there is exactly one single FF after the last license entry!

|MarkupSafe|3.0.2|BSD|
|node-semver|0.6.1|MIT|
|nodeenv|1.9.1|BSD|
Expand Down
9 changes: 9 additions & 0 deletions sdk/include/sdk/AsyncResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ struct VoidResult {};

enum class CallState { ONGOING, CANCELING, COMPLETED, FAILED };

/**
* @brief Status of an MQTT publish operation
*/
enum PublishStatus {
Success, // Message was published successfully
Timeout, // Publish operation timed out
Failure // Publish operation failed (e.g., exception thrown)
};

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used here. Should be moved to IPubSubClient.h

/**
* @brief Single result of an asynchronous operation which provides
* an item of type TResultType.
Expand Down
28 changes: 28 additions & 0 deletions sdk/include/sdk/IPubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ class IPubSubClient {
*/
virtual void connect() = 0;

/**
* @brief Reconnect the client to the broker.
* @param timeout_ms maximum time to wait for the reconnection attempt to complete, in
* milliseconds.
*/
virtual void reconnect(int timeout_ms) = 0;

Comment on lines +107 to +113
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be visible to the app "user" code: Instead, reconnection should be done "under the hood" within the PubSubClient after calling connect() and until calling disconnect().
Would be cool if you could achieve that. There is also longer on our minds, that currently the app is terminating if there is no mqtt connection available at startup. Better behaviour would be, that the app is just silently waiting and connecting to the mqtt broker once it gets available.

/**
* @brief Disconnect the client from the broker.
*
Expand All @@ -126,6 +133,20 @@ class IPubSubClient {
*/
virtual void publishOnTopic(const std::string& topic, const std::string& data) = 0;

/**
* @brief Publishes a message to the specified MQTT topic with a timeout in milliseconds for the
* publish to complete. Returns a status indicating whether the publish was successful, timed
* out, or failed.
*
* @param topic the MQTT topic to publish the message to
* @param data the payload to send as the message
* @param timeout_ms maximum time to wait for the publish to complete, in milliseconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document the corner cases: What does a timeout of 0 ms mean? What about negative numbers?

* @return PublishStatus indicating the result of the publish operation: Success, Timeout,
* Failure
*/
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) = 0;

/**
* @brief Subscribe to a topic.
*
Expand All @@ -134,6 +155,13 @@ class IPubSubClient {
*/
virtual AsyncSubscriptionPtr_t<std::string> subscribeTopic(const std::string& topic) = 0;

/**
* @brief Unsubscribe from a topic.
*
* @param topic The topic to unsubscribe from.
*/
virtual void unsubscribeTopic(const std::string& topic) = 0;

IPubSubClient(const IPubSubClient&) = delete;
IPubSubClient(IPubSubClient&&) = delete;
IPubSubClient& operator=(const IPubSubClient&) = delete;
Expand Down
28 changes: 28 additions & 0 deletions sdk/include/sdk/VehicleApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ class VehicleApp {
*/
AsyncSubscriptionPtr_t<std::string> subscribeToTopic(const std::string& topic);

/**
* @brief Unsubscribe from a topic.
*
* @param topic The topic to unsubscribe from.
*/
void unsubscribeTopic(const std::string& topic);

/**
* @brief Publish a PubSub message to the given topic.
*
Expand All @@ -96,6 +103,27 @@ class VehicleApp {
*/
void publishToTopic(const std::string& topic, const std::string& data);

/**
* @brief Publishes a message to the specified MQTT topic with a timeout in milliseconds for the
* publish to complete. Returns a status indicating whether the publish was successful, timed
* out, or failed.
*
* @param topic the MQTT topic to publish the message to
* @param data the payload to send as the message
* @param timeout_ms maximum time to wait for the publish to complete, in milliseconds
* @return PublishStatus indicating the result of the publish operation: Success, Timeout,
* Failure
*/
virtual PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms);

/**
* @brief Reconnect the client to the broker.
* @param timeout_ms maximum time to wait for the reconnection attempt to complete, in
* milliseconds.
*/
virtual void reconnect(int timeout_ms);

/**
* @brief Get values for all provided data points from the data broker.
*
Expand Down
11 changes: 11 additions & 0 deletions sdk/src/sdk/VehicleApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ AsyncSubscriptionPtr_t<std::string> VehicleApp::subscribeToTopic(const std::stri
return {};
}

void VehicleApp::unsubscribeTopic(const std::string& topic) {
return m_pubSubClient->unsubscribeTopic(topic);
}

AsyncResultPtr_t<DataPointReply>
VehicleApp::getDataPoints(const std::vector<std::reference_wrapper<DataPoint>>& dataPoints) {
std::vector<std::string> dataPointPaths;
Expand Down Expand Up @@ -113,6 +117,13 @@ void VehicleApp::publishToTopic(const std::string& topic, const std::string& dat
}
}

PublishStatus VehicleApp::publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) {
return m_pubSubClient->publishOnTopic(topic, data, timeout_ms);
}

void VehicleApp::reconnect(int timeout_ms) { m_pubSubClient->reconnect(timeout_ms); }

std::shared_ptr<IVehicleDataBrokerClient> VehicleApp::getVehicleDataBrokerClient() {
return m_vdbClient;
}
Expand Down
69 changes: 61 additions & 8 deletions sdk/src/sdk/pubsub/MqttPubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "sdk/middleware/Middleware.h"

#include <mqtt/async_client.h>
#include <future>
#include <mqtt/connect_options.h>
#include <unordered_map>

Expand Down Expand Up @@ -60,14 +61,13 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
const std::string& privateKeyPath)
: m_client{brokerUri, clientId} {
m_client.set_callback(*this);
auto sslopts = mqtt::ssl_options_builder()
.trust_store(trustStorePath)
.key_store(keyStorePath)
.private_key(privateKeyPath)
.error_handler([](const std::string& msg) {
logger().error("SSL Error: {}", msg);
})
.finalize();
auto sslopts =
mqtt::ssl_options_builder()
.trust_store(trustStorePath)
.key_store(keyStorePath)
.private_key(privateKeyPath)
.error_handler([](const std::string& msg) { logger().error("SSL Error: {}", msg); })
.finalize();
m_connectOptions = mqtt::connect_options_builder().ssl(std::move(sslopts)).finalize();
}

Expand All @@ -85,6 +85,22 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {

m_client.connect(m_connectOptions)->wait();
}

void reconnect(int timeout_ms) override {
logger().info("Attempting to reconnect to MQTT broker");

try {
auto token = m_client.reconnect();
if (!token->wait_for(std::chrono::milliseconds(timeout_ms))) {
logger().error("MQTT reconnect timed out after {} ms", timeout_ms);
} else {
logger().info("Successfully reconnected to MQTT broker.");
}
} catch (const mqtt::exception& ex) {
logger().error("MQTT reconnect failed: {}", ex.what());
}
}

void disconnect() override { m_client.disconnect()->wait(); }
[[nodiscard]] bool isConnected() const override { return m_client.is_connected(); }

Expand All @@ -93,6 +109,36 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
m_client.publish(topic, data)->wait();
}

PublishStatus publishOnTopic(const std::string& topic, const std::string& data,
int timeout_ms) override {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the existing function is block-waiting, but for this new function it could make sense to return a future or a AsyncResult to let the user decide if they like to do blocking wait or will use Callbacks for async notification (or likes to ignore outcome at all).

In case of keeping the blocking variant, you should think about using Paho mqtt's waitFor() ...

logger().debug(R"(Publish on topic "{}": "{}")", topic, data);

auto future = std::async(std::launch::async, [this, &topic, &data]() {
auto tok = m_client.publish(topic, data);
if (!tok) {
throw mqtt::exception(MQTTASYNC_FAILURE);
}
tok->wait();
return PublishStatus::Success;
});

if (future.wait_for(std::chrono::milliseconds(timeout_ms)) ==
std::future_status::ready) {
return future.get(); // Success
} else {
logger().warn("Publish timed out after {} ms", timeout_ms);
return PublishStatus::Timeout;
}
} catch (const mqtt::exception& ex) {
logger().error("MQTT publish failed: {}", ex.what());
return PublishStatus::Failure;
} catch (const std::exception& ex) {
logger().error("Unexpected exception during publish: {}", ex.what());
return PublishStatus::Failure;
}
}

AsyncSubscriptionPtr_t<std::string> subscribeTopic(const std::string& topic) override {
logger().debug("Subscribing to {}", topic);
auto subscription = std::make_shared<AsyncSubscription<std::string>>();
Expand All @@ -101,6 +147,13 @@ class MqttPubSubClient : public IPubSubClient, private mqtt::callback {
return subscription;
}

void unsubscribeTopic(const std::string& topic) override {
logger().debug("Unsubscribing from {}", topic);
m_client.unsubscribe(topic)->wait();
auto range = m_subscriberMap.equal_range(topic);
m_subscriberMap.erase(range.first, range.second);
}

private:
void message_arrived(mqtt::const_message_ptr msg) override {
const std::string& topic = msg->get_topic();
Expand Down
46 changes: 46 additions & 0 deletions sdk/tests/mocks/MockIPubSubClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2022-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef MOCK_IPUBSUBCLIENT_H
#define MOCK_IPUBSUBCLIENT_H

#include "sdk/IPubSubClient.h"
#include <gmock/gmock.h>

namespace velocitas {

class MockIPubSubClient : public IPubSubClient {
public:
MOCK_METHOD(void, connect, (), (override));
MOCK_METHOD(void, disconnect, (), (override));
MOCK_METHOD(void, reconnect, (int timeout_ms), (override));
MOCK_METHOD(bool, isConnected, (), (const, override));

MOCK_METHOD(void, publishOnTopic, (const std::string& topic, const std::string& data),
(override));

MOCK_METHOD(PublishStatus, publishOnTopic,
(const std::string& topic, const std::string& data, int timeout_ms), (override));

MOCK_METHOD(AsyncSubscriptionPtr_t<std::string>, subscribeTopic, (const std::string& topic),
(override));

MOCK_METHOD(void, unsubscribeTopic, (const std::string& topic), (override));
};

} // namespace velocitas

#endif // MOCK_IPUBSUBCLIENT_H
Loading
Loading