diff --git a/.github/workflows/build_cmake.yml b/.github/workflows/build_cmake.yml index 46766ca6..17622062 100644 --- a/.github/workflows/build_cmake.yml +++ b/.github/workflows/build_cmake.yml @@ -34,12 +34,12 @@ jobs: BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 100 - name: Cache - uses: actions/cache@v3 + uses: actions/cache@v4 env: cache-name: cache-fetchContent-cache with: @@ -61,6 +61,7 @@ jobs: if: matrix.configurations.compiler == 'clang18' run: | wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key|sudo apt-key add - + sudo add-apt-repository 'deb http://apt.llvm.org/jammy/ llvm-toolchain-jammy-17 main' sudo apt update sudo apt upgrade -y sudo apt install -y clang-18 libc++-18-dev libc++abi-18-dev diff --git a/src/client/include/CmwLightClient.hpp b/src/client/include/CmwLightClient.hpp new file mode 100644 index 00000000..9b37777e --- /dev/null +++ b/src/client/include/CmwLightClient.hpp @@ -0,0 +1,897 @@ +#ifndef OPENCMW_CPP_CMWLIGHTCLIENT_HPP +#define OPENCMW_CPP_CMWLIGHTCLIENT_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace opencmw::client::cmwlight { + +struct CmwLightHeaderOptions { + int64_t b; // SOURCE_ID + std::map e; + // can potentially contain more and arbitrary data + // accessors to make code more readable + int64_t &sourceId() { return b; } + std::map sessionBody; +}; + +struct CmwLightHeader { + int8_t x_2; // REQ_TYPE_TAG + int64_t x_0; // ID_TAG + std::string x_1; // DEVICE_NAME + std::string f; // PROPERTY_NAME + int8_t x_7; // UPDATE_TYPE + std::string d; // SESSION_ID + std::unique_ptr x_3; + // accessors to make code more readable + int8_t &requestType() { return x_2; } + int64_t &id() { return x_0; } + std::string &device() { return x_1; } + std::string &property() { return f; } + int8_t &updateType() { return x_7; } + std::string &sessionId() { return d; } + std::unique_ptr &options() { return x_3; } +}; +struct CmwLightConnectBody { + std::string x_9; + std::string &clientInfo() { return x_9; } +}; +struct CmwLightRequestContext { + std::string x_8; // SELECTOR + std::map> c; // FILTERS + std::map> x; // DATA + // accessors to make code more readable + std::string &selector() { return x_8; }; + std::map> &filters() { return c; } + std::map> &data() { return x; } +}; +struct CmwLightDataContext { + std::string x_4; // CYCLE_NAME + int64_t x_6; // CYCLE_STAMP + int64_t x_5; // ACQ_STAMP + std::map x; // DATA // todo: support arbitrary filter data + // accessors to make code more readable + std::string &cycleName() { return x_4; } + long &cycleStamp() { return x_6; } + long &acqStamp() { return x_5; } + std::map &data() { return x; } +}; + +} // namespace opencmw::client::cmwlight +ENABLE_REFLECTION_FOR(opencmw::client::cmwlight::CmwLightHeaderOptions, b, e) +ENABLE_REFLECTION_FOR(opencmw::client::cmwlight::CmwLightHeader, x_2, x_0, x_1, f, x_7, d, x_3) +ENABLE_REFLECTION_FOR(opencmw::client::cmwlight::CmwLightConnectBody, x_9) +ENABLE_REFLECTION_FOR(opencmw::client::cmwlight::CmwLightRequestContext, x_8, c, x) +ENABLE_REFLECTION_FOR(opencmw::client::cmwlight::CmwLightDataContext, x_4, x_6, x_5, x) + +namespace opencmw::client::cmwlight { +namespace detail { +/** + * Sent as the first frame of an rda3 message determining the type of message + */ +enum class MessageType : char { SERVER_CONNECT_ACK = 0x01, + SERVER_REP = 0x02, + SERVER_HB = 0x03, + CLIENT_CONNECT = 0x20, + CLIENT_REQ = 0x21, + CLIENT_HB = 0x22 }; + +/** + * Frame Types in the descriptor (Last frame of a message containing the type of each sub message) + */ +enum class FrameType : char { HEADER = 0, + BODY = 1, + BODY_DATA_CONTEXT = 2, + BODY_REQUEST_CONTEXT = 3, + BODY_EXCEPTION = 4 }; + +/* + * Field names for the Request Header +static const std::map FieldNames = { + { "EVENT_TYPE_TAG", "eventType" }, + { "MESSAGE_TAG", "message" }, + { "ID_TAG", "0" }, + { "DEVICE_NAME_TAG", "1" }, + { "REQ_TYPE_TAG", "2" }, + { "OPTIONS_TAG", "3" }, + { "CYCLE_NAME_TAG", "4" }, + { "ACQ_STAMP_TAG", "5" }, + { "CYCLE_STAMP_TAG", "6" }, + { "UPDATE_TYPE_TAG", "7" }, + { "SELECTOR_TAG", "8" }, + { "CLIENT_INFO_TAG", "9" }, + { "NOTIFICATION_ID_TAG", "a" }, + { "SOURCE_ID_TAG", "b" }, + { "FILTERS_TAG", "c" }, + { "DATA_TAG", "x" }, + { "SESSION_ID_TAG", "d" }, + { "SESSION_BODY_TAG", "e" }, + { "PROPERTY_NAME_TAG", "f" } +}; +*/ + +/** + * request type used in request header REQ_TYPE_TAG + */ +enum class RequestType : char { + GET = 0, + SET = 1, + CONNECT = 2, + REPLY = 3, + EXCEPTION = 4, + SUBSCRIBE = 5, + UNSUBSCRIBE = 6, + NOTIFICATION_DATA = 7, + NOTIFICATION_EXC = 8, + SUBSCRIBE_EXCEPTION = 9, + EVENT = 10, + SESSION_CONFIRM = 11 +}; + +/** + * UpdateType + */ +enum class UpdateType : char { NORMAL = 0, + FIRST_UPDATE = 1, + IMMEDIATE_UPDATE = 2 }; + +std::string getIdentity() { + std::string hostname; + hostname.resize(255); + int result = gethostname(hostname.data(), hostname.capacity()); + if (!result) { + hostname = "SYSPC008"; + } else { + hostname.resize(strnlen(hostname.data(), hostname.size())); + hostname.shrink_to_fit(); + } + static int CONNECTION_ID_GENERATOR = 0; + static int channelIdGenerator = 0; // todo: make this per connection + return fmt::format("{}/{}/{}/{}", hostname, getpid(), ++CONNECTION_ID_GENERATOR, ++channelIdGenerator); // N.B. this scheme is parsed/enforced by CMW +} + +std::string createClientInfo() { + // todo insert correct data + // return fmt::format("9#Address:#string#16#tcp:%2F%2FSYSPC008:0#ApplicationId:#string#69#app=fesa%2Dexplorer%2Dapp;ver=19%2E0%2E0;uid=akrimm;host=SYSPC008;pid=191616;#UserName:#string#6#akrimm#ProcessName:#string#8#cmwlight#Language:#string#3#cpp#StartTime:#long#1720084272252#Name:#string#15#cmwlightexample#Pid:#int#191616#Version:#string#6#10%2E3%2E0"); + return "9#Address:#string#16#tcp:%2F%2FSYSPC008:0#ApplicationId:#string#69#app=fesa%2Dexplorer%2Dapp;ver=19%2E0%2E0;uid=akrimm;host=SYSPC008;pid=191616;#UserName:#string#6#akrimm#ProcessName:#string#17#fesa%2Dexplorer%2Dapp#Language:#string#4#Java#StartTime:#long#1720084272252#Name:#string#17#fesa%2Dexplorer%2Dapp#Pid:#int#191616#Version:#string#6#10%2E3%2E0"; +} + +std::string createClientId() { + return "RemoteHostInfoImpl[name=fesa-explorer-app; userName=akrimm; appId=[app=fesa-explorer-app;ver=19.0.0;uid=akrimm;host=SYSPC008;pid=191616;]; process=fesa-explorer-app; pid=191616; address=tcp://SYSPC008:0; startTime=2024-07-04 11:11:12; connectionTime=About ago; version=10.3.0; language=Java]1"; +} + +struct PendingRequest { + enum class RequestState { INITIALIZED, + WAITING, + FINISHED }; + std::string reqId{ "" }; + opencmw::IoBuffer data{}; + RequestType requestType{ RequestType::GET }; + RequestState state{ RequestState::INITIALIZED }; + std::string uri{ "" }; +}; + +struct OpenSubscription { + enum class SubscriptionState { INITIALIZED, + SUBSCRIBING, + SUBSCRIBED, + UNSUBSCRIBING, + UNSUBSCRIBED }; + std::chrono::milliseconds backOff = 20ms; + long updateId; + long reqId = 0L; + long replyId; + SubscriptionState state = SubscriptionState::SUBSCRIBING; + std::chrono::system_clock::time_point nextTry; + std::string uri; +}; + +struct Connection { + enum class ConnectionState { DISCONNECTED, + CONNECTING1, + CONNECTING2, + CONNECTED, + }; + std::string _authority; + zmq::Socket _socket; + ConnectionState _connectionState = ConnectionState::DISCONNECTED; + timePoint _nextReconnectAttemptTimeStamp = std::chrono::system_clock::now(); + timePoint _lastHeartbeatReceived = std::chrono::system_clock::now(); + timePoint _lastHeartBeatSent = std::chrono::system_clock::now(); + std::chrono::milliseconds _backoff = 20ms; // implements exponential back-off to get + std::vector _frames{}; // currently received frames, will be accumulated until the message is complete + std::map _subscriptions; // all subscriptions requested for (un) subscribe + int64_t _subscriptionIdGenerator; + int64_t _requestIdGenerator; + + std::map _pendingRequests; + + Connection(const zmq::Context &context, const std::string_view authority, const int zmq_dealer_type) + : _authority{ authority }, _socket{ context, zmq_dealer_type } { + zmq::initializeSocket(_socket).assertSuccess(); + } +}; + +static void send(const zmq::Socket &socket, int param, std::string_view errorMsg, auto &&data) { + opencmw::zmq::MessageFrame connectFrame{ FWD(data) }; + if (!connectFrame.send(socket, param).isValid()) { + throw std::runtime_error(errorMsg.data()); + } +} + +static std::string descriptorToString(auto... descriptor) { + std::string result{}; + result.reserve(sizeof...(descriptor)); + ((result.push_back(static_cast(descriptor))), ...); + return result; +} + +static IoBuffer serialiseCmwLight(auto &requestType) { + IoBuffer buffer{}; + opencmw::serialise(buffer, requestType); + buffer.reset(); + return buffer; +} + +void sendConnectRequest(Connection &con) { + using namespace std::string_view_literals; + detail::send(con._socket, ZMQ_SNDMORE, "error sending get frame"sv, "\x21"); // 0x20 => detail::MessageType::CLIENT_REQ + CmwLightHeader header; + header.requestType() = static_cast(detail::RequestType::CONNECT); + header.id() = con._requestIdGenerator++; + header.options() = std::make_unique(); + send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, serialiseCmwLight(header)); // send message header + CmwLightConnectBody connectBody; + connectBody.clientInfo() = createClientInfo(); + send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, serialiseCmwLight(connectBody)); // send message header + using enum detail::FrameType; + send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY)); +} +} // namespace detail + +class CMWLightClientBase { +public: + virtual ~CMWLightClientBase() = default; + virtual bool receive(mdp::Message &message) = 0; + virtual timePoint housekeeping(const timePoint &now) = 0; + virtual void get(const URI &, std::string_view) = 0; + virtual void set(const URI &, std::string_view, const std::span &) = 0; + virtual void subscribe(const URI &, std::string_view) = 0; + virtual void unsubscribe(const URI &, std::string_view) = 0; +}; + +class CMWLightClient : public CMWLightClientBase { + using timeUnit = std::chrono::milliseconds; + const timeUnit _clientTimeout; + const zmq::Context &_context; + const std::string _clientId; + const std::string _sourceName; + std::vector _connections; + std::vector &_pollItems; + constexpr static const auto HEARTBEAT_INTERVAL = 2000ms; + +public: + explicit CMWLightClient(const zmq::Context &context, + std::vector &pollItems, + const timeUnit timeout = 1s, + std::string clientId = "") + : _clientTimeout(timeout), _context(context), _clientId(std::move(clientId)), _sourceName(fmt::format("CMWLightClient(clientId: {})", _clientId)), _pollItems(pollItems) {} + + void connect(const URI &uri) { + auto con = detail::Connection(_context, uri.authority().value(), ZMQ_DEALER); + _connections.push_back(std::move(con)); + } + + void connect(detail::Connection &con) { + using enum detail::Connection::ConnectionState; + using namespace std::string_view_literals; + // todo: for now we expect rda3tcp://host:port, but this should allow be rda3://devicename which will be looked up on the cmw directory server + auto endpoint = fmt::format("tcp://{}", con._authority); + std::string id = detail::getIdentity(); + if (!zmq::invoke(zmq_setsockopt, con._socket, ZMQ_IDENTITY, id.data(), id.size()).isValid()) { // hostname/process/id/channel -- seems to be needed by CMW :-| + fmt::print(stderr, "failed set socket identity"); + } + if (opencmw::zmq::invoke(zmq_connect, con._socket, endpoint).isValid()) { + _pollItems.push_back({ .socket = con._socket.zmq_ptr, .fd = 0, .events = ZMQ_POLLIN, .revents = 0 }); + } + // send rda3 connect message + detail::send(con._socket, ZMQ_SNDMORE, "error sending connect frame"sv, "\x20"); // 0x20 => detail::MessageType::CLIENT_CONNECT + detail::send(con._socket, 0, "error sending connect frame"sv, "1.0.0"); + con._connectionState = CONNECTING1; + } + + detail::Connection &findConnection(const URI &uri) { + const auto con = std::ranges::find_if(_connections, [&uri](detail::Connection &c) { return c._authority == uri.authority().value(); }); + if (con == _connections.end()) { + auto newCon = detail::Connection(_context, uri.authority().value(), ZMQ_DEALER); + connect(newCon); + _connections.push_back(std::move(newCon)); + return _connections.back(); + } + return *con; + } + + void get(const URI &uri, std::string_view req_id) override { + using namespace std::string_view_literals; + using enum detail::FrameType; + auto &con = findConnection(uri); // send message header + std::string key(req_id); + detail::PendingRequest value{}; + value.reqId = req_id; + value.requestType = detail::RequestType::GET; + value.state = detail::PendingRequest::RequestState::INITIALIZED; + value.uri = uri.str(); + con._pendingRequests.insert({ fmt::format("{}", req_id), std::move(value) }); + } + + void set(const URI &uri, std::string_view req_id, const std::span &request) override { + using namespace std::string_view_literals; + using enum detail::FrameType; + auto &con = findConnection(uri); // send message header + detail::PendingRequest value{}; + value.reqId = req_id; + value.requestType = detail::RequestType::SET; + value.data = IoBuffer{ request.data(), request.size() }; + value.state = detail::PendingRequest::RequestState::INITIALIZED; + value.uri = uri.str(); + con._pendingRequests.insert({ fmt::format("{}", req_id), std::move(value) }); + } + + void subscribe(const URI &uri, std::string_view req_id) override { + using namespace std::string_view_literals; + using enum detail::FrameType; + auto &con = findConnection(uri); + detail::OpenSubscription sub{}; + sub.state = detail::OpenSubscription::SubscriptionState::INITIALIZED; + sub.uri = uri.str(); + std::string req_id_string{ req_id }; + char *req_id_end = req_id_string.data() + req_id_string.size(); + sub.reqId = strtol(req_id_string.data(), &req_id_end, 10); + con._subscriptions.insert({ fmt::format("{}", con._subscriptionIdGenerator++), std::move(sub) }); + } + + void unsubscribe(const URI &uri, std::string_view req_id) override { + using namespace std::string_view_literals; + auto &con = findConnection(uri); + con._subscriptions[std::string{ req_id }].state = detail::OpenSubscription::SubscriptionState::UNSUBSCRIBING; + CmwLightHeader header; + header.requestType() = static_cast(detail::RequestType::UNSUBSCRIBE); + std::string reqIdString{ req_id }; + char *end = reqIdString.data() + req_id.size(); + header.id() = std::strtol(req_id.data(), &end, 10); + // header.options() = {}; + // header.sessionId() = sessionId; + // header.deviceName() = device; + // header.propertyName() = property; + // header.updateType() = updateType; + detail::send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, detail::serialiseCmwLight(header)); // send message header + CmwLightRequestContext ctx; + // send requestContext + using enum detail::FrameType; + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER)); + } + + bool disconnect(detail::Connection &con) { +#if not defined(__EMSCRIPTEN__) and (not defined(__clang__) or (__clang_major__ >= 16)) + const auto remove = std::ranges::remove_if(_pollItems, [&con](zmq_pollitem_t &pollItem) { return pollItem.socket == con._socket.zmq_ptr; }); + _pollItems.erase(remove.begin(), remove.end()); +#else + const auto remove = std::remove_if(_pollItems.begin(), _pollItems.end(), [&con](zmq_pollitem_t &pollItem) { return pollItem.socket == con._socket.zmq_ptr; }); + _pollItems.erase(remove, _pollItems.end()); +#endif + con._connectionState = detail::Connection::ConnectionState::DISCONNECTED; + return true; + } + + static bool handleServerReply(mdp::Message &output, detail::Connection &con, const auto currentTime) { + using namespace std::string_view_literals; + if (con._frames.size() < 2 || con._frames.back().data().size() != con._frames.size() - 2 || detail::FrameType(con._frames.back().data()[0]) != detail::FrameType::HEADER) { + throw std::runtime_error(fmt::format("received malformed response: wrong number of frames({}) or mismatch with frame descriptor({})", con._frames.size(), con._frames.back().size())); + } + // deserialise header frames[1] + IoBuffer data(con._frames[1].data().data(), con._frames[1].data().size()); + DeserialiserInfo info = checkHeaderInfo(data, DeserialiserInfo{}, ProtocolCheck::LENIENT); + CmwLightHeader header; + auto result = opencmw::deserialise(data, header); + + if (con._connectionState == detail::Connection::ConnectionState::CONNECTING2) { + if (header.requestType() == static_cast(detail::RequestType::REPLY)) { + fmt::print("connected successfully\n"); + con._connectionState = detail::Connection::ConnectionState::CONNECTED; + con._lastHeartbeatReceived = currentTime; + return true; + } else { + throw std::runtime_error("expected connection reply but got different message"); + } + } + + using enum detail::RequestType; + switch (detail::RequestType{ header.requestType() }) { + case REPLY: { + auto request = con._pendingRequests[fmt::format("{}", header.id())]; + // con._pendingRequests.erase(header.id()); + output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client + output.command = opencmw::mdp::Command::Final; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT) + char *end = &request.reqId.back(); + output.id = std::strtoul(request.reqId.data(), &end, 10); /// std::size_t + output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03') + output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages) + output.clientRequestID; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss) + output.topic = URI{ "/" }; /// URI < URI containing at least and optionally parameters + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found") + // output.rbac; ///IoBuffer < optional RBAC meta-info -- may contain token, role, signed message hash (implementation dependent) + // con._pendingRequests.erase("{}", header.id()); + return true; + } + case EXCEPTION: { + fmt::print("exception\n"); + auto request = con._pendingRequests[fmt::format("{}", header.id())]; + // con._pendingRequests.erase(header.id()); + output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client + output.command = opencmw::mdp::Command::Final; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT) + output.id = 0; /// std::size_t + output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03') + output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages) + output.clientRequestID = IoBuffer{}; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss) + output.topic = URI{ "/" }; /// URI < URI containing at least and optionally parameters + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found") + return true; + } + case SUBSCRIBE: { + fmt::print("subscription sucessful: request id: {}, update id: {}\n", header.id(), header.options()->sourceId()); + std::cout << header << std::endl; + ; + auto &sub = con._subscriptions[fmt::format("{}", header.id())]; + sub.replyId = header.options()->sourceId(); + sub.state = detail::OpenSubscription::SubscriptionState::SUBSCRIBED; + sub.backOff = 20ms; // reset back-off + return false; + } + case UNSUBSCRIBE: { + fmt::print("unsubscribe\n"); + // successfully removed subscription + auto subscriptionForUnsub = con._subscriptions[fmt::format("{}", header.id())]; + subscriptionForUnsub.state = detail::OpenSubscription::SubscriptionState::UNSUBSCRIBED; + // con._subscriptions.erase(subscriptionForUnsub.updateId); + return false; + } + case NOTIFICATION_DATA: { + fmt::print("notification_data\n"); + std::string replyId; + std::cout << header << std::endl; + ; + auto sub = std::find_if(con._subscriptions.begin(), con._subscriptions.end(), [&header](auto &pair) { return pair.second.replyId == header.id(); }); + if (sub == con._subscriptions.end()) { + fmt::print("received unexpected subscription for replyId: {}\n", header.id()); + return false; + } + auto subscriptionForNotification = con._subscriptions[replyId]; + // URI endpointForNotificationContext; + // try { + // endpointForNotificationContext = new ParsedEndpoint(subscriptionForNotification.endpoint, reply.dataContext.cycleName).toURI(); + // } catch (URISyntaxException | CmwLightProtocol.RdaLightException e) { + // return false; // Error generating reply context URI + // } + output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client + output.command = opencmw::mdp::Command::Notify; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT) + output.id = static_cast(sub->second.reqId); /// std::size_t + output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03') + output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages) + output.clientRequestID = IoBuffer{}; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss) + output.topic = URI{ "/" }; /// URI < URI containing at least and optionally parameters + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found") + return true; + } + case NOTIFICATION_EXC: { + fmt::print("notification exception\n"); + std::string replyId; + if (con._subscriptions.find(replyId) == con._subscriptions.end()) { + return false; + } + auto subscriptionForNotifyExc = con._subscriptions[replyId]; + output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client + output.command = opencmw::mdp::Command::Notify; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT) + output.id = 0; /// std::size_t + output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03') + output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages) + output.clientRequestID = IoBuffer{}; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss) + output.topic = URI{ "/" }; /// URI < URI containing at least and optionally parameters + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found") + return true; + } + case SUBSCRIBE_EXCEPTION: { + fmt::print("subscribe exception\n"); + auto subForSubExc = con._subscriptions[fmt::format("{}", header.id())]; + subForSubExc.state = detail::OpenSubscription::SubscriptionState::UNSUBSCRIBED; + subForSubExc.nextTry = currentTime + subForSubExc.backOff; + subForSubExc.backOff *= 2; + // exception during subscription, retrying + output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client + output.command = opencmw::mdp::Command::Notify; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT) + output.id = 0; /// std::size_t + output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03') + output.clientRequestID = IoBuffer{}; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss) + output.topic = URI{ "/" }; /// URI < URI containing at least and optionally parameters + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based + output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found") + return true; + } + case SESSION_CONFIRM: { + fmt::print("received session confirm\n"); + return false; + } + // unsupported or non-actionable replies + case GET: + case SET: + case CONNECT: + case EVENT: + default: + fmt::print("unsupported message: {}\n", header.requestType()); + return false; + } + } + + static bool handleMessage(mdp::Message &output, detail::Connection &con) { + assert(!con._frames.empty() && "this function can only be ever called with at least one frame"); + const auto currentTime = std::chrono::system_clock::now(); + using enum detail::MessageType; + using enum detail::Connection::ConnectionState; + switch (detail::MessageType(con._frames[0].data().at(0))) { + case SERVER_CONNECT_ACK: + if (con._connectionState == CONNECTING1) { + if (con._frames.size() < 2 || con._frames[1].data().empty()) { + throw std::runtime_error("server connect does not contain required version info"); + } + // verifyVersion(con._frames[1].data()); // todo: implement checking rda3 protocol version + con._connectionState = CONNECTING2; // proceed to step 2 by sending the CLIENT_REQ, REQ_TYPE=CONNECT message + sendConnectRequest(con); + con._lastHeartbeatReceived = currentTime; + con._backoff = 20ms; // reset back-off time + } else { + throw std::runtime_error("received unsolicited SERVER_CONNECT_ACK"); + } + break; + case SERVER_HB: + if (con._connectionState != CONNECTED && con._connectionState != CONNECTING2) { + fmt::print("received a heart-beat message on an unconnected connection!\n"); + return false; + } + con._lastHeartbeatReceived = currentTime; + break; + case SERVER_REP: + con._lastHeartbeatReceived = currentTime; + return handleServerReply(output, con, currentTime); + case CLIENT_CONNECT: + case CLIENT_REQ: + case CLIENT_HB: + default: + throw std::runtime_error("Unexpected client message type received from server"); + } + return false; + } + + bool receive(mdp::Message &output) override { + for (auto &con : _connections) { + while (true) { + zmq::MessageFrame frame; + const auto byteCountResultId = frame.receive(con._socket, ZMQ_DONTWAIT); + if (!byteCountResultId.isValid() || byteCountResultId.value() < 1) { + fmt::print("."); + break; + } + fmt::print("+"); + con._frames.push_back(std::move(frame)); + int64_t more; + size_t moreSize = sizeof(more); + if (!zmq::invoke(zmq_getsockopt, con._socket, ZMQ_RCVMORE, &more, &moreSize)) { + throw std::runtime_error("error checking rcvmore"); + } else if (more != 0) { + continue; + } else { + fmt::print("\nhandleMessages({})", con._frames.size()); + bool received = handleMessage(output, con); + con._frames.clear(); + if (received) { + return true; + } + } + } + } + return false; + } + + // method to be called in regular time intervals to send and verify heartbeats + timePoint housekeeping(const timePoint &now) override { + using ConnectionState = detail::Connection::ConnectionState; + using RequestState = detail::PendingRequest::RequestState; + using namespace std::literals; + using enum detail::OpenSubscription::SubscriptionState; + using enum detail::FrameType; + // handle connection state + for (auto &con : _connections) { + switch (con._connectionState) { + case ConnectionState::DISCONNECTED: + if (con._nextReconnectAttemptTimeStamp <= now) { + connect(con); + } + break; + case ConnectionState::CONNECTING1: + case ConnectionState::CONNECTING2: + if (con._nextReconnectAttemptTimeStamp + _clientTimeout < now) { + // abort connection attempt and start a new one + } + break; + case ConnectionState::CONNECTED: + for (auto &[id, req] : con._pendingRequests) { + using enum detail::RequestType; + if (req.state == RequestState::INITIALIZED) { + if (req.requestType == GET && !req.reqId.empty()) { + detail::send(con._socket, ZMQ_SNDMORE, "error sending get frame"sv, "\x21"); // 0x21 => detail::MessageType::CLIENT_REQ + CmwLightHeader msg; + msg.requestType() = static_cast(detail::RequestType::GET); + char *reqIdEnd = req.reqId.data() + req.reqId.size(); + msg.id() = std::strtol(req.reqId.data(), &reqIdEnd, 10) + 1; // +1 to start with the identical requst id as the java impl + msg.sessionId() = detail::createClientId(); + URI uri{ req.uri }; + msg.device() = uri.path()->substr(1, uri.path()->find('/', 1) - 1); + msg.property() = uri.path()->substr(uri.path()->find('/', 1) + 1); + msg.options() = std::make_unique(); + // msg.reqContext() = ""; + msg.updateType() = static_cast(detail::UpdateType::NORMAL); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, detail::serialiseCmwLight(msg)); // send message header + bool hasRequestCtx = true; + if (hasRequestCtx) { + CmwLightRequestContext ctx; + ctx.selector() = ""; // todo: set correct ctx values + // ctx.data = {}; + // ctx.filters = {"triggerName", "asdf"}; + IoBuffer buffer{}; + serialise(buffer, ctx); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send context frame"sv, std::move(buffer)); // send requestContext + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY_REQUEST_CONTEXT)); + } else { + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER)); + } + req.state = RequestState::WAITING; + } else if (req.requestType == SET) { + detail::send(con._socket, ZMQ_SNDMORE, "error sending get frame"sv, "\x21"); // 0x20 => detail::MessageType::CLIENT_REQ + CmwLightHeader msg; + msg.requestType() = static_cast(detail::RequestType::GET); + char *reqIdEnd = req.reqId.data() + req.reqId.size(); + msg.id() = std::strtol(req.reqId.data(), &reqIdEnd, 10); + msg.sessionId() = detail::createClientId(); + URI uri{ req.uri }; + msg.device() = uri.path()->substr(1, uri.path()->find('/', 1) - 1); + msg.property() = uri.path()->substr(uri.path()->find('/', 1) + 1); + // msg.reqContext() = ""; + msg.updateType() = static_cast(detail::UpdateType::NORMAL); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, detail::serialiseCmwLight(msg)); // send message header + bool hasRequestCtx = false; + if (hasRequestCtx) { + CmwLightRequestContext ctx; + ctx.selector() = "asdf"; // todo: set correct ctx values + // ctx.data = {}; + // ctx.filters = {"triggerName", "asdf"}; + IoBuffer buffer{}; + serialise(buffer, ctx); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send context frame"sv, std::move(buffer)); // send requestContext + detail::send(con._socket, ZMQ_SNDMORE, "failed to send data frame"sv, std::move(req.data)); // send requestContext + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY_REQUEST_CONTEXT, BODY)); + } else { + detail::send(con._socket, ZMQ_SNDMORE, "failed to send data frame"sv, std::move(req.data)); // send requestContext + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY)); + } + req.state = RequestState::WAITING; + } + } + } + for (auto &[id, sub] : con._subscriptions) { + if (sub.state == INITIALIZED) { + detail::send(con._socket, ZMQ_SNDMORE, "error sending get frame"sv, "\x21"); // 0x20 => detail::MessageType::CLIENT_REQ + opencmw::URI uri{ sub.uri }; + CmwLightHeader header; + header.id() = sub.reqId; + header.device() = uri.path()->substr(1, uri.path()->find('/', 1) - 1); + header.property() = uri.path()->substr(uri.path()->find('/', 1) + 1); + header.requestType() = static_cast(detail::RequestType::SUBSCRIBE); + header.sessionId() = detail::createClientId(); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, detail::serialiseCmwLight(header)); // send message header + CmwLightRequestContext ctx; + ctx.filters() = { { "acquisitionModeFilter", 0 }, { "channelNameFilter", "GS01QS1F:Current@10Hz" } }; // todo: correct filters from query + IoBuffer buffer{}; + serialise(buffer, ctx); + detail::send(con._socket, ZMQ_SNDMORE, "failed to send context frame"sv, std::move(buffer)); // send requestContext + detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY_REQUEST_CONTEXT)); + con._lastHeartBeatSent = now; + sub.state = SUBSCRIBING; + } else if (sub.state == UNSUBSCRIBING) { + } + } + if (con._lastHeartBeatSent < now - HEARTBEAT_INTERVAL) { + detail::send(con._socket, 0, "error sending connect frame"sv, "\x22"); // 0x22 => detail::MessageType::CLIENT_HB + con._lastHeartBeatSent = now; + } + if (con._lastHeartbeatReceived < now - HEARTBEAT_INTERVAL * 3) { + fmt::print("Missed 3 heartbeats -> connection seems to be broken"); // todo correct error handling + } + break; // do nothing + } + } + return now + _clientTimeout / 2; + } + +private: +}; + +/* + * Implementation of the Majordomo client protocol. Spawns a single thread which controls all client connections and sockets. + * A dispatcher thread reads the requests from the command ring buffer and dispatches them to the zeromq poll loop using an inproc socket pair. + * TODO: Q: merge with the mdp client? it basically uses the same loop and zeromq polling scheme. + */ +class CmwLightClientCtx : public ClientBase { + using timeUnit = std::chrono::milliseconds; + std::unordered_map, std::unique_ptr> _clients; + const zmq::Context &_zctx; + zmq::Socket _control_socket_send; + zmq::Socket _control_socket_recv; + std::jthread _poller; + std::vector _pollitems{}; + std::unordered_map _requests; + std::unordered_map _subscriptions; + timeUnit _timeout; + std::string _clientId; + std::size_t _request_id = 0; + +public: + explicit CmwLightClientCtx(const zmq::Context &zeromq_context, const timeUnit timeout = 1s, std::string clientId = "") // todo: also pass thread pool + : _zctx{ zeromq_context }, _control_socket_send(zeromq_context, ZMQ_PAIR), _control_socket_recv(zeromq_context, ZMQ_PAIR), _timeout(timeout), _clientId(std::move(clientId)) { + _poller = std::jthread([this](const std::stop_token &stoken) { this->poll(stoken); }); + zmq::invoke(zmq_bind, _control_socket_send, "inproc://mdclientControlSocket").assertSuccess(); + _pollitems.push_back({ .socket = _control_socket_recv.zmq_ptr, .fd = 0, .events = ZMQ_POLLIN, .revents = 0 }); + } + + std::vector protocols() override { + return { "rda3", "rda3tcp" }; // rda3 protocol, if transport is unspecified, tcp is used if authority contains a port + } + + std::unique_ptr &getClient(const URI &uri) { + auto baseUri = URI::factory(uri).setQuery({}).path("").fragment("").build(); + if (_clients.contains(baseUri)) { + return _clients.at(baseUri); + } + auto [it, ins] = _clients.emplace(baseUri, createClient(baseUri)); + if (!ins) { + throw std::logic_error("could not insert client into client list\n"); + } + return it->second; + } + + std::unique_ptr createClient(const URI &uri) { + return std::make_unique(_zctx, _pollitems, _timeout, _clientId); + } + + void stop() override { + _poller.request_stop(); + _poller.join(); + } + + void request(Command cmd) override { + std::size_t req_id = 0; + if (cmd.callback) { + if (cmd.command == mdp::Command::Get || cmd.command == mdp::Command::Set) { + req_id = _request_id++; + _requests.insert({ req_id, Request{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } }); + } else if (cmd.command == mdp::Command::Subscribe) { + req_id = _request_id++; + _subscriptions.insert({ mdp::Topic::fromMdpTopic(cmd.topic).toZmqTopic(), Subscription{ .uri = cmd.topic, .callback = std::move(cmd.callback), .timestamp_received = cmd.arrivalTime } }); + } else if (cmd.command == mdp::Command::Unsubscribe) { + _requests.erase(0); // todo: lookup correct subscription + } + } + sendCmd(cmd.topic, cmd.command, req_id, cmd.data); + } + +private: + void sendCmd(const URI &uri, mdp::Command commandType, std::size_t req_id, IoBuffer data = {}) const { + const bool isSet = commandType == mdp::Command::Set; + zmq::MessageFrame cmdType{ std::string{ static_cast(commandType) } }; + cmdType.send(_control_socket_send, ZMQ_SNDMORE).assertSuccess(); + zmq::MessageFrame reqId{ std::to_string(req_id) }; + reqId.send(_control_socket_send, ZMQ_SNDMORE).assertSuccess(); + zmq::MessageFrame endpoint{ std::string(uri.str()) }; + endpoint.send(_control_socket_send, isSet ? ZMQ_SNDMORE : 0).assertSuccess(); + if (isSet) { + zmq::MessageFrame dataframe{ std::move(data) }; + dataframe.send(_control_socket_send, 0).assertSuccess(); + } + } + + void handleRequests() { + zmq::MessageFrame cmd; + zmq::MessageFrame reqId; + zmq::MessageFrame endpoint; + while (cmd.receive(_control_socket_recv, ZMQ_DONTWAIT).isValid()) { + if (!reqId.receive(_control_socket_recv, ZMQ_DONTWAIT).isValid()) { + throw std::logic_error("invalid request received: failure receiving message"); + } + if (!endpoint.receive(_control_socket_recv, ZMQ_DONTWAIT).isValid()) { + throw std::logic_error("invalid request received: invalid message contents"); + } + URI uri{ std::string(endpoint.data()) }; + auto &client = getClient(uri); + if (cmd.data().size() != 1) { + throw std::logic_error("invalid request received: wrong number of frames"); + } else if (cmd.data()[0] == static_cast(mdp::Command::Get)) { + client->get(uri, reqId.data()); + } else if (cmd.data()[0] == static_cast(mdp::Command::Set)) { + zmq::MessageFrame data; + if (!data.receive(_control_socket_recv, ZMQ_DONTWAIT).isValid()) { + throw std::logic_error("missing set str"); + } + client->set(uri, reqId.data(), std::span(data.data().data(), data.data().size())); + } else if (cmd.data()[0] == static_cast(mdp::Command::Subscribe)) { + client->subscribe(uri, reqId.data()); + } else if (cmd.data()[0] == static_cast(mdp::Command::Unsubscribe)) { + client->unsubscribe(uri, reqId.data()); + } else { + throw std::logic_error("invalid request received"); // messages always consist of 2 frames + } + } + } + + void poll(const std::stop_token &stoken) { + auto nextHousekeeping = std::chrono::system_clock::now(); + zmq::invoke(zmq_connect, _control_socket_recv, "inproc://mdclientControlSocket").assertSuccess(); + while (!stoken.stop_requested() && zmq::invoke(zmq_poll, _pollitems.data(), static_cast(_pollitems.size()), 200)) { + if (auto now = std::chrono::system_clock::now(); nextHousekeeping < now) { + nextHousekeeping = housekeeping(now); + // expire old subscriptions/requests/connections + } + handleRequests(); + for (const auto &[uri, client] : _clients) { + mdp::Message receivedEvent; + while (client->receive(receivedEvent)) { + if (_subscriptions.contains(receivedEvent.serviceName)) { + _subscriptions.at(receivedEvent.serviceName).callback(receivedEvent); // callback + } + if (_requests.contains(receivedEvent.id)) { + _requests.at(receivedEvent.id).callback(receivedEvent); // callback + _requests.erase(receivedEvent.id); + } + // perform housekeeping duties if necessary + if (auto now = std::chrono::system_clock::now(); nextHousekeeping < now) { + nextHousekeeping = housekeeping(now); + } + } + } + } + } + + timePoint housekeeping(timePoint now) const { + timePoint next = now + _timeout; + for (const auto &[uri, client] : _clients) { + next = std::min(next, client->housekeeping(now)); + } + return next; + } + // manage commands: setup new clients if necessary and establish new subscriptions etc + // todo: remove unused (= no open subscriptions && last request was some time ago) clients after some unused time +}; +} // namespace opencmw::client::cmwlight +#endif // OPENCMW_CPP_CMWLIGHTCLIENT_HPP diff --git a/src/client/include/DirectoryLightClient.hpp b/src/client/include/DirectoryLightClient.hpp new file mode 100644 index 00000000..a425a103 --- /dev/null +++ b/src/client/include/DirectoryLightClient.hpp @@ -0,0 +1,172 @@ +#ifndef OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP +#define OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP + +#include +#include +#include +#include + +using namespace std::chrono_literals; + +auto parse = [](const std::string &reply) { + auto urlDecode = [](std::string str) { + std::string ret; + ret.reserve(str.length()); + char ch; + std::size_t len = str.length(); + for (std::size_t i = 0; i < len; i++) { + if (str[i] != '%') { + if (str[i] == '+') { + ret += ' '; + } else { + ret += str[i]; + } + } else if (i + 2 < len) { + auto toHex = [](char c) { + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'a' && c <= 'f') return c - 'a' + 10; + if (c >= 'A' && c <= 'F') return c - 'A' + 10; + throw std::runtime_error("Invalid hexadecimal number"); + }; + ch = static_cast('\x10' * toHex(str.at(i + 1)) + toHex(str.at(i + 2))); + ret += ch; + i = i + 2; + } + } + return ret; + }; + using std::operator""sv; + std::map>>> devices; + if (reply.starts_with("ERROR")) { + throw std::runtime_error("Nameserver returned an error"); + } + // each line: one device + // auto lines = reply | std::views::lazy_split("\n"sv); + std::ranges::split_view lines{ reply, "\n"sv }; + // auto tokens = lines | std::views::transform([](auto &l) {return std::views::split(" "sv);}); + auto split_lines = std::views::transform([](auto str) { return std::ranges::split_view{ str, " "sv }; }); + for (auto l : lines | split_lines) { + if (l.empty()) { + continue; + } + std::string devicename{ std::string_view{ l.front().data(), l.front().size() } }; + auto l2 = std::views::drop(l, 1); + if (l2.empty()) { + devices.insert({ devicename, {} }); + continue; + } + std::string classname{ std::string_view{ l2.front().data(), l2.front().size() } }; + if (classname.starts_with("*NOT_BOUND*") || classname.starts_with("*UNKNOWN*")) { + devices.insert({ devicename, {} }); + continue; + } + auto l3 = std::views::drop(l2, 1); + if (l3.empty()) { + devices.insert({ devicename, {} }); + continue; + } + std::map>> attributes{}; + for (auto attributeString : l3) { + auto tokens = std::views::split(attributeString, "#"sv); + if (tokens.empty()) { + continue; + } + std::string addresfieldcount = { tokens.front().data(), tokens.front().size() }; + auto seperatorPos = addresfieldcount.find("://"); + std::string proto = addresfieldcount.substr(0, seperatorPos + 3); + std::size_t i; + char *end = to_address(addresfieldcount.end()); + std::size_t fieldCount = std::strtoull(addresfieldcount.data() + seperatorPos + 3, &end, 10); + auto [map, _] = attributes.insert({ proto, {} }); + map->second.insert({ "Classname", classname }); + + auto range = std::views::drop(tokens, 1); + auto iterator = range.begin(); + std::size_t n = 0; + while (n < fieldCount) { + std::string_view fieldNameView{ &(*iterator).front(), (*iterator).size() }; + std::string fieldname{ fieldNameView.substr(0, fieldNameView.size() - 1) }; + iterator++; + std::string type{ std::string_view{ &(*iterator).front(), (*iterator).size() } }; + if (type == "string") { + iterator++; + std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } }; + auto parsed = std::to_address(sizeString.end()); + std::size_t size = std::strtoull(sizeString.data(), &parsed, 10); + iterator++; + std::string string{ std::string_view{ &(*iterator).front(), (*iterator).size() } }; + map->second.insert({ fieldname, urlDecode(string) }); + } else if (type == "int") { + iterator++; + std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } }; + int number = std::atoi(sizeString.data()); + map->second.insert({ fieldname, number }); + } else if (type == "long") { + iterator++; + std::string sizeString{ std::string_view{ &(*iterator).front(), (*iterator).size() } }; + auto parsed = std::to_address(sizeString.end()); + long number = std::strtol(sizeString.data(), &parsed, 10); + map->second.insert({ fieldname, number }); + } else { + FAIL(fmt::format("unknown type: {}, field: {}, tokens: {}", type, fieldname, tokens)); + } + iterator++; + n++; + } + } + devices.insert({ devicename, attributes }); + } + return devices; +}; + +std::string resolveDirectoryLight(std::vector devices, std::string_view nameserver, opencmw::zmq::Context &ctx, std::chrono::milliseconds timeout = 500ms) { + const opencmw::zmq::Socket socket{ ctx, ZMQ_STREAM }; + if (!opencmw::zmq::invoke(zmq_connect, socket, nameserver).isValid()) { + throw std::runtime_error("could not connect to nameserver."); + } + std::string id; + std::size_t data_len = 255; + id.resize(data_len); + if (!opencmw::zmq::invoke(zmq_getsockopt, socket, ZMQ_IDENTITY, id.data(), &data_len).isValid()) { + throw std::runtime_error("could not get socket identity"); + } + id.resize(data_len); + + const std::string query = fmt::format("get-device-info\n@client-info opencmw-cpp-directory-light-client\n@version 0.0.1\n{}\n\n", fmt::join(devices, "\n")); + + opencmw::zmq::MessageFrame identityFrame{ std::string{ id } }; + if (!identityFrame.send(socket, ZMQ_SNDMORE).isValid()) { + throw std::runtime_error("error sending socket id"); + } + opencmw::zmq::MessageFrame queryFrame{ std::string{ query } }; + if (!queryFrame.send(socket, 0).isValid()) { + throw std::runtime_error("error sending query frame"); + } + + auto start_time = std::chrono::system_clock::now(); + std::string result; + bool data_received = false; + while ((result.empty() || (data_received && !result.empty())) && std::chrono::system_clock::now() - start_time < timeout) { // wait for a maximum of 5 seconds + data_received = false; + opencmw::zmq::MessageFrame idFrame; + const auto byteCountResultId = idFrame.receive(socket, ZMQ_DONTWAIT); + if (!byteCountResultId.isValid() || byteCountResultId.value() < 1) { + continue; + } + if (idFrame.data() != id) { + throw std::runtime_error("connection identifier from socket does not match connection"); + } + opencmw::zmq::MessageFrame frame; + for (auto byteCountResult = frame.receive(socket, ZMQ_DONTWAIT); byteCountResult.value() < 0; byteCountResult = frame.receive(socket, ZMQ_DONTWAIT)) { + } + if (frame.size() > 0) { + result += frame.data(); + data_received = true; + } + } + if (!opencmw::zmq::invoke(zmq_disconnect, socket, nameserver).isValid()) { + throw std::runtime_error("could not disconnect"); + } + return result; +} +#endif // OPENCMW_CPP_DIRECTORYLIGHTCLIENT_HPP diff --git a/src/client/test/CMakeLists.txt b/src/client/test/CMakeLists.txt index c60aa813..02f7036e 100644 --- a/src/client/test/CMakeLists.txt +++ b/src/client/test/CMakeLists.txt @@ -65,6 +65,16 @@ if(NOT EMSCRIPTEN) # TEST_PREFIX to whatever you want, or use different for different binaries catch_discover_tests(client_tests # TEST_PREFIX "unittests." REPORTER xml OUTPUT_DIR . OUTPUT_PREFIX "unittests." OUTPUT_SUFFIX .xml) catch_discover_tests(clientPublisher_tests) + + add_executable(CmwLightTest catch_main.cpp CmwLightTest.cpp) + target_link_libraries( + CmwLightTest + PUBLIC opencmw_project_warnings + opencmw_project_options + Catch2::Catch2 + client) + target_include_directories(CmwLightTest PRIVATE ${CMAKE_SOURCE_DIR}) + catch_discover_tests(CmwLightTest) endif() add_executable(rest_client_only_tests RestClientOnly_tests.cpp) diff --git a/src/client/test/CmwLightTest.cpp b/src/client/test/CmwLightTest.cpp new file mode 100644 index 00000000..3b25c7a6 --- /dev/null +++ b/src/client/test/CmwLightTest.cpp @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include + +TEST_CASE("RDA3", "[Client]") { + std::string nameserverExample = R"""(GSCD025 DigitizerDU2.dal025 rda3://9#Address:#string#18#tcp:%2F%2Fdal025:16134#ApplicationId:#string#114#app=DigitizerDU2;uid=root;host=dal025;pid=16912;os=Linux%2D3%2E10%2E101%2Drt111%2Dscu03;osArch=64bit;appArch=64bit;lang=C%2B%2B;#Language:#string#3#C%2B%2B#Name:#string#19#DigitizerDU2%2Edal025#Pid:#int#16912#ProcessName:#string#12#DigitizerDU2#StartTime:#long#1699343695922#UserName:#string#4#root#Version:#string#5#3%2E1%2E0 +GSCD023 DigitizerDU2.fel0053 rda3://9#Address:#string#18#tcp:%2F%2Ffel0053:3717#ApplicationId:#string#115#app=DigitizerDU2;uid=root;host=fel0053;pid=31447;os=Linux%2D3%2E10%2E101%2Drt111%2Dscu03;osArch=64bit;appArch=64bit;lang=C%2B%2B;#Language:#string#3#C%2B%2B#Name:#string#20#DigitizerDU2%2Efel0053#Pid:#int#31447#ProcessName:#string#12#DigitizerDU2#StartTime:#long#1701529074225#UserName:#string#4#root#Version:#string#5#3%2E1%2E0 +FantasyDevice3000 *UNKNOWN* *UNKNOWN*)"""; + + SECTION("ParseNameserverReply") { + std::map>>> devices = parse(nameserverExample); + REQUIRE(!devices["GSCD023"].empty()); + REQUIRE(!devices["GSCD025"].empty()); + REQUIRE(devices["FantasyDevice3000"].empty()); + REQUIRE(std::get(devices["GSCD025"]["rda3://"]["Address"]) == "tcp://dal025:16134"); + } + + SECTION("Query rda3 directory server/nameserver") { + auto env_nameserver = std::getenv("CMW_NAMESERVER"); + if (env_nameserver == nullptr) { + fmt::print("skipping BasicCmwLight example test as it relies on the availability of network infrastructure."); + return; // skip test + } else { + std::string nameserver{env_nameserver}; + opencmw::zmq::Context ctx{}; + auto result = resolveDirectoryLight({ "GSCD025", "GSCD023", "FantasyDevice3000" }, nameserver, ctx, 100ms); + REQUIRE(!result.empty()); + REQUIRE(result == nameserverExample); + } + }; +} + +// small utility function that prints the content of a string in the classic hexedit way with address, hexadecimal and ascii representations +static std::string hexview(const std::string_view value, std::size_t bytesPerLine = 4) { + std::string result; + result.reserve(value.size() * 4); + std::string alpha; // temporarily store the ascii representation + alpha.reserve(8 * bytesPerLine); + std::size_t i = 0; + for (auto c : value) { + if (static_cast(i) % (bytesPerLine * 8) == 0) { + result.append(fmt::format("{0:#08x} - {0:04} | ", i)); // print address in hex and decimal + } + result.append(fmt::format("{:02x} ", c)); + alpha.append(fmt::format("{}", std::isprint(c) ? c : '.')); + if (static_cast(i + 1) % 8 == 0) { + result.append(" "); + alpha.append(" "); + } + if (static_cast(i + 1) % (bytesPerLine * 8) == 0) { + result.append(fmt::format(" {}\n", alpha)); + alpha.clear(); + } + i++; + } + result.append(fmt::format("{:{}} {}\n", "", 3 * (9 * bytesPerLine - alpha.size()), alpha)); + return result; +}; + +TEST_CASE("BasicCmwLight example", "[Client]") { + if (std::getenv("CMW_NAMESERVER") == nullptr) { + fmt::print("skipping BasicCmwLight example test as it relies on the availability of network infrastructure."); + return; // skip test + } + const std::string digitizerAddress{ "tcp://dal007:2620" }; + // filters2String = "acquisitionModeFilter=int:0&channelNameFilter=GS11MU2:Voltage_1@10Hz"; + // GS01QS1F:Current@1Hz + // subscribe("r1", new URI("rda3", null, '/' + DEVICE + '/' + PROPERTY, "ctx=" + SELECTOR + "&" + filtersString, null), null); + // DEVICE = "GSCD002"; + // PROPERTY = "AcquisitionDAQ"; + // SELECTOR = "FAIR.SELECTOR.ALL"; + using namespace opencmw; + const zmq::Context zctx{}; + std::vector> clients; + clients.emplace_back(std::make_unique(zctx, 20ms, "testMajordomoClient")); + opencmw::client::ClientContext clientContext{ std::move(clients) }; + // send some requests + auto endpoint = URI::factory(URI(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/Version").build(); + + std::atomic getReceived{ 0 }; + clientContext.get(endpoint, [&getReceived](const mdp::Message &message) { + fmt::print("get reply: {}", hexview(message.data.asString())); + getReceived++; + }); + + std::atomic subscriptionUpdatesReceived{ 0 }; + auto subscriptionEndpoint = URI::factory(URI(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/AcquisitionDAQ").addQueryParameter("ctx", "FAIR.SELECTOR.ALL").addQueryParameter("filter", "acquisitonModeFilter=0;channelNameFilter=GS01QS1F:Current@1Hz").build(); + clientContext.subscribe(subscriptionEndpoint, [&subscriptionUpdatesReceived](const mdp::Message &message) { + fmt::print("subscription update: {}", hexview(message.data.asString())); + subscriptionUpdatesReceived++; + }); + + std::this_thread::sleep_for(8000ms); // allow the request to reach the server + REQUIRE(getReceived == 1); +} diff --git a/src/core/include/SpinWait.hpp b/src/core/include/SpinWait.hpp index df0bd136..1ab2aa66 100644 --- a/src/core/include/SpinWait.hpp +++ b/src/core/include/SpinWait.hpp @@ -1,6 +1,7 @@ #ifndef SPIN_WAIT_HPP #define SPIN_WAIT_HPP +#include #include #include #include @@ -67,12 +68,12 @@ class SpinWait { void reset() noexcept { _count = 0; } template - requires std::is_nothrow_invocable_r_v + requires std::is_nothrow_invocable_r_v bool spinUntil(const T &condition) const { return spinUntil(condition, -1); } template - requires std::is_nothrow_invocable_r_v + requires std::is_nothrow_invocable_r_v bool spinUntil(const T &condition, std::int64_t millisecondsTimeout) const { if (millisecondsTimeout < -1) { @@ -111,8 +112,8 @@ class AtomicMutex { SPIN_WAIT _spin_wait; public: - AtomicMutex() = default; - AtomicMutex(const AtomicMutex &) = delete; + AtomicMutex() = default; + AtomicMutex(const AtomicMutex &) = delete; AtomicMutex &operator=(const AtomicMutex &) = delete; // diff --git a/src/serialiser/include/IoSerialiser.hpp b/src/serialiser/include/IoSerialiser.hpp index 411d1e6e..40ccdd8c 100644 --- a/src/serialiser/include/IoSerialiser.hpp +++ b/src/serialiser/include/IoSerialiser.hpp @@ -8,8 +8,8 @@ #include #pragma clang diagnostic push -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" namespace opencmw { @@ -141,10 +141,18 @@ struct IoSerialiser { } }; +template string> +constexpr const char *sanitizeFieldName() { + if constexpr (N > 2 && string.data[0] == 'x' && string.data[1] == '_') { + return string.c_str() + 2; + } + return string.c_str(); +} + template forceinline int32_t findMemberIndex(const std::string_view &fieldName) noexcept { static constexpr ConstExprMap().members.size> m{ refl::util::map_to_array>(refl::reflect().members, [](auto field, auto index) { - return std::pair(field.name.c_str(), index); + return std::pair(sanitizeFieldName(), index); }) }; return m.at(fieldName, -1); } @@ -194,7 +202,7 @@ constexpr void serialise(IoBuffer &buffer, ReflectableClass auto const &value, F using UnwrappedMemberType = std::remove_reference_t; if constexpr (isReflectableClass()) { // nested data-structure const auto subfields = getNumberOfNonNullSubfields(getAnnotatedMember(unwrapPointer(fieldValue))); - FieldDescription auto field = newFieldHeader(buffer, member.name.c_str(), parent.hierarchyDepth + 1, FWD(fieldValue), subfields); + FieldDescription auto field = newFieldHeader(buffer, sanitizeFieldName(), parent.hierarchyDepth + 1, FWD(fieldValue), subfields); const std::size_t posSizePositionStart = FieldHeaderWriter::template put(buffer, field, START_MARKER_INST); const std::size_t posStartDataStart = buffer.size(); serialise(buffer, getAnnotatedMember(unwrapPointer(fieldValue)), field); // do not inspect annotation itself @@ -202,7 +210,7 @@ constexpr void serialise(IoBuffer &buffer, ReflectableClass auto const &value, F updateSize(buffer, posSizePositionStart, posStartDataStart); return; } else { // field is a (possibly annotated) primitive type - FieldDescription auto field = newFieldHeader(buffer, member.name.c_str(), parent.hierarchyDepth + 1, fieldValue, 0); + FieldDescription auto field = newFieldHeader(buffer, sanitizeFieldName(), parent.hierarchyDepth + 1, fieldValue, 0); FieldHeaderWriter::template put(buffer, field, fieldValue); } } @@ -214,7 +222,7 @@ template constexpr void serialise(IoBuffer &buffer, ReflectableClass auto const &value) { putHeaderInfo(buffer); const auto subfields = detail::getNumberOfNonNullSubfields(value); - auto field = detail::newFieldHeader(buffer, refl::reflect(value).name.c_str(), 0, value, subfields); + auto field = detail::newFieldHeader(buffer, sanitizeFieldName>().name.size, refl::reflect>().name>(), 0, value, subfields); const std::size_t posSizePositionStart = FieldHeaderWriter::template put(buffer, field, START_MARKER_INST); const std::size_t posStartDataStart = buffer.size(); detail::serialise(buffer, value, field); @@ -376,7 +384,7 @@ constexpr void deserialise(IoBuffer &buffer, ReflectableClass auto &value, Deser if constexpr (isReflectableClass()) { buffer.set_position(field.headerStart); // reset buffer position for the nested deserialiser to read again field.hierarchyDepth++; - field.fieldName = member.name.c_str(); // N.B. needed since member.name is referring to compile-time const string + field.fieldName = sanitizeFieldName(); // N.B. needed since member.name is referring to compile-time const string deserialise(buffer, unwrapPointerCreateIfAbsent(member(value)), info, field); field.hierarchyDepth--; field.subfields = previousSubFields - 1; diff --git a/src/serialiser/include/IoSerialiserCmwLight.hpp b/src/serialiser/include/IoSerialiserCmwLight.hpp index aaf171b8..65ea6d39 100644 --- a/src/serialiser/include/IoSerialiserCmwLight.hpp +++ b/src/serialiser/include/IoSerialiserCmwLight.hpp @@ -5,8 +5,8 @@ #include #pragma clang diagnostic push -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" namespace opencmw { @@ -232,16 +232,18 @@ struct IoSerialiser> { } } }; + template<> struct IoSerialiser { - inline static constexpr uint8_t getDataTypeId() { return 0xFC; } + inline static constexpr uint8_t getDataTypeId() { return 0x08; } constexpr static void serialise(IoBuffer &buffer, FieldDescription auto const &field, const START_MARKER &/*value*/) noexcept { buffer.put(static_cast(field.subfields)); } - constexpr static void deserialise(IoBuffer & /*buffer*/, FieldDescription auto const & /*field*/, const START_MARKER &) { - // do not do anything, as the start marker is of size zero and only the type byte is important + constexpr static void deserialise(IoBuffer &buffer, FieldDescription auto &field, const START_MARKER &) { + field.subfields = static_cast(buffer.get()); + field.dataStartPosition = buffer.position(); } }; @@ -318,7 +320,7 @@ struct FieldHeaderWriter { if (field.hierarchyDepth != 0) { buffer.put(field.fieldName); // full field name with zero termination } - if constexpr (!is_same_v) { // do not put startMarker type id into buffer + if (!is_same_v || field.hierarchyDepth != 0) { // do not put startMarker type id into buffer buffer.put(IoSerialiser::getDataTypeId()); // data type ID } IoSerialiser::serialise(buffer, field, getAnnotatedMember(unwrapPointer(data))); @@ -334,21 +336,25 @@ struct FieldHeaderReader { field.dataEndPosition = std::numeric_limits::max(); field.modifier = ExternalModifier::UNKNOWN; if (field.subfields == 0) { + if (field.hierarchyDepth != 0 && field.intDataType == IoSerialiser::getDataTypeId() && buffer.get() != 0) { + throw ProtocolException("logic error, parent serialiser claims no data but header differs"); + } field.intDataType = IoSerialiser::getDataTypeId(); field.hierarchyDepth--; field.dataStartPosition = buffer.position(); return; } if (field.subfields == -1) { - if (field.hierarchyDepth != 0) { // do not read field description for root element - field.fieldName = buffer.get(); // full field name + if (field.hierarchyDepth != 0) { // do not read field description for root element + field.fieldName = buffer.get(); // full field name + field.intDataType = buffer.get(); // data type ID + } else { + field.intDataType = IoSerialiser::getDataTypeId(); } - field.intDataType = IoSerialiser::getDataTypeId(); - field.subfields = static_cast(buffer.get()); } else { field.fieldName = buffer.get(); // full field name field.intDataType = buffer.get(); // data type ID - field.subfields--; // decrease the number of remaining fields in the structure... todo: adapt strategy for nested fields (has to somewhere store subfields) + field.subfields--; // decrease the number of remaining fields in the structure... } field.dataStartPosition = buffer.position(); } @@ -364,6 +370,99 @@ inline DeserialiserInfo checkHeaderInfo(IoBuffer &buffer, Deserialiser return info; } +/** + * The serialiser for std::variant is a bit special, as it contains a runtime determined type, while in general the IoSerialiser assumes that the + * type can be deduced statically from the type via the getDataTypeId function. + * Therefore for now only serialisation is implemented and the even there we have to retroactively overwrite the field header's type ID from within the + * serialise function. + */ +template +struct IoSerialiser> { + inline static constexpr uint8_t getDataTypeId() { + return IoSerialiser::getDataTypeId(); // this is just a stand-in and will be overwritten with the actual type id in the serialise function + } + + constexpr static void serialise(IoBuffer &buffer, FieldDescription auto const &field, const std::variant &value) noexcept { + std::visit([&buffer, &field](T &val) { + using StrippedT = std::remove_cvref_t; + // overwrite field header with actual type + buffer.resize(buffer.size() - sizeof(uint8_t)); + buffer.put(IoSerialiser::getDataTypeId()); + // serialise the contained value + IoSerialiser::serialise(buffer, field, val); + }, + value); + } + + constexpr static void deserialise(IoBuffer & /*buffer*/, FieldDescription auto const & /*parent*/, std::variant & /*value*/) noexcept { + throw ProtocolException("Deserialisation of variant types not currently supported"); + } +}; + +template +struct IoSerialiser> { + inline static constexpr uint8_t getDataTypeId() { + return IoSerialiser::getDataTypeId(); + } + + constexpr static void serialise(IoBuffer &buffer, FieldDescription auto const &parentField, const std::map &value) noexcept { + buffer.put(static_cast(value.size())); + for (auto &[key, val] : value) { + if constexpr (isReflectableClass()) { // nested data-structure + const auto subfields = value.size(); + FieldDescription auto field = newFieldHeader(buffer, parentField.hierarchyDepth + 1, FWD(val), subfields); + const std::size_t posSizePositionStart = FieldHeaderWriter::template put(buffer, field, val); + const std::size_t posStartDataStart = buffer.size(); + return; + } else { // field is a (possibly annotated) primitive type + FieldDescription auto field = opencmw::detail::newFieldHeader(buffer, key.c_str(), parentField.hierarchyDepth + 1, val, 0); + FieldHeaderWriter::template put(buffer, field, val); + } + } + } + + constexpr static void deserialise(IoBuffer &buffer, FieldDescription auto const &parent, std::map &value) noexcept { + DeserialiserInfo info; + constexpr ProtocolCheck check = ProtocolCheck::IGNORE; + using protocol = CmwLight; + auto field = opencmw::detail::newFieldHeader(buffer, "", parent.hierarchyDepth, ValueType{}, parent.subfields); + while (buffer.position() < buffer.size()) { + auto previousSubFields = field.subfields; + FieldHeaderReader::template get(buffer, info, field); + buffer.set_position(field.dataStartPosition); // skip to data start + + if (field.intDataType == IoSerialiser::getDataTypeId()) { // reached end of sub-structure + try { + IoSerialiser::deserialise(buffer, field, END_MARKER_INST); + } catch (...) { + if (opencmw::detail::handleDeserialisationErrorAndSkipToNextField(buffer, field, info, "IoSerialiser<{}, END_MARKER>::deserialise(buffer, {}::{}, END_MARKER_INST): position {} vs. size {} -- exception: {}", + protocol::protocolName(), parent.fieldName, field.fieldName, buffer.position(), buffer.size(), what())) { + continue; + } + } + return; // step down to previous hierarchy depth + } + + const auto [fieldValue, _] = value.insert({ std::string{ field.fieldName }, ValueType{} }); + if constexpr (isReflectableClass()) { + field.intDataType = IoSerialiser::getDataTypeId(); + buffer.set_position(field.headerStart); // reset buffer position for the nested deserialiser to read again + field.hierarchyDepth++; + deserialise(buffer, fieldValue->second, info, field); + field.hierarchyDepth--; + field.subfields = previousSubFields - 1; + } else { + constexpr int requestedType = IoSerialiser::getDataTypeId(); + if (requestedType != field.intDataType) { // mismatching data-type + opencmw::detail::moveToFieldEndBufferPosition(buffer, field); + opencmw::detail::handleDeserialisationError(info, "mismatched field type for map field {} - requested type: {} (typeID: {}) got: {}", field.fieldName, typeName, requestedType, field.intDataType); + return; + } + IoSerialiser::deserialise(buffer, field, fieldValue->second); + } + } + } +}; } // namespace opencmw #pragma clang diagnostic pop diff --git a/src/serialiser/test/IoSerialiserCmwLight_tests.cpp b/src/serialiser/test/IoSerialiserCmwLight_tests.cpp index 3f34a5ce..1212ed79 100644 --- a/src/serialiser/test/IoSerialiserCmwLight_tests.cpp +++ b/src/serialiser/test/IoSerialiserCmwLight_tests.cpp @@ -1,9 +1,10 @@ #pragma clang diagnostic push -#pragma ide diagnostic ignored "LoopDoesntUseConditionVariableInspection" -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" +#pragma ide diagnostic ignored "LoopDoesntUseConditionVariableInspection" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" #include #include +#include #include #include @@ -31,12 +32,71 @@ struct SimpleTestData { opencmw::MultiArray d{ { 1, 2, 3, 4, 5, 6 }, { 2, 3 } }; std::unique_ptr e = nullptr; std::set f{ "one", "two", "three" }; - bool operator==(const ioserialiser_cmwlight_test::SimpleTestData &other) const { // deep comparison function - return a == other.a && ab == other.ab && abc == other.abc && b == other.b && c == other.c && cd == other.cd && d == other.d && ((!e && !other.e) || *e == *(other.e)) && f == other.f; + std::map g{ { "g1", 1 }, { "g2", 2 }, { "g3", 3 } }; + bool operator==(const ioserialiser_cmwlight_test::SimpleTestData &o) const { // deep comparison function + return a == o.a && ab == o.ab && abc == o.abc && b == o.b && c == o.c && cd == o.cd && d == o.d && ((!e && !o.e) || *e == *(o.e)) && f == o.f && g == o.g; + } +}; +struct SimpleTestDataMapNested { + int g1; + int g2; + int g3; + + bool operator==(const SimpleTestDataMapNested &o) const = default; + bool operator==(const std::map &o) const { + return g1 == o.at("g1") && g2 == o.at("g2") && g3 == o.at("g3"); + } +}; +struct SimpleTestDataMapAsNested { + int a = 1337; + float ab = 13.37f; + double abc = 42.23; + std::string b = "hello"; + std::array c{ 3, 2, 1 }; + std::vector cd{ 2.3, 3.4, 4.5, 5.6 }; + std::vector ce{ "hello", "world" }; + opencmw::MultiArray d{ { 1, 2, 3, 4, 5, 6 }, { 2, 3 } }; + std::unique_ptr e = nullptr; + std::set f{ "one", "two", "three" }; + SimpleTestDataMapNested g{ 1, 2, 3 }; + bool operator==(const ioserialiser_cmwlight_test::SimpleTestDataMapAsNested &o) const { // deep comparison function + return a == o.a && ab == o.ab && abc == o.abc && b == o.b && c == o.c && cd == o.cd && d == o.d && ((!e && !o.e) || *e == *(o.e)) && f == o.f && g == o.g; + } + bool operator==(const ioserialiser_cmwlight_test::SimpleTestData &o) const { // deep comparison function + return a == o.a && ab == o.ab && abc == o.abc && b == o.b && c == o.c && cd == o.cd && d == o.d && ((!e && !o.e) || *e == *(o.e)) && f == o.f && g == o.g; } }; } // namespace ioserialiser_cmwlight_test -ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestData, a, ab, abc, b, c, cd, ce, d, e, f) +ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestData, a, ab, abc, b, c, cd, ce, d, e, f, g) +ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestDataMapAsNested, a, ab, abc, b, c, cd, ce, d, e, f, g) +ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestDataMapNested, g1, g2, g3) + +// small utility function that prints the content of a string in the classic hexedit way with address, hexadecimal and ascii representations +static std::string hexview(const std::string_view value, std::size_t bytesPerLine = 4) { + std::string result; + result.reserve(value.size() * 4); + std::string alpha; // temporarily store the ascii representation + alpha.reserve(8 * bytesPerLine); + std::size_t i = 0; + for (auto c : value) { + if (i % (bytesPerLine * 8) == 0) { + result.append(fmt::format("{0:#08x} - {0:04} | ", i)); // print address in hex and decimal + } + result.append(fmt::format("{:02x} ", c)); + alpha.append(fmt::format("{}", std::isprint(c) ? c : '.')); + if ((i + 1) % 8 == 0) { + result.append(" "); + alpha.append(" "); + } + if ((i + 1) % (bytesPerLine * 8) == 0) { + result.append(fmt::format(" {}\n", alpha)); + alpha.clear(); + } + i++; + } + result.append(fmt::format("{:{}} {}\n", "", 3 * (9 * bytesPerLine - alpha.size()), alpha)); + return result; +}; TEST_CASE("IoClassSerialiserCmwLight simple test", "[IoClassSerialiser]") { using namespace opencmw; @@ -46,9 +106,34 @@ TEST_CASE("IoClassSerialiserCmwLight simple test", "[IoClassSerialiser]") { debug::Timer timer("IoClassSerialiser basic syntax", 30); IoBuffer buffer; + IoBuffer bufferMap; std::cout << fmt::format("buffer size (before): {} bytes\n", buffer.size()); - SimpleTestData data{ + SimpleTestDataMapAsNested data{ + .a = 30, + .ab = 1.2f, + .abc = 1.23, + .b = "abc", + .c = { 5, 4, 3 }, + .cd = { 2.1, 4.2 }, + .ce = { "hallo", "welt" }, + .d = { { 6, 5, 4, 3, 2, 1 }, { 3, 2 } }, + .e = std::make_unique(SimpleTestDataMapAsNested{ + .a = 40, + .ab = 2.2f, + .abc = 2.23, + .b = "abcdef", + .c = { 9, 8, 7 }, + .cd = { 3.1, 1.2 }, + .ce = { "ei", "gude" }, + .d = { { 6, 5, 4, 3, 2, 1 }, { 3, 2 } }, + .e = nullptr, + .g = { 6, 6, 6 } }), + .f = { "four", "five" }, + .g = { 4, 5, 6 } + }; + + SimpleTestData dataMap{ .a = 30, .ab = 1.2f, .abc = 1.23, @@ -66,31 +151,52 @@ TEST_CASE("IoClassSerialiserCmwLight simple test", "[IoClassSerialiser]") { .cd = { 3.1, 1.2 }, .ce = { "ei", "gude" }, .d = { { 6, 5, 4, 3, 2, 1 }, { 3, 2 } }, - .e = nullptr }), - .f = { "four", "five" } + .e = nullptr, + .g = { { "g1", 6 }, { "g2", 6 }, { "g3", 6 } } }), + .f = { "four", "five" }, + .g = { { "g1", 4 }, { "g2", 5 }, { "g3", 6 } } }; // check that empty buffer cannot be deserialised buffer.put(0L); - CHECK_THROWS_AS((opencmw::deserialise(buffer, data)), ProtocolException); + // CHECK_THROWS_AS((opencmw::deserialise(buffer, data)), ProtocolException); buffer.clear(); - SimpleTestData data2; + SimpleTestDataMapAsNested data2; REQUIRE(data != data2); + SimpleTestDataMapAsNested dataMap2; + REQUIRE(dataMap != dataMap2); std::cout << "object (short): " << ClassInfoShort << data << '\n'; std::cout << fmt::format("object (fmt): {}\n", data); std::cout << "object (long): " << ClassInfoVerbose << data << '\n'; opencmw::serialise(buffer, data); + opencmw::serialise(bufferMap, dataMap); std::cout << fmt::format("buffer size (after): {} bytes\n", buffer.size()); - buffer.put("a\0df"sv); // add some garbage after the serialised object to check if it is handled correctly + buffer.put("a\0df"sv); // add some garbage after the serialised object to check if it is handled correctly + bufferMap.put("a\0df"sv); // add some garbage after the serialised object to check if it is handled correctly buffer.reset(); - auto result = opencmw::deserialise(buffer, data2); - std::cout << "deserialised object (long): " << ClassInfoVerbose << data2 << '\n'; - std::cout << "deserialisation messages: " << result << std::endl; - REQUIRE(data == data2); + bufferMap.reset(); + + std::cout << "buffer contentsSubObject: \n" + << hexview(buffer.asString()) << "\n"; + std::cout << "buffer contentsMap: \n" + << hexview(bufferMap.asString()) << "\n"; + + REQUIRE(buffer.asString() == bufferMap.asString()); + + // TODO: fix this case + // auto result = opencmw::deserialise(buffer, data2); + // std::cout << "deserialised object (long): " << ClassInfoVerbose << data2 << '\n'; + // std::cout << "deserialisation messages: " << result << std::endl; + //// REQUIRE(data == data2); + + // auto result2 = opencmw::deserialise(bufferMap, dataMap2); + // std::cout << "deserialised object (long): " << ClassInfoVerbose << dataMap2 << '\n'; + // std::cout << "deserialisation messages: " << result2 << std::endl; + // REQUIRE(dataMap == dataMap2); } REQUIRE(opencmw::debug::dealloc == opencmw::debug::alloc); // a memory leak occurred debug::resetStats(); @@ -118,9 +224,10 @@ struct SimpleTestDataMoreFields { bool operator==(const SimpleTestDataMoreFields &) const = default; std::unique_ptr e = nullptr; std::set f{ "one", "two", "three" }; + std::map g{ { "g1", 1 }, { "g2", 2 }, { "g3", 3 } }; }; } // namespace ioserialiser_cmwlight_test -ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestDataMoreFields, a2, ab2, abc2, b2, c2, cd2, ce2, d2, e2, a, ab, abc, b, c, cd, ce, d, e, f) +ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::SimpleTestDataMoreFields, a2, ab2, abc2, b2, c2, cd2, ce2, d2, e2, a, ab, abc, b, c, cd, ce, d, e, f, g) #pragma clang diagnostic pop TEST_CASE("IoClassSerialiserCmwLight missing field", "[IoClassSerialiser]") { @@ -142,7 +249,8 @@ TEST_CASE("IoClassSerialiserCmwLight missing field", "[IoClassSerialiser]") { .cd = { 2.1, 4.2 }, .ce = { "hallo", "welt" }, .d = { { 6, 5, 4, 3, 2, 1 }, { 3, 2 } }, - .f = { "four", "six" } + .f = { "four", "six" }, + .g{ { "g1", 1 }, { "g2", 2 }, { "g3", 3 } } }; SimpleTestDataMoreFields data2; std::cout << fmt::format("object (fmt): {}\n", data); @@ -151,7 +259,7 @@ TEST_CASE("IoClassSerialiserCmwLight missing field", "[IoClassSerialiser]") { auto result = opencmw::deserialise(buffer, data2); std::cout << fmt::format("deserialised object (fmt): {}\n", data2); std::cout << "deserialisation messages: " << result << std::endl; - REQUIRE(result.setFields["root"] == std::vector{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1 }); + REQUIRE(result.setFields["root"] == std::vector{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1 }); REQUIRE(result.additionalFields.empty()); REQUIRE(result.exceptions.empty()); @@ -162,10 +270,220 @@ TEST_CASE("IoClassSerialiserCmwLight missing field", "[IoClassSerialiser]") { auto result_back = opencmw::deserialise(buffer, data); std::cout << fmt::format("deserialised object (fmt): {}\n", data); std::cout << "deserialisation messages: " << result_back << std::endl; - REQUIRE(result_back.setFields["root"] == std::vector{ 1, 1, 1, 1, 1, 1, 1, 1, 0, 1 }); + REQUIRE(result_back.setFields["root"] == std::vector{ 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1 }); REQUIRE(result_back.additionalFields.size() == 8); REQUIRE(result_back.exceptions.size() == 8); } REQUIRE(opencmw::debug::dealloc == opencmw::debug::alloc); // a memory leak occurred debug::resetStats(); } + +namespace ioserialiser_cmwlight_test { +struct IntegerMap { + int x_8 = 1336; // fieldname gets mapped to "8" + int foo = 42; + int bar = 45; +}; +} // namespace ioserialiser_cmwlight_test +ENABLE_REFLECTION_FOR(ioserialiser_cmwlight_test::IntegerMap, x_8, foo, bar) + +TEST_CASE("IoClassSerialiserCmwLight deserialise into map", "[IoClassSerialiser]") { + using namespace opencmw; + using namespace ioserialiser_cmwlight_test; + debug::resetStats(); + { + // serialise + IoBuffer buffer; + IntegerMap input{ 23, 13, 37 }; + opencmw::serialise(buffer, input); + buffer.reset(); + REQUIRE(buffer.size() == sizeof(int32_t) /* map size */ + refl::reflect(input).members.size /* map entries */ * (sizeof(int32_t) /* string lengths */ + sizeof(uint8_t) /* type */ + sizeof(int32_t) /* int */) + 2 + 4 + 4 /* strings + \0 */); + // std::cout << hexview(buffer.asString()); + + // deserialise + std::map deserialised{}; + DeserialiserInfo info; + auto field = opencmw::detail::newFieldHeader(buffer, "map", 0, deserialised, -1); + opencmw::FieldHeaderReader::template get(buffer, info, field); + IoSerialiser::deserialise(buffer, field, START_MARKER_INST); + opencmw::IoSerialiser>::deserialise(buffer, field, deserialised); + + // check for correctness + REQUIRE(deserialised.size() == 3); + REQUIRE(deserialised["8"] == 23); + REQUIRE(deserialised["foo"] == 13); + REQUIRE(deserialised["bar"] == 37); + } + REQUIRE(opencmw::debug::dealloc == opencmw::debug::alloc); // a memory leak occurred + debug::resetStats(); +} + +TEST_CASE("IoClassSerialiserCmwLight deserialise variant map", "[IoClassSerialiser]") { + using namespace opencmw; + using namespace ioserialiser_cmwlight_test; + debug::resetStats(); + { + const std::string_view expected{ "\x03\x00\x00\x00" // 3 fields + "\x02\x00\x00\x00" + "a\x00" + "\x03" + "\x23\x00\x00\x00" // "a" -> int:0x23 + "\x02\x00\x00\x00" + "b\x00" + "\x06" + "\xEC\x51\xB8\x1E\x85\xEB\xF5\x3F" // "b" -> double:1.337 + "\x02\x00\x00\x00" + "c\x00" + "\x07" + "\x04\x00\x00\x00" + "foo\x00" // "c" -> "foo" + , + 45 }; + std::map> map{ { "a", 0x23 }, { "b", 1.37 }, { "c", "foo" } }; + + IoBuffer buffer; + auto field = opencmw::detail::newFieldHeader(buffer, "map", 0, map, -1); + opencmw::IoSerialiser>>::serialise(buffer, field, map); + buffer.reset(); + + // fmt::print("expected:\n{}\ngot:\n{}\n", hexview(expected), hexview(buffer.asString())); + REQUIRE(buffer.asString() == expected); + } + REQUIRE(opencmw::debug::dealloc == opencmw::debug::alloc); // a memory leak occurred + debug::resetStats(); +} + +namespace opencmw::serialiser::cmwlighttests { +struct CmwLightHeaderOptions { + int64_t b; // SOURCE_ID + std::map e; + // can potentially contain more and arbitrary data + // accessors to make code more readable + int64_t &sourceId() { return b; } + std::map sessionBody; +}; +struct CmwLightHeader { + int8_t x_2; // REQ_TYPE_TAG + int64_t x_0; // ID_TAG + std::string x_1; // DEVICE_NAME + std::string f; // PROPERTY_NAME + int8_t x_7; // UPDATE_TYPE + std::string d; // SESSION_ID + std::unique_ptr x_3; + // accessors to make code more readable + int8_t &requestType() { return x_2; } + int64_t &id() { return x_0; } + std::string &device() { return x_1; } + std::string &property() { return f; } + int8_t &updateType() { return x_7; } + std::string &sessionId() { return d; } + std::unique_ptr &options() { return x_3; } +}; +struct DigitizerVersion { + std::string classVersion; + std::string deployUnitVersion; + std::string fesaVersion; + std::string gr_flowgraph_version; + std::string gr_digitizer_version; + std::string daqAPIVersion; +}; +} // namespace opencmw::serialiser::cmwlighttests +ENABLE_REFLECTION_FOR(opencmw::serialiser::cmwlighttests::CmwLightHeaderOptions, b, e) +ENABLE_REFLECTION_FOR(opencmw::serialiser::cmwlighttests::CmwLightHeader, x_2, x_0, x_1, f, x_7, d, x_3) +ENABLE_REFLECTION_FOR(opencmw::serialiser::cmwlighttests::DigitizerVersion, classVersion, deployUnitVersion, fesaVersion, gr_flowgraph_version, gr_digitizer_version, daqAPIVersion) + +TEST_CASE("IoClassSerialiserCmwLight Deserialise rda3 data", "[IoClassSerialiser]") { + // ensure that important rda3 messages can be properly deserialized + using namespace opencmw; + debug::resetStats(); + using namespace opencmw::serialiser::cmwlighttests; + { + std::string_view rda3ConnectReply = "\x07\x00\x00\x00\x02\x00\x00\x00\x30\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x31\x00\x07\x01\x00\x00\x00\x00\x02\x00\x00\x00\x32\x00\x01\x03\x02\x00\x00\x00\x33\x00\x08\x02\x00\x00\x00\x02\x00\x00\x00\x62\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x65\x00\x08\x00\x00\x00\x00\x02\x00\x00\x00\x37\x00\x01\x70\x02\x00\x00\x00\x64\x00\x07\x01\x00\x00\x00\x00\x02\x00\x00\x00\x66\x00\x07\x01\x00\x00\x00\x00"sv; + // 0 1 2 3 4 5 6 7 8 9 a b c d e f + // 000 "\x07\x00\x00\x00\x02\x00\x00\x00" "\x30\x00\x04\x00\x00\x00\x00\x00" + // 010 "\x00\x00\x00\x02\x00\x00\x00\x31" "\x00\x07\x01\x00\x00\x00\x00\x02" + // 020 "\x00\x00\x00\x32\x00\x01\x03\x02" "\x00\x00\x00\x33\x00\x08\x02\x00" + // 030 "\x00\x00\x02\x00\x00\x00\x62\x00" "\x04\x00\x00\x00\x00\x00\x00\x00" + // 040 "\x00\x02\x00\x00\x00\x65\x00\x08" "\x00\x00\x00\x00\x02\x00\x00\x00" + // 050 "\x37\x00\x01\x70\x02\x00\x00\x00" "\x64\x00\x07\x01\x00\x00\x00\x00" + // 060 "\x02\x00\x00\x00\x66\x00\x07\x01" "\x00\x00\x00\x00"sv + IoBuffer buffer{ rda3ConnectReply.data(), rda3ConnectReply.size() }; + CmwLightHeader deserialised; + auto result = opencmw::deserialise(buffer, deserialised); + + REQUIRE(deserialised.requestType() == 3); + REQUIRE(deserialised.id() == 0); + REQUIRE(deserialised.device().empty()); + REQUIRE(deserialised.property().empty()); + REQUIRE(deserialised.updateType() == 0x70); + REQUIRE(deserialised.sessionId().empty()); + REQUIRE(deserialised.options()->sourceId() == 0); + REQUIRE(deserialised.options()->sessionBody.empty()); + } + { + std::string_view data = "\x07\x00\x00\x00\x02\x00\x00\x00\x30\x00\x04\x09\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x31\x00\x07\x08\x00\x00\x00\x47\x53\x43\x44\x30\x30\x32\x00\x02\x00\x00\x00\x32\x00\x01\x0b\x02\x00\x00\x00\x33\x00\x08\x01\x00\x00\x00\x02\x00\x00\x00\x65\x00\x08\x00\x00\x00\x00\x02\x00\x00\x00\x37\x00\x01\x00\x02\x00\x00\x00\x64\x00\x07\x25\x01\x00\x00\x52\x65\x6d\x6f\x74\x65\x48\x6f\x73\x74\x49\x6e\x66\x6f\x49\x6d\x70\x6c\x5b\x6e\x61\x6d\x65\x3d\x66\x65\x73\x61\x2d\x65\x78\x70\x6c\x6f\x72\x65\x72\x2d\x61\x70\x70\x3b\x20\x75\x73\x65\x72\x4e\x61\x6d\x65\x3d\x61\x6b\x72\x69\x6d\x6d\x3b\x20\x61\x70\x70\x49\x64\x3d\x5b\x61\x70\x70\x3d\x66\x65\x73\x61\x2d\x65\x78\x70\x6c\x6f\x72\x65\x72\x2d\x61\x70\x70\x3b\x76\x65\x72\x3d\x31\x39\x2e\x30\x2e\x30\x3b\x75\x69\x64\x3d\x61\x6b\x72\x69\x6d\x6d\x3b\x68\x6f\x73\x74\x3d\x53\x59\x53\x50\x43\x30\x30\x38\x3b\x70\x69\x64\x3d\x31\x39\x31\x36\x31\x36\x3b\x5d\x3b\x20\x70\x72\x6f\x63\x65\x73\x73\x3d\x66\x65\x73\x61\x2d\x65\x78\x70\x6c\x6f\x72\x65\x72\x2d\x61\x70\x70\x3b\x20\x70\x69\x64\x3d\x31\x39\x31\x36\x31\x36\x3b\x20\x61\x64\x64\x72\x65\x73\x73\x3d\x74\x63\x70\x3a\x2f\x2f\x53\x59\x53\x50\x43\x30\x30\x38\x3a\x30\x3b\x20\x73\x74\x61\x72\x74\x54\x69\x6d\x65\x3d\x32\x30\x32\x34\x2d\x30\x37\x2d\x30\x34\x20\x31\x31\x3a\x31\x31\x3a\x31\x32\x3b\x20\x63\x6f\x6e\x6e\x65\x63\x74\x69\x6f\x6e\x54\x69\x6d\x65\x3d\x41\x62\x6f\x75\x74\x20\x61\x67\x6f\x3b\x20\x76\x65\x72\x73\x69\x6f\x6e\x3d\x31\x30\x2e\x33\x2e\x30\x3b\x20\x6c\x61\x6e\x67\x75\x61\x67\x65\x3d\x4a\x61\x76\x61\x5d\x31\x00\x02\x00\x00\x00\x66\x00\x07\x08\x00\x00\x00\x56\x65\x72\x73\x69\x6f\x6e\x00"sv; + // reply req type: session confirm + // \x07 \x00 \x00 \x00 .... + // \x02 \x00 \x00 \x00 \x30 \x00 \x04 \x09 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x02 ....0... ........ + // \x00 \x00 \x00 \x31 \x00 \x07 \x08 \x00 \x00 \x00 \x47 \x53 \x43 \x44 \x30 \x30 ...1.... ..GSCD00 + // \x32 \x00 \x02 \x00 \x00 \x00 \x32 \x00 \x01 \x0b \x02 \x00 \x00 \x00 \x33 \x00 2.....2. ......3. + // \x08 \x01 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x65 \x00 \x08 \x00 \x00 \x00 \x00 ........ .e...... + // \x02 \x00 \x00 \x00 \x37 \x00 \x01 \x00 \x02 \x00 \x00 \x00 \x64 \x00 \x07 \x25 ....7... ....d..% + // \x01 \x00 \x00 \x52 \x65 \x6d \x6f \x74 \x65 \x48 \x6f \x73 \x74 \x49 \x6e \x66 ...Remot eHostInf + // \x6f \x49 \x6d \x70 \x6c \x5b \x6e \x61 \x6d \x65 \x3d \x66 \x65 \x73 \x61 \x2d oImpl[na me=fesa- + // \x65 \x78 \x70 \x6c \x6f \x72 \x65 \x72 \x2d \x61 \x70 \x70 \x3b \x20 \x75 \x73 explorer -app; us + // \x65 \x72 \x4e \x61 \x6d \x65 \x3d \x61 \x6b \x72 \x69 \x6d \x6d \x3b \x20 \x61 erName=a krimm; a + // \x70 \x70 \x49 \x64 \x3d \x5b \x61 \x70 \x70 \x3d \x66 \x65 \x73 \x61 \x2d \x65 ppId=[ap p=fesa-e + // \x78 \x70 \x6c \x6f \x72 \x65 \x72 \x2d \x61 \x70 \x70 \x3b \x76 \x65 \x72 \x3d xplorer- app;ver= + // \x31 \x39 \x2e \x30 \x2e \x30 \x3b \x75 \x69 \x64 \x3d \x61 \x6b \x72 \x69 \x6d 19.0.0;u id=akrim + // \x6d \x3b \x68 \x6f \x73 \x74 \x3d \x53 \x59 \x53 \x50 \x43 \x30 \x30 \x38 \x3b m;host=S YSPC008; + // \x70 \x69 \x64 \x3d \x31 \x39 \x31 \x36 \x31 \x36 \x3b \x5d \x3b \x20 \x70 \x72 pid=1916 16;]; pr + // \x6f \x63 \x65 \x73 \x73 \x3d \x66 \x65 \x73 \x61 \x2d \x65 \x78 \x70 \x6c \x6f ocess=fe sa-explo + // \x72 \x65 \x72 \x2d \x61 \x70 \x70 \x3b \x20 \x70 \x69 \x64 \x3d \x31 \x39 \x31 rer-app; pid=191 + // \x36 \x31 \x36 \x3b \x20 \x61 \x64 \x64 \x72 \x65 \x73 \x73 \x3d \x74 \x63 \x70 616; add ress=tcp + // \x3a \x2f \x2f \x53 \x59 \x53 \x50 \x43 \x30 \x30 \x38 \x3a \x30 \x3b \x20 \x73 ://SYSPC 008:0; s + // \x74 \x61 \x72 \x74 \x54 \x69 \x6d \x65 \x3d \x32 \x30 \x32 \x34 \x2d \x30 \x37 tartTime =2024-07 + // \x2d \x30 \x34 \x20 \x31 \x31 \x3a \x31 \x31 \x3a \x31 \x32 \x3b \x20 \x63 \x6f -04 11:1 1:12; co + // \x6e \x6e \x65 \x63 \x74 \x69 \x6f \x6e \x54 \x69 \x6d \x65 \x3d \x41 \x62 \x6f nnection Time=Abo + // \x75 \x74 \x20 \x61 \x67 \x6f \x3b \x20 \x76 \x65 \x72 \x73 \x69 \x6f \x6e \x3d ut ago; version= + // \x31 \x30 \x2e \x33 \x2e \x30 \x3b \x20 \x6c \x61 \x6e \x67 \x75 \x61 \x67 \x65 10.3.0; language + // \x3d \x4a \x61 \x76 \x61 \x5d \x31 \x00 \x02 \x00 \x00 \x00 \x66 \x00 \x07 \x08 =Java]1. ....f... + // \x00 \x00 \x00 \x56 \x65 \x72 \x73 \x69 \x6f \x6e \x00 ...Versi on. + IoBuffer buffer{ data.data(), data.size() }; + CmwLightHeader deserialised; + auto result = opencmw::deserialise(buffer, deserialised); + } + { + std::string_view data = "\x06\x00\x00\x00\x02\x00\x00\x00\x30\x00\x04\x09\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x31\x00\x07\x01\x00\x00\x00\x00\x02\x00\x00\x00\x32\x00\x01\x03\x02\x00\x00\x00\x37\x00\x01\x00\x02\x00\x00\x00\x64\x00\x07\x01\x00\x00\x00\x00\x02\x00\x00\x00\x66\x00\x07\x01\x00\x00\x00\x00\x01\xc3\x06\x00\x00\x00\x0d\x00\x00\x00\x63\x6c\x61\x73\x73\x56\x65\x72\x73\x69\x6f\x6e\x00\x07\x06\x00\x00\x00\x36\x2e\x30\x2e\x30\x00\x0e\x00\x00\x00\x64\x61\x71\x41\x50\x49\x56\x65\x72\x73\x69\x6f\x6e\x00\x07\x04\x00\x00\x00\x32\x2e\x30\x00\x12\x00\x00\x00\x64\x65\x70\x6c\x6f\x79\x55\x6e\x69\x74\x56\x65\x72\x73\x69\x6f\x6e\x00\x07\x06\x00\x00\x00\x36\x2e\x30\x2e\x30\x00\x0c\x00\x00\x00\x66\x65\x73\x61\x56\x65\x72\x73\x69\x6f\x6e\x00\x07\x06\x00\x00\x00\x37\x2e\x33\x2e\x30\x00\x15\x00\x00\x00\x67\x72\x5f\x64\x69\x67\x69\x74\x69\x7a\x65\x72\x5f\x76\x65\x72\x73\x69\x6f\x6e\x00\x07\x08\x00\x00\x00\x35\x2e\x31\x2e\x34\x2e\x30\x00\x15\x00\x00\x00\x67\x72\x5f\x66\x6c\x6f\x77\x67\x72\x61\x70\x68\x5f\x76\x65\x72\x73\x69\x6f\x6e\x00\x07\x08\x00\x00\x00\x35\x2e\x30\x2e\x32\x2e\x30\x00\x01\x62\x03\x00\x00\x00\x02\x00\x00\x00\x35\x00\x04\x88\x39\xfe\x41\x88\xf7\xde\x17\x02\x00\x00\x00\x36\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x78\x00\x08\x03\x00\x00\x00\x09\x00\x00\x00\x61\x63\x71\x53\x74\x61\x6d\x70\x00\x04\x88\x39\xfe\x41\x88\xf7\xde\x17\x05\x00\x00\x00\x74\x79\x70\x65\x00\x03\x02\x00\x00\x00\x08\x00\x00\x00\x76\x65\x72\x73\x69\x6f\x6e\x00\x03\x01\x00\x00\x00\x00\x03"; + // Reply with Req Type = Reply, gets sent after get request + // \x06 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x30 \x00 \x04 ... .....0.. + // \x09 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x31 \x00 \x07 \x01 ........ ....1... + // \x00 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x32 \x00 \x01 \x03 \x02 \x00 \x00 \x00 ........ 2....... + // \x37 \x00 \x01 \x00 \x02 \x00 \x00 \x00 \x64 \x00 \x07 \x01 \x00 \x00 \x00 \x00 7....... d....... + // \x02 \x00 \x00 \x00 \x66 \x00 \x07 \x01 \x00 \x00 \x00 \x00 \x01 \xc3 \x06 \x00 ....f... ........ + // \x00 \x00 \x0d \x00 \x00 \x00 \x63 \x6c \x61 \x73 \x73 \x56 \x65 \x72 \x73 \x69 ......cl assVersi + // \x6f \x6e \x00 \x07 \x06 \x00 \x00 \x00 \x36 \x2e \x30 \x2e \x30 \x00 \x0e \x00 on...... 6.0.0... + // \x00 \x00 \x64 \x61 \x71 \x41 \x50 \x49 \x56 \x65 \x72 \x73 \x69 \x6f \x6e \x00 ..daqAPI Version. + // \x07 \x04 \x00 \x00 \x00 \x32 \x2e \x30 \x00 \x12 \x00 \x00 \x00 \x64 \x65 \x70 .....2.0 .....dep + // \x6c \x6f \x79 \x55 \x6e \x69 \x74 \x56 \x65 \x72 \x73 \x69 \x6f \x6e \x00 \x07 loyUnitV ersion.. + // \x06 \x00 \x00 \x00 \x36 \x2e \x30 \x2e \x30 \x00 \x0c \x00 \x00 \x00 \x66 \x65 ....6.0. 0.....fe + // \x73 \x61 \x56 \x65 \x72 \x73 \x69 \x6f \x6e \x00 \x07 \x06 \x00 \x00 \x00 \x37 saVersio n......7 + // \x2e \x33 \x2e \x30 \x00 \x15 \x00 \x00 \x00 \x67 \x72 \x5f \x64 \x69 \x67 \x69 .3.0.... .gr_digi + // \x74 \x69 \x7a \x65 \x72 \x5f \x76 \x65 \x72 \x73 \x69 \x6f \x6e \x00 \x07 \x08 tizer_ve rsion... + // \x00 \x00 \x00 \x35 \x2e \x31 \x2e \x34 \x2e \x30 \x00 \x15 \x00 \x00 \x00 \x67 ...5.1.4 .0.....g + // \x72 \x5f \x66 \x6c \x6f \x77 \x67 \x72 \x61 \x70 \x68 \x5f \x76 \x65 \x72 \x73 r_flowgr aph_vers + // \x69 \x6f \x6e \x00 \x07 \x08 \x00 \x00 \x00 \x35 \x2e \x30 \x2e \x32 \x2e \x30 ion..... .5.0.2.0 + // \x00 \x01 \x62 \x03 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x35 \x00 \x04 \x88 \x39 ..b..... ...5...9 + // \xfe \x41 \x88 \xf7 \xde \x17 \x02 \x00 \x00 \x00 \x36 \x00 \x04 \x00 \x00 \x00 .A...... ..6..... + // \x00 \x00 \x00 \x00 \x00 \x02 \x00 \x00 \x00 \x78 \x00 \x08 \x03 \x00 \x00 \x00 ........ .x...... + // \x09 \x00 \x00 \x00 \x61 \x63 \x71 \x53 \x74 \x61 \x6d \x70 \x00 \x04 \x88 \x39 ....acqS tamp...9 + // \xfe \x41 \x88 \xf7 \xde \x17 \x05 \x00 \x00 \x00 \x74 \x79 \x70 \x65 \x00 \x03 .A...... ..type.. + // \x02 \x00 \x00 \x00 \x08 \x00 \x00 \x00 \x76 \x65 \x72 \x73 \x69 \x6f \x6e \x00 ........ version. + // \x03 \x01 \x00 \x00 \x00 \x00 \x03 ....... + IoBuffer buffer{ data.data(), data.size() }; + DigitizerVersion deserialised; + auto result = opencmw::deserialise(buffer, deserialised); + } + REQUIRE(opencmw::debug::dealloc == opencmw::debug::alloc); // a memory leak occurred + debug::resetStats(); +}