Skip to content

533 agent device data item ids changing #534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 22, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 5)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 3)
set(AGENT_VERSION_BUILD 4)
set(AGENT_VERSION_RC "")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
Expand Down
7 changes: 1 addition & 6 deletions src/mtconnect/device_model/agent_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,11 @@ namespace mtconnect {
GetOption<bool>(adapter->getOptions(), config::SuppressIPAddress).value_or(false);
auto id = adapter->getIdentity();

stringstream name;
name << adapter->getHost() << ':' << adapter->getPort();

ErrorList errors;
Properties attrs {{"id", id}};
if (!suppress)
{
stringstream name;
name << adapter->getHost() << ':' << adapter->getPort();
attrs["name"] = name.str();
attrs["name"] = adapter->getName();
}
else
{
Expand Down
47 changes: 37 additions & 10 deletions src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace mtconnect {
AddOptions(block, m_options,
{{configuration::UUID, string()},
{configuration::Manufacturer, string()},
{configuration::AdapterIdentity, string()},
{configuration::Station, string()},
{configuration::Url, string()},
{configuration::Topics, StringList()},
Expand All @@ -81,13 +82,13 @@ namespace mtconnect {
{configuration::RealTime, false},
{configuration::RelativeTime, false}});
loadTopics(block, m_options);

if (!HasOption(m_options, configuration::MqttHost) &&
HasOption(m_options, configuration::Host))
{
m_options[configuration::MqttHost] = m_options[configuration::Host];
}

if (!HasOption(m_options, configuration::MqttPort))
{
if (HasOption(m_options, configuration::Port))
Expand All @@ -99,29 +100,29 @@ namespace mtconnect {
m_options[configuration::MqttPort] = 1883;
}
}

m_handler = m_pipeline.makeHandler();
auto clientHandler = make_unique<ClientHandler>();

m_pipeline.m_handler = m_handler.get();

clientHandler->m_connecting = [this](shared_ptr<MqttClient> client) {
m_handler->m_connecting(client->getIdentity());
m_handler->m_connecting(m_identity);
};

clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
client->connectComplete();
m_handler->m_connected(client->getIdentity());
m_handler->m_connected(m_identity);
subscribeToTopics();
};

clientHandler->m_disconnected = [this](shared_ptr<MqttClient> client) {
m_handler->m_disconnected(client->getIdentity());
m_handler->m_disconnected(m_identity);
};

clientHandler->m_receive = [this](shared_ptr<MqttClient> client, const std::string &topic,
const std::string &payload) {
m_handler->m_processMessage(topic, payload, client->getIdentity());
m_handler->m_processMessage(topic, payload, m_identity);
};

if (IsOptionSet(m_options, configuration::MqttTls) &&
Expand All @@ -146,11 +147,37 @@ namespace mtconnect {
m_client = make_shared<mtconnect::mqtt_client::MqttTcpClient>(m_ioContext, m_options,
std::move(clientHandler));
}

m_identity = m_client->getIdentity();

m_name = m_client->getUrl();

if (auto ident = GetOption<string>(m_options, configuration::AdapterIdentity))
{
m_identity = *ident;
}
else
{
stringstream identity;

identity << m_name;
auto topics = GetOption<StringList>(m_options, configuration::Topics);
if (topics)
{
for (const auto &s : *topics)
identity << s;
}

boost::uuids::detail::sha1 sha1;
sha1.process_bytes(identity.str().c_str(), identity.str().length());
boost::uuids::detail::sha1::digest_type digest;
sha1.get_digest(digest);

identity.str("");
identity << std::hex << digest[0] << digest[1] << digest[2];
m_identity = string("_") + (identity.str()).substr(0, 10);

m_options[configuration::AdapterIdentity] = m_identity;
}

m_options[configuration::AdapterIdentity] = m_name;
m_pipeline.build(m_options);
}

Expand Down
139 changes: 138 additions & 1 deletion test_package/mqtt_adapter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
#include <string>
#include <vector>

#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/pipeline/pipeline_context.hpp"
#include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp"

using namespace std;
using namespace mtconnect;
using namespace mtconnect::source::adapter;
using namespace mtconnect::source::adapter::mqtt_adapter;
using namespace mtconnect::pipeline;
using namespace mtconnect::asset;

namespace asio = boost::asio;
using namespace std::literals;

// main
int main(int argc, char *argv[])
Expand All @@ -43,6 +48,32 @@ int main(int argc, char *argv[])
return RUN_ALL_TESTS();
}

class MockPipelineContract : public PipelineContract
{
public:
MockPipelineContract(int32_t schemaVersion)
: m_schemaVersion(schemaVersion)
{}
DevicePtr findDevice(const std::string &) override { return nullptr; }
DataItemPtr findDataItem(const std::string &device, const std::string &name) override
{
return nullptr;
}
void eachDataItem(EachDataItem fun) override {}
void deliverObservation(observation::ObservationPtr obs) override {}
void deliverAsset(AssetPtr) override {}
void deliverDevices(std::list<DevicePtr>) override {}
void deliverDevice(DevicePtr) override {}
int32_t getSchemaVersion() const override { return m_schemaVersion; }
void deliverAssetCommand(entity::EntityPtr) override {}
void deliverCommand(entity::EntityPtr) override {}
void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {}
void sourceFailed(const std::string &id) override {}
const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; }

int32_t m_schemaVersion;
};

class MqttAdapterTest : public testing::Test
{
protected:
Expand All @@ -51,4 +82,110 @@ class MqttAdapterTest : public testing::Test
void TearDown() override {}
};

TEST_F(MqttAdapterTest, should_find_data_item_from_topic) {}
TEST_F(MqttAdapterTest, should_create_a_unique_id_based_on_topic)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_topics_change)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Topics] = StringList {"pipline/#"s, "topic/"s};
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("_29e17b8870", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_port_changes)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Port] = 1882;
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1882/", adapter->getName());
ASSERT_EQ("_7042e8f45e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_host_changes)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Host] = "localhost";
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://localhost:1883/", adapter->getName());
ASSERT_EQ("_4cd2e64d4e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_be_able_to_set_adapter_identity)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::AdapterIdentity, "MyIdentity"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("MyIdentity", adapter->getIdentity());
}
Loading