Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit 1e1e523

Browse files
authored
Merge pull request #222 from mtconnect/mqtt_topic_initialization
Mqtt topic initialization
2 parents 51b0aba + 0808a10 commit 1e1e523

File tree

9 files changed

+59
-24
lines changed

9 files changed

+59
-24
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ set(AGENT_VERSION_MAJOR 2)
33
set(AGENT_VERSION_MINOR 0)
44
set(AGENT_VERSION_PATCH 0)
55
set(AGENT_VERSION_BUILD 12)
6-
set(AGENT_VERSION_RC "_RC21")
6+
set(AGENT_VERSION_RC "_RC22")
77

88
# This minimum version is to support Visual Studio 2017 and C++ feature checking and FetchContent
99
cmake_minimum_required(VERSION 3.16 FATAL_ERROR)

conan/mqtt_cpp/conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
class MqttcppConan(ConanFile):
55
name = "mqtt_cpp"
6-
version = "11.0.0"
6+
version = "13.1.0"
77
license = "Boost Software License, Version 1.0"
88
author = "Takatoshi Kondo [email protected]"
99
url = "https://github.com/redboltz/mqtt_cpp"

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class CppAgentConan(ConanFile):
1717
"date/2.4.1@#178e4ada4fefd011aaa81ab2bca646db",
1818
"nlohmann_json/3.9.1@#a41bc0deaf7f40e7b97e548359ccf14d",
1919
"openssl/1.1.1k@#f40064b74987c778d5c4e0416d75f1f0",
20-
"mqtt_cpp/11.0.0"]
20+
"mqtt_cpp/13.1.0"]
2121

2222
build_policy = "missing"
2323
default_options = {

demo/agent/agent.cfg

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ Directories {
3030
}
3131
}
3232

33+
Sinks {
34+
MqttService {
35+
}
36+
}
37+
3338
DevicesStyle { Location = /styles/styles.xsl }
3439
StreamsStyle { Location = /styles/styles.xsl }
3540

src/configuration/config_options.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ namespace mtconnect {
6666
DECLARE_CONFIGURATION(MqttTls);
6767
DECLARE_CONFIGURATION(MqttPort);
6868
DECLARE_CONFIGURATION(MqttHost);
69+
DECLARE_CONFIGURATION(MqttConnectInterval);
6970

7071
// Adapter Configuration
7172
DECLARE_CONFIGURATION(AdapterIdentity);

src/mqtt/mqtt_client.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace mtconnect {
4040
{
4141
public:
4242
MqttClient(boost::asio::io_context &ioc, std::unique_ptr<ClientHandler> &&handler)
43-
: m_ioContext(ioc), m_handler(std::move(handler))
43+
: m_ioContext(ioc), m_handler(std::move(handler)), m_connectInterval(5000)
4444
{}
4545
virtual ~MqttClient() = default;
4646
const auto &getIdentity() const { return m_identity; }
@@ -51,12 +51,14 @@ namespace mtconnect {
5151
virtual bool publish(const std::string &topic, const std::string &payload) = 0;
5252
auto isConnected() { return m_connected; }
5353
auto isRunning() { return m_running; }
54+
void connectComplete() { m_connected = true; }
5455

5556
protected:
5657
boost::asio::io_context &m_ioContext;
5758
std::string m_url;
5859
std::string m_identity;
5960
std::unique_ptr<ClientHandler> m_handler;
61+
std::chrono::milliseconds m_connectInterval;
6062

6163
bool m_running {false};
6264
bool m_connected {false};

src/mqtt/mqtt_client_impl.hpp

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ namespace mtconnect {
7878
identity.str("");
7979
identity << std::hex << digest[0] << digest[1] << digest[2];
8080
m_identity = std::string("_") + (identity.str()).substr(0, 10);
81+
82+
auto ci = GetOption<Seconds>(options, configuration::MqttConnectInterval);
83+
if (ci)
84+
m_connectInterval = *ci;
8185
}
8286

8387
~MqttClientImpl() { stop(); }
@@ -101,14 +105,21 @@ namespace mtconnect {
101105
}
102106
else if (ec == mqtt::connect_return_code::accepted)
103107
{
104-
LOG(info) << "MQTT Connected";
105-
m_connected = true;
108+
LOG(info) << "MQTT ConnAck: MQTT Connected";
109+
106110
if (m_handler && m_handler->m_connected)
111+
{
107112
m_handler->m_connected(shared_from_this());
113+
}
114+
else
115+
{
116+
LOG(debug) << "No connect handler, setting connected";
117+
m_connected = true;
118+
}
108119
}
109120
else
110121
{
111-
LOG(info) << "MQTT connection failed: " << ec;
122+
LOG(info) << "MQTT ConnAck: MQTT connection failed: " << ec;
112123
reconnect();
113124
}
114125
return true;
@@ -236,7 +247,18 @@ namespace mtconnect {
236247
if (m_handler && m_handler->m_connecting)
237248
m_handler->m_connecting(shared_from_this());
238249

239-
derived().getClient()->async_connect();
250+
derived().getClient()->async_connect([this](mqtt::error_code ec) {
251+
if (ec)
252+
{
253+
LOG(warning) << "MqttClientImpl::connect: cannot connect: " << ec.message()
254+
<< ", will retry";
255+
reconnect();
256+
}
257+
else
258+
{
259+
LOG(info) << "MqttClientImpl::connect: connected";
260+
}
261+
});
240262
}
241263

242264
void receive(mqtt::buffer &topic, mqtt::buffer &contents)
@@ -258,23 +280,24 @@ namespace mtconnect {
258280
LOG(info) << "Start reconnect timer";
259281

260282
// Set an expiry time relative to now.
261-
m_reconnectTimer.expires_after(std::chrono::seconds(5));
262-
263-
m_reconnectTimer.async_wait([this](const boost::system::error_code &error) {
264-
if (error != boost::asio::error::operation_aborted)
265-
{
266-
LOG(info) << "MqttClientImpl::reconnect: reconnect now";
283+
m_reconnectTimer.expires_after(m_connectInterval);
267284

268-
// Connect
269-
derived().getClient()->async_connect([this](mqtt::error_code ec) {
270-
LOG(info) << "MqttClientImpl::reconnect async_connect callback: " << ec.message();
271-
if (ec && ec != boost::asio::error::operation_aborted)
285+
m_reconnectTimer.async_wait(boost::asio::bind_executor(
286+
derived().getClient()->get_executor(), [this](const boost::system::error_code &error) {
287+
if (error != boost::asio::error::operation_aborted)
272288
{
273-
reconnect();
289+
LOG(info) << "MqttClientImpl::reconnect: reconnect now";
290+
291+
// Connect
292+
derived().getClient()->async_connect([this](mqtt::error_code ec) {
293+
LOG(info) << "MqttClientImpl::reconnect async_connect callback: " << ec.message();
294+
if (ec && ec != boost::asio::error::operation_aborted)
295+
{
296+
reconnect();
297+
}
298+
});
274299
}
275-
});
276-
}
277-
});
300+
}));
278301
}
279302

280303
protected:

src/sink/mqtt_sink/mqtt_service.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,16 @@ namespace mtconnect {
6363
auto clientHandler = make_unique<ClientHandler>();
6464
clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
6565
// Publish latest devices, assets, and observations
66+
auto &circ = m_sinkContract->getCircularBuffer();
67+
std::lock_guard<buffer::CircularBuffer> lock(circ);
68+
client->connectComplete();
69+
6670
for (auto &dev : m_sinkContract->getDevices())
6771
{
6872
publish(dev);
6973
}
7074

71-
for (auto &obs : m_sinkContract->getCircularBuffer().getLatest().getObservations())
75+
for (auto &obs : circ.getLatest().getObservations())
7276
{
7377
observation::ObservationPtr p {obs.second};
7478
publish(p);

src/source/adapter/shdr/shdr_adapter.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ namespace mtconnect {
108108
for (auto &o : options)
109109
m_options.insert_or_assign(o.first, o.second);
110110
m_pipeline.build(m_options);
111-
if (m_pipeline.started())
111+
if (!m_pipeline.started())
112112
m_pipeline.start();
113113
}
114114

0 commit comments

Comments
 (0)