diff --git a/CMakeLists.txt b/CMakeLists.txt index f9915b37..2e85fc9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ set(AGENT_VERSION_MAJOR 2) set(AGENT_VERSION_MINOR 5) set(AGENT_VERSION_PATCH 0) -set(AGENT_VERSION_BUILD 3) +set(AGENT_VERSION_BUILD 4) set(AGENT_VERSION_RC "") # This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent diff --git a/src/mtconnect/device_model/agent_device.cpp b/src/mtconnect/device_model/agent_device.cpp index dc2e6c6a..7c6c3fb0 100644 --- a/src/mtconnect/device_model/agent_device.cpp +++ b/src/mtconnect/device_model/agent_device.cpp @@ -77,16 +77,11 @@ namespace mtconnect { GetOption(adapter->getOptions(), config::SuppressIPAddress).value_or(false); auto id = adapter->getIdentity(); - stringstream name; - name << adapter->getHost() << ':' << adapter->getPort(); - ErrorList errors; Properties attrs {{"id", id}}; if (!suppress) { - stringstream name; - name << adapter->getHost() << ':' << adapter->getPort(); - attrs["name"] = name.str(); + attrs["name"] = adapter->getName(); } else { diff --git a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp index 67fda074..379b3534 100644 --- a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp +++ b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp @@ -62,6 +62,7 @@ namespace mtconnect { AddOptions(block, m_options, {{configuration::UUID, string()}, {configuration::Manufacturer, string()}, + {configuration::AdapterIdentity, string()}, {configuration::Station, string()}, {configuration::Url, string()}, {configuration::Topics, StringList()}, @@ -81,13 +82,13 @@ namespace mtconnect { {configuration::RealTime, false}, {configuration::RelativeTime, false}}); loadTopics(block, m_options); - + if (!HasOption(m_options, configuration::MqttHost) && HasOption(m_options, configuration::Host)) { m_options[configuration::MqttHost] = m_options[configuration::Host]; } - + if (!HasOption(m_options, configuration::MqttPort)) { if (HasOption(m_options, configuration::Port)) @@ -99,29 +100,29 @@ namespace mtconnect { m_options[configuration::MqttPort] = 1883; } } - + m_handler = m_pipeline.makeHandler(); auto clientHandler = make_unique(); m_pipeline.m_handler = m_handler.get(); clientHandler->m_connecting = [this](shared_ptr client) { - m_handler->m_connecting(client->getIdentity()); + m_handler->m_connecting(m_identity); }; clientHandler->m_connected = [this](shared_ptr client) { client->connectComplete(); - m_handler->m_connected(client->getIdentity()); + m_handler->m_connected(m_identity); subscribeToTopics(); }; clientHandler->m_disconnected = [this](shared_ptr client) { - m_handler->m_disconnected(client->getIdentity()); + m_handler->m_disconnected(m_identity); }; clientHandler->m_receive = [this](shared_ptr client, const std::string &topic, const std::string &payload) { - m_handler->m_processMessage(topic, payload, client->getIdentity()); + m_handler->m_processMessage(topic, payload, m_identity); }; if (IsOptionSet(m_options, configuration::MqttTls) && @@ -146,11 +147,37 @@ namespace mtconnect { m_client = make_shared(m_ioContext, m_options, std::move(clientHandler)); } - - m_identity = m_client->getIdentity(); + m_name = m_client->getUrl(); + + if (auto ident = GetOption(m_options, configuration::AdapterIdentity)) + { + m_identity = *ident; + } + else + { + stringstream identity; + + identity << m_name; + auto topics = GetOption(m_options, configuration::Topics); + if (topics) + { + for (const auto &s : *topics) + identity << s; + } + + boost::uuids::detail::sha1 sha1; + sha1.process_bytes(identity.str().c_str(), identity.str().length()); + boost::uuids::detail::sha1::digest_type digest; + sha1.get_digest(digest); + + identity.str(""); + identity << std::hex << digest[0] << digest[1] << digest[2]; + m_identity = string("_") + (identity.str()).substr(0, 10); + + m_options[configuration::AdapterIdentity] = m_identity; + } - m_options[configuration::AdapterIdentity] = m_name; m_pipeline.build(m_options); } diff --git a/test_package/agent_device_test.cpp b/test_package/agent_device_test.cpp index da174ed8..7ff334c6 100644 --- a/test_package/agent_device_test.cpp +++ b/test_package/agent_device_test.cpp @@ -200,7 +200,7 @@ TEST_F(AgentDeviceTest, should_add_component_and_data_items_for_adapter) ASSERT_XML_PATH_COUNT(doc, ADAPTERS_PATH "/*", 1); ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@id", ID_PREFIX); - ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "127.0.0.1:21788"); + ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "shdr://127.0.0.1:21788"); ASSERT_XML_PATH_EQUAL( doc, ADAPTER_DATA_ITEMS_PATH "/m:DataItem[@id='" ID_PREFIX "_adapter_uri']@type", diff --git a/test_package/mqtt_adapter_test.cpp b/test_package/mqtt_adapter_test.cpp index 2e2c4683..b1a9a640 100644 --- a/test_package/mqtt_adapter_test.cpp +++ b/test_package/mqtt_adapter_test.cpp @@ -27,6 +27,7 @@ #include #include +#include "mtconnect/configuration/config_options.hpp" #include "mtconnect/pipeline/pipeline_context.hpp" #include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp" @@ -34,7 +35,11 @@ using namespace std; using namespace mtconnect; using namespace mtconnect::source::adapter; using namespace mtconnect::source::adapter::mqtt_adapter; +using namespace mtconnect::pipeline; +using namespace mtconnect::asset; + namespace asio = boost::asio; +using namespace std::literals; // main int main(int argc, char *argv[]) @@ -43,6 +48,32 @@ int main(int argc, char *argv[]) return RUN_ALL_TESTS(); } +class MockPipelineContract : public PipelineContract +{ +public: + MockPipelineContract(int32_t schemaVersion) + : m_schemaVersion(schemaVersion) + {} + DevicePtr findDevice(const std::string &) override { return nullptr; } + DataItemPtr findDataItem(const std::string &device, const std::string &name) override + { + return nullptr; + } + void eachDataItem(EachDataItem fun) override {} + void deliverObservation(observation::ObservationPtr obs) override {} + void deliverAsset(AssetPtr) override {} + void deliverDevices(std::list) override {} + void deliverDevice(DevicePtr) override {} + int32_t getSchemaVersion() const override { return m_schemaVersion; } + void deliverAssetCommand(entity::EntityPtr) override {} + void deliverCommand(entity::EntityPtr) override {} + void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} + void sourceFailed(const std::string &id) override {} + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + + int32_t m_schemaVersion; +}; + class MqttAdapterTest : public testing::Test { protected: @@ -51,4 +82,110 @@ class MqttAdapterTest : public testing::Test void TearDown() override {} }; -TEST_F(MqttAdapterTest, should_find_data_item_from_topic) {} +TEST_F(MqttAdapterTest, should_create_a_unique_id_based_on_topic) +{ + asio::io_context ioc; + asio::io_context::strand strand(ioc); + ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; + boost::property_tree::ptree tree; + pipeline::PipelineContextPtr context = make_shared(); + context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + auto adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); + ASSERT_EQ("_89c11f795e", adapter->getIdentity()); +} + +TEST_F(MqttAdapterTest, should_change_if_topics_change) +{ + asio::io_context ioc; + asio::io_context::strand strand(ioc); + ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; + boost::property_tree::ptree tree; + pipeline::PipelineContextPtr context = make_shared(); + context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + auto adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); + ASSERT_EQ("_89c11f795e", adapter->getIdentity()); + + options[configuration::Topics] = StringList {"pipline/#"s, "topic/"s}; + adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("_29e17b8870", adapter->getIdentity()); +} + +TEST_F(MqttAdapterTest, should_change_if_port_changes) +{ + asio::io_context ioc; + asio::io_context::strand strand(ioc); + ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; + boost::property_tree::ptree tree; + pipeline::PipelineContextPtr context = make_shared(); + context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + auto adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); + ASSERT_EQ("_89c11f795e", adapter->getIdentity()); + + options[configuration::Port] = 1882; + adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1882/", adapter->getName()); + ASSERT_EQ("_7042e8f45e", adapter->getIdentity()); +} + +TEST_F(MqttAdapterTest, should_change_if_host_changes) +{ + asio::io_context ioc; + asio::io_context::strand strand(ioc); + ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; + boost::property_tree::ptree tree; + pipeline::PipelineContextPtr context = make_shared(); + context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + auto adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); + ASSERT_EQ("_89c11f795e", adapter->getIdentity()); + + options[configuration::Host] = "localhost"s; + adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://localhost:1883/", adapter->getName()); + ASSERT_EQ("_4cd2e64d4e", adapter->getIdentity()); +} + +TEST_F(MqttAdapterTest, should_be_able_to_set_adapter_identity) +{ + asio::io_context ioc; + asio::io_context::strand strand(ioc); + ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::AdapterIdentity, "MyIdentity"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; + boost::property_tree::ptree tree; + pipeline::PipelineContextPtr context = make_shared(); + context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + auto adapter = make_unique(ioc, context, options, tree); + + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); + ASSERT_EQ("MyIdentity", adapter->getIdentity()); +}