Skip to content

Commit b672799

Browse files
authored
Merge pull request #534 from mtconnect/533_agent_device_data_item_ids_changing
2 parents 6fe66eb + c81550e commit b672799

File tree

5 files changed

+178
-19
lines changed

5 files changed

+178
-19
lines changed

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
set(AGENT_VERSION_MAJOR 2)
33
set(AGENT_VERSION_MINOR 5)
44
set(AGENT_VERSION_PATCH 0)
5-
set(AGENT_VERSION_BUILD 3)
5+
set(AGENT_VERSION_BUILD 4)
66
set(AGENT_VERSION_RC "")
77

88
# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent

src/mtconnect/device_model/agent_device.cpp

+1-6
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,11 @@ namespace mtconnect {
7777
GetOption<bool>(adapter->getOptions(), config::SuppressIPAddress).value_or(false);
7878
auto id = adapter->getIdentity();
7979

80-
stringstream name;
81-
name << adapter->getHost() << ':' << adapter->getPort();
82-
8380
ErrorList errors;
8481
Properties attrs {{"id", id}};
8582
if (!suppress)
8683
{
87-
stringstream name;
88-
name << adapter->getHost() << ':' << adapter->getPort();
89-
attrs["name"] = name.str();
84+
attrs["name"] = adapter->getName();
9085
}
9186
else
9287
{

src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp

+37-10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ namespace mtconnect {
6262
AddOptions(block, m_options,
6363
{{configuration::UUID, string()},
6464
{configuration::Manufacturer, string()},
65+
{configuration::AdapterIdentity, string()},
6566
{configuration::Station, string()},
6667
{configuration::Url, string()},
6768
{configuration::Topics, StringList()},
@@ -81,13 +82,13 @@ namespace mtconnect {
8182
{configuration::RealTime, false},
8283
{configuration::RelativeTime, false}});
8384
loadTopics(block, m_options);
84-
85+
8586
if (!HasOption(m_options, configuration::MqttHost) &&
8687
HasOption(m_options, configuration::Host))
8788
{
8889
m_options[configuration::MqttHost] = m_options[configuration::Host];
8990
}
90-
91+
9192
if (!HasOption(m_options, configuration::MqttPort))
9293
{
9394
if (HasOption(m_options, configuration::Port))
@@ -99,29 +100,29 @@ namespace mtconnect {
99100
m_options[configuration::MqttPort] = 1883;
100101
}
101102
}
102-
103+
103104
m_handler = m_pipeline.makeHandler();
104105
auto clientHandler = make_unique<ClientHandler>();
105106

106107
m_pipeline.m_handler = m_handler.get();
107108

108109
clientHandler->m_connecting = [this](shared_ptr<MqttClient> client) {
109-
m_handler->m_connecting(client->getIdentity());
110+
m_handler->m_connecting(m_identity);
110111
};
111112

112113
clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
113114
client->connectComplete();
114-
m_handler->m_connected(client->getIdentity());
115+
m_handler->m_connected(m_identity);
115116
subscribeToTopics();
116117
};
117118

118119
clientHandler->m_disconnected = [this](shared_ptr<MqttClient> client) {
119-
m_handler->m_disconnected(client->getIdentity());
120+
m_handler->m_disconnected(m_identity);
120121
};
121122

122123
clientHandler->m_receive = [this](shared_ptr<MqttClient> client, const std::string &topic,
123124
const std::string &payload) {
124-
m_handler->m_processMessage(topic, payload, client->getIdentity());
125+
m_handler->m_processMessage(topic, payload, m_identity);
125126
};
126127

127128
if (IsOptionSet(m_options, configuration::MqttTls) &&
@@ -146,11 +147,37 @@ namespace mtconnect {
146147
m_client = make_shared<mtconnect::mqtt_client::MqttTcpClient>(m_ioContext, m_options,
147148
std::move(clientHandler));
148149
}
149-
150-
m_identity = m_client->getIdentity();
150+
151151
m_name = m_client->getUrl();
152+
153+
if (auto ident = GetOption<string>(m_options, configuration::AdapterIdentity))
154+
{
155+
m_identity = *ident;
156+
}
157+
else
158+
{
159+
stringstream identity;
160+
161+
identity << m_name;
162+
auto topics = GetOption<StringList>(m_options, configuration::Topics);
163+
if (topics)
164+
{
165+
for (const auto &s : *topics)
166+
identity << s;
167+
}
168+
169+
boost::uuids::detail::sha1 sha1;
170+
sha1.process_bytes(identity.str().c_str(), identity.str().length());
171+
boost::uuids::detail::sha1::digest_type digest;
172+
sha1.get_digest(digest);
173+
174+
identity.str("");
175+
identity << std::hex << digest[0] << digest[1] << digest[2];
176+
m_identity = string("_") + (identity.str()).substr(0, 10);
177+
178+
m_options[configuration::AdapterIdentity] = m_identity;
179+
}
152180

153-
m_options[configuration::AdapterIdentity] = m_name;
154181
m_pipeline.build(m_options);
155182
}
156183

test_package/agent_device_test.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ TEST_F(AgentDeviceTest, should_add_component_and_data_items_for_adapter)
200200

201201
ASSERT_XML_PATH_COUNT(doc, ADAPTERS_PATH "/*", 1);
202202
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@id", ID_PREFIX);
203-
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "127.0.0.1:21788");
203+
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "shdr://127.0.0.1:21788");
204204

205205
ASSERT_XML_PATH_EQUAL(
206206
doc, ADAPTER_DATA_ITEMS_PATH "/m:DataItem[@id='" ID_PREFIX "_adapter_uri']@type",

test_package/mqtt_adapter_test.cpp

+138-1
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,19 @@
2727
#include <string>
2828
#include <vector>
2929

30+
#include "mtconnect/configuration/config_options.hpp"
3031
#include "mtconnect/pipeline/pipeline_context.hpp"
3132
#include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp"
3233

3334
using namespace std;
3435
using namespace mtconnect;
3536
using namespace mtconnect::source::adapter;
3637
using namespace mtconnect::source::adapter::mqtt_adapter;
38+
using namespace mtconnect::pipeline;
39+
using namespace mtconnect::asset;
40+
3741
namespace asio = boost::asio;
42+
using namespace std::literals;
3843

3944
// main
4045
int main(int argc, char *argv[])
@@ -43,6 +48,32 @@ int main(int argc, char *argv[])
4348
return RUN_ALL_TESTS();
4449
}
4550

51+
class MockPipelineContract : public PipelineContract
52+
{
53+
public:
54+
MockPipelineContract(int32_t schemaVersion)
55+
: m_schemaVersion(schemaVersion)
56+
{}
57+
DevicePtr findDevice(const std::string &) override { return nullptr; }
58+
DataItemPtr findDataItem(const std::string &device, const std::string &name) override
59+
{
60+
return nullptr;
61+
}
62+
void eachDataItem(EachDataItem fun) override {}
63+
void deliverObservation(observation::ObservationPtr obs) override {}
64+
void deliverAsset(AssetPtr) override {}
65+
void deliverDevices(std::list<DevicePtr>) override {}
66+
void deliverDevice(DevicePtr) override {}
67+
int32_t getSchemaVersion() const override { return m_schemaVersion; }
68+
void deliverAssetCommand(entity::EntityPtr) override {}
69+
void deliverCommand(entity::EntityPtr) override {}
70+
void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {}
71+
void sourceFailed(const std::string &id) override {}
72+
const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; }
73+
74+
int32_t m_schemaVersion;
75+
};
76+
4677
class MqttAdapterTest : public testing::Test
4778
{
4879
protected:
@@ -51,4 +82,110 @@ class MqttAdapterTest : public testing::Test
5182
void TearDown() override {}
5283
};
5384

54-
TEST_F(MqttAdapterTest, should_find_data_item_from_topic) {}
85+
TEST_F(MqttAdapterTest, should_create_a_unique_id_based_on_topic)
86+
{
87+
asio::io_context ioc;
88+
asio::io_context::strand strand(ioc);
89+
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
90+
{configuration::Host, "mybroker.com"s},
91+
{configuration::Port, 1883},
92+
{configuration::Protocol, "mqtt"s},
93+
{configuration::Topics, StringList {"pipeline/#"s}}};
94+
boost::property_tree::ptree tree;
95+
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
96+
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
97+
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
98+
99+
ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
100+
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
101+
}
102+
103+
TEST_F(MqttAdapterTest, should_change_if_topics_change)
104+
{
105+
asio::io_context ioc;
106+
asio::io_context::strand strand(ioc);
107+
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
108+
{configuration::Host, "mybroker.com"s},
109+
{configuration::Port, 1883},
110+
{configuration::Protocol, "mqtt"s},
111+
{configuration::Topics, StringList {"pipeline/#"s}}};
112+
boost::property_tree::ptree tree;
113+
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
114+
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
115+
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
116+
117+
ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
118+
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
119+
120+
options[configuration::Topics] = StringList {"pipline/#"s, "topic/"s};
121+
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
122+
123+
ASSERT_EQ("_29e17b8870", adapter->getIdentity());
124+
}
125+
126+
TEST_F(MqttAdapterTest, should_change_if_port_changes)
127+
{
128+
asio::io_context ioc;
129+
asio::io_context::strand strand(ioc);
130+
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
131+
{configuration::Host, "mybroker.com"s},
132+
{configuration::Port, 1883},
133+
{configuration::Protocol, "mqtt"s},
134+
{configuration::Topics, StringList {"pipeline/#"s}}};
135+
boost::property_tree::ptree tree;
136+
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
137+
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
138+
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
139+
140+
ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
141+
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
142+
143+
options[configuration::Port] = 1882;
144+
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
145+
146+
ASSERT_EQ("mqtt://mybroker.com:1882/", adapter->getName());
147+
ASSERT_EQ("_7042e8f45e", adapter->getIdentity());
148+
}
149+
150+
TEST_F(MqttAdapterTest, should_change_if_host_changes)
151+
{
152+
asio::io_context ioc;
153+
asio::io_context::strand strand(ioc);
154+
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
155+
{configuration::Host, "mybroker.com"s},
156+
{configuration::Port, 1883},
157+
{configuration::Protocol, "mqtt"s},
158+
{configuration::Topics, StringList {"pipeline/#"s}}};
159+
boost::property_tree::ptree tree;
160+
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
161+
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
162+
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
163+
164+
ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
165+
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
166+
167+
options[configuration::Host] = "localhost"s;
168+
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
169+
170+
ASSERT_EQ("mqtt://localhost:1883/", adapter->getName());
171+
ASSERT_EQ("_4cd2e64d4e", adapter->getIdentity());
172+
}
173+
174+
TEST_F(MqttAdapterTest, should_be_able_to_set_adapter_identity)
175+
{
176+
asio::io_context ioc;
177+
asio::io_context::strand strand(ioc);
178+
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
179+
{configuration::Host, "mybroker.com"s},
180+
{configuration::Port, 1883},
181+
{configuration::Protocol, "mqtt"s},
182+
{configuration::AdapterIdentity, "MyIdentity"s},
183+
{configuration::Topics, StringList {"pipeline/#"s}}};
184+
boost::property_tree::ptree tree;
185+
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
186+
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
187+
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);
188+
189+
ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
190+
ASSERT_EQ("MyIdentity", adapter->getIdentity());
191+
}

0 commit comments

Comments
 (0)