From 918eff21bf47283643a3a38c2c47e2c0e7301d95 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Fri, 9 May 2025 16:34:54 +0100 Subject: [PATCH 1/8] Move graph topology changing messages to Scheduler For https://github.com/fair-acc/opendigitizer/issues/246. OD will need to be adapted to send messages to scheduler instead of graph. Adapted tests. Introduced qa_SchedulerMessages.cpp for topology changing messages. Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Graph.hpp | 123 +------- core/include/gnuradio-4.0/Scheduler.hpp | 114 ++++++- core/test/CMakeLists.txt | 3 + core/test/message_utils.hpp | 5 +- core/test/qa_GraphMessages.cpp | 314 +------------------ core/test/qa_HierBlock.cpp | 5 +- core/test/qa_SchedulerMessages.cpp | 386 ++++++++++++++++++++++++ 7 files changed, 525 insertions(+), 425 deletions(-) create mode 100644 core/test/qa_SchedulerMessages.cpp diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index e2ab70ff0..e9ce199b5 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -33,22 +33,8 @@ namespace gr { namespace graph::property { -inline static const char* kEmplaceBlock = "EmplaceBlock"; -inline static const char* kRemoveBlock = "RemoveBlock"; -inline static const char* kReplaceBlock = "ReplaceBlock"; -inline static const char* kInspectBlock = "InspectBlock"; - -inline static const char* kBlockEmplaced = "BlockEmplaced"; -inline static const char* kBlockRemoved = "BlockRemoved"; -inline static const char* kBlockReplaced = "BlockReplaced"; +inline static const char* kInspectBlock = "InspectBlock"; inline static const char* kBlockInspected = "BlockInspected"; - -inline static const char* kEmplaceEdge = "EmplaceEdge"; -inline static const char* kRemoveEdge = "RemoveEdge"; - -inline static const char* kEdgeEmplaced = "EdgeEmplaced"; -inline static const char* kEdgeRemoved = "EdgeRemoved"; - inline static const char* kGraphInspect = "GraphInspect"; inline static const char* kGraphInspected = "GraphInspected"; @@ -335,12 +321,8 @@ class Graph : public gr::Block { Graph(property_map settings = {}) : gr::Block(std::move(settings)) { _blocks.reserve(100); // TODO: remove - propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock); - propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock); + propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock); - propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock); - propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge); - propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge); propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect); propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes); } @@ -375,14 +357,10 @@ class Graph : public gr::Block { */ [[nodiscard]] const Sequence& progress() noexcept { return *_progress.get(); } - BlockModel& addBlock(std::unique_ptr block, bool doEmitMessage = true) { + BlockModel& addBlock(std::unique_ptr block) { auto& newBlock = _blocks.emplace_back(std::move(block)); newBlock->init(_progress, _ioThreadPool); // TODO: Should we connectChildMessagePorts for these blocks as well? - setTopologyChanged(); - if (doEmitMessage) { - this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get())); - } return *newBlock.get(); } @@ -393,18 +371,12 @@ class Graph : public gr::Block { auto& newBlock = _blocks.emplace_back(std::make_unique>(std::move(initialSettings))); auto* rawBlockRef = static_cast(newBlock->raw()); rawBlockRef->init(_progress, _ioThreadPool); - setTopologyChanged(); - this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get())); return *rawBlockRef; } [[maybe_unused]] auto& emplaceBlock(std::string_view type, property_map initialSettings) { if (auto block_load = _pluginLoader->instantiate(type, std::move(initialSettings)); block_load) { - setTopologyChanged(); - auto& newBlock = addBlock(std::move(block_load), false); // false == do not emit message - - this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(std::addressof(newBlock))); - + auto& newBlock = addBlock(std::move(block_load)); return newBlock; } throw gr::exception(std::format("Can not create block {}", type)); @@ -503,25 +475,6 @@ class Graph : public gr::Block { return result; } - std::optional propertyCallbackEmplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { - assert(propertyName == graph::property::kEmplaceBlock); - using namespace std::string_literals; - const auto& data = message.data.value(); - const std::string& type = std::get(data.at("type"s)); - const property_map& properties = [&] { - if (auto it = data.find("properties"s); it != data.end()) { - return std::get(it->second); - } else { - return property_map{}; - } - }(); - - emplaceBlock(type, properties); - - // Message is sent as a reaction to emplaceBlock, no need for a separate one - return {}; - } - std::optional propertyCallbackInspectBlock([[maybe_unused]] std::string_view propertyName, Message message) { assert(propertyName == graph::property::kInspectBlock); using namespace std::string_literals; @@ -540,12 +493,8 @@ class Graph : public gr::Block { return {reply}; } - std::optional propertyCallbackRemoveBlock([[maybe_unused]] std::string_view propertyName, Message message) { - assert(propertyName == graph::property::kRemoveBlock); - using namespace std::string_literals; - const auto& data = message.data.value(); - const std::string& uniqueName = std::get(data.at("uniqueName"s)); - auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; }); + void removeBlockByName(std::string_view uniqueName) { + auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; }); if (it == _blocks.end()) { throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name)); @@ -554,26 +503,11 @@ class Graph : public gr::Block { std::erase_if(_edges, [&it](const Edge& edge) { // return std::addressof(edge.sourceBlock()) == it->get() || std::addressof(edge.destinationBlock()) == it->get(); }); - _blocks.erase(it); - message.endpoint = graph::property::kBlockRemoved; - return {message}; + _blocks.erase(it); } - std::optional propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { - assert(propertyName == graph::property::kReplaceBlock); - using namespace std::string_literals; - const auto& data = message.data.value(); - const std::string& uniqueName = std::get(data.at("uniqueName"s)); - const std::string& type = std::get(data.at("type"s)); - const property_map& properties = [&] { - if (auto it = data.find("properties"s); it != data.end()) { - return std::get(it->second); - } else { - return property_map{}; - } - }(); - + gr::BlockModel* replaceBlock(const std::string& uniqueName, const std::string& type, const property_map& properties) { auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; }); if (it == _blocks.end()) { throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name)); @@ -585,7 +519,7 @@ class Graph : public gr::Block { throw gr::exception(std::format("Can not create block {}", type)); } - addBlock(std::move(newBlock), false); // false == do not emit message + addBlock(std::move(newBlock)); BlockModel* oldBlock = it->get(); for (auto& edge : _edges) { @@ -599,27 +533,11 @@ class Graph : public gr::Block { } _blocks.erase(it); - std::optional result = gr::Message{}; - result->endpoint = graph::property::kBlockReplaced; - result->data = serializeBlock(newBlockRaw); - - (*result->data)["replacedBlockUniqueName"s] = uniqueName; - - return result; + return newBlockRaw; } - std::optional propertyCallbackEmplaceEdge([[maybe_unused]] std::string_view propertyName, Message message) { - assert(propertyName == graph::property::kEmplaceEdge); - using namespace std::string_literals; - const auto& data = message.data.value(); - const std::string& sourceBlock = std::get(data.at("sourceBlock"s)); - const std::string& sourcePort = std::get(data.at("sourcePort"s)); - const std::string& destinationBlock = std::get(data.at("destinationBlock"s)); - const std::string& destinationPort = std::get(data.at("destinationPort"s)); - [[maybe_unused]] const std::size_t minBufferSize = std::get(data.at("minBufferSize"s)); - [[maybe_unused]] const std::int32_t weight = std::get(data.at("weight"s)); - const std::string edgeName = std::get(data.at("edgeName"s)); - + void emplaceEdge(std::string_view sourceBlock, std::string sourcePort, std::string_view destinationBlock, // + std::string destinationPort, [[maybe_unused]] const std::size_t minBufferSize, [[maybe_unused]] const std::int32_t weight, std::string_view edgeName) { auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; }); if (sourceBlockIt == _blocks.end()) { throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name)); @@ -645,31 +563,20 @@ class Graph : public gr::Block { const bool isArithmeticLike = sourcePortRef.portInfo().isValueTypeArithmeticLike; const std::size_t sanitizedMinBufferSize = minBufferSize == undefined_size ? graph::defaultMinBufferSize(isArithmeticLike) : minBufferSize; - _edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, edgeName); - - message.endpoint = graph::property::kEdgeEmplaced; - return message; + _edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, std::string(edgeName)); } - std::optional propertyCallbackRemoveEdge([[maybe_unused]] std::string_view propertyName, Message message) { - assert(propertyName == graph::property::kRemoveEdge); - using namespace std::string_literals; - const auto& data = message.data.value(); - const std::string& sourceBlock = std::get(data.at("sourceBlock"s)); - const std::string& sourcePort = std::get(data.at("sourcePort"s)); - + void removeEdgeBySourcePort(std::string_view sourceBlock, std::string_view sourcePort) { auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; }); if (sourceBlockIt == _blocks.end()) { throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name)); } - auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(sourcePort); + auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(std::string(sourcePort)); if (sourcePortRef.disconnect() == ConnectionResult::FAILED) { throw gr::exception(std::format("Block {} sourcePortRef could not be disconnected {}", sourceBlock, this->unique_name)); } - message.endpoint = graph::property::kEdgeRemoved; - return message; } std::optional propertyCallbackGraphInspect([[maybe_unused]] std::string_view propertyName, Message message) { diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 68195990d..12befb6c1 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -6,7 +6,7 @@ #include #include #include -#include + #include #include @@ -22,6 +22,21 @@ namespace gr::scheduler { using gr::thread_pool::BasicThreadPool; using namespace gr::message; +namespace property { + +inline static const char* kEmplaceBlock = "EmplaceBlock"; +inline static const char* kRemoveBlock = "RemoveBlock"; +inline static const char* kReplaceBlock = "ReplaceBlock"; +inline static const char* kEmplaceEdge = "EmplaceEdge"; +inline static const char* kRemoveEdge = "RemoveEdge"; + +inline static const char* kBlockEmplaced = "BlockEmplaced"; +inline static const char* kBlockRemoved = "BlockRemoved"; +inline static const char* kBlockReplaced = "BlockReplaced"; +inline static const char* kEdgeEmplaced = "EdgeEmplaced"; +inline static const char* kEdgeRemoved = "EdgeRemoved"; +} // namespace property + enum class ExecutionPolicy { singleThreaded, /// multiThreaded, /// @@ -86,7 +101,13 @@ class SchedulerBase : public Block { explicit SchedulerBase(gr::Graph&& graph, // std::shared_ptr thread_pool = std::make_shared("simple-scheduler-pool", thread_pool::CPU_BOUND), // const profiling::Options& profiling_options = {}) // - : _graph(std::move(graph)), _profiler{profiling_options}, _profilerHandler{_profiler.forThisThread()}, _pool(std::move(thread_pool)) {} + : _graph(std::move(graph)), _profiler{profiling_options}, _profilerHandler{_profiler.forThisThread()}, _pool(std::move(thread_pool)) { + this->propertyCallbacks[scheduler::property::kEmplaceBlock] = std::mem_fn(&SchedulerBase::propertyCallbackEmplaceBlock); + this->propertyCallbacks[scheduler::property::kRemoveBlock] = std::mem_fn(&SchedulerBase::propertyCallbackRemoveBlock); + this->propertyCallbacks[scheduler::property::kRemoveEdge] = std::mem_fn(&SchedulerBase::propertyCallbackRemoveEdge); + this->propertyCallbacks[scheduler::property::kEmplaceEdge] = std::mem_fn(&SchedulerBase::propertyCallbackEmplaceEdge); + this->propertyCallbacks[scheduler::property::kReplaceBlock] = std::mem_fn(&SchedulerBase::propertyCallbackReplaceBlock); + } ~SchedulerBase() { if (this->state() == lifecycle::RUNNING) { @@ -416,6 +437,95 @@ class SchedulerBase : public Block { } forAllUnmanagedBlocks([this](auto& block) { this->emitErrorMessageIfAny("resume() -> LifecycleState", block->changeStateTo(lifecycle::RUNNING)); }); } + + std::optional propertyCallbackEmplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kEmplaceBlock); + using namespace std::string_literals; + const auto& data = message.data.value(); + const std::string& type = std::get(data.at("type"s)); + const property_map& properties = [&] { + if (auto it = data.find("properties"s); it != data.end()) { + return std::get(it->second); + } else { + return property_map{}; + } + }(); + + auto& newBlock = _graph.emplaceBlock(type, properties); + + this->emitMessage(scheduler::property::kBlockEmplaced, Graph::serializeBlock(std::addressof(newBlock))); + + // Message is sent as a reaction to emplaceBlock, no need for a separate one + return {}; + } + + std::optional propertyCallbackRemoveBlock([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kRemoveBlock); + using namespace std::string_literals; + const auto& data = message.data.value(); + const std::string& uniqueName = std::get(data.at("uniqueName"s)); + + _graph.removeBlockByName(uniqueName); + + message.endpoint = scheduler::property::kBlockRemoved; + return {message}; + } + + std::optional propertyCallbackRemoveEdge([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kRemoveEdge); + using namespace std::string_literals; + const auto& data = message.data.value(); + const std::string& sourceBlock = std::get(data.at("sourceBlock"s)); + const std::string& sourcePort = std::get(data.at("sourcePort"s)); + + _graph.removeEdgeBySourcePort(sourceBlock, sourcePort); + + message.endpoint = scheduler::property::kEdgeRemoved; + return message; + } + + std::optional propertyCallbackEmplaceEdge([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kEmplaceEdge); + using namespace std::string_literals; + const auto& data = message.data.value(); + const std::string& sourceBlock = std::get(data.at("sourceBlock"s)); + const std::string& sourcePort = std::get(data.at("sourcePort"s)); + const std::string& destinationBlock = std::get(data.at("destinationBlock"s)); + const std::string& destinationPort = std::get(data.at("destinationPort"s)); + [[maybe_unused]] const std::size_t minBufferSize = std::get(data.at("minBufferSize"s)); + [[maybe_unused]] const std::int32_t weight = std::get(data.at("weight"s)); + const std::string edgeName = std::get(data.at("edgeName"s)); + + _graph.emplaceEdge(sourceBlock, sourcePort, destinationBlock, destinationPort, minBufferSize, weight, edgeName); + + message.endpoint = scheduler::property::kEdgeEmplaced; + return message; + } + + std::optional propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kReplaceBlock); + using namespace std::string_literals; + const auto& data = message.data.value(); + const std::string& uniqueName = std::get(data.at("uniqueName"s)); + const std::string& type = std::get(data.at("type"s)); + const property_map& properties = [&] { + if (auto it = data.find("properties"s); it != data.end()) { + return std::get(it->second); + } else { + return property_map{}; + } + }(); + + auto newBlockRaw = _graph.replaceBlock(uniqueName, type, properties); + + std::optional result = gr::Message{}; + result->endpoint = scheduler::property::kBlockReplaced; + result->data = Graph::serializeBlock(newBlockRaw); + + (*result->data)["replacedBlockUniqueName"s] = uniqueName; + + return result; + } }; template diff --git a/core/test/CMakeLists.txt b/core/test/CMakeLists.txt index aee5d7216..96c34a942 100644 --- a/core/test/CMakeLists.txt +++ b/core/test/CMakeLists.txt @@ -81,6 +81,9 @@ if(GR_ENABLE_BLOCK_REGISTRY AND INTERNAL_ENABLE_BLOCK_PLUGINS) add_ut_test(qa_GraphMessages) target_link_libraries(qa_GraphMessages PUBLIC GrBasicBlocksShared GrTestingBlocksShared) + add_ut_test(qa_SchedulerMessages) + target_link_libraries(qa_SchedulerMessages PUBLIC GrBasicBlocksShared GrTestingBlocksShared) + add_subdirectory(plugins) add_app_test(qa_plugins_test) target_link_libraries(qa_plugins_test PUBLIC GrBasicBlocksShared GrTestingBlocksShared) diff --git a/core/test/message_utils.hpp b/core/test/message_utils.hpp index 95dee5a78..3ef4e8505 100644 --- a/core/test/message_utils.hpp +++ b/core/test/message_utils.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace gr::testing { @@ -102,7 +103,7 @@ inline bool waitForReply(gr::MsgPortIn& fromGraph, std::size_t nReplies = 1UZ, s inline std::string sendAndWaitMessageEmplaceBlock(gr::MsgPortOut& toGraph, gr::MsgPortIn& fromGraph, std::string type, property_map properties, std::string serviceName = "", std::source_location sourceLocation = std::source_location::current()) { expect(eq(getNReplyMessages(fromGraph), 0UZ)) << std::format("Input port has unconsumed messages. Requested at: {}\n", sourceLocation); - sendMessage(toGraph, serviceName, graph::property::kEmplaceBlock /* endpoint */, // + sendMessage(toGraph, serviceName, gr::scheduler::property::kEmplaceBlock /* endpoint */, // {{"type", std::move(type)}, {"properties", std::move(properties)}} /* data */); expect(waitForReply(fromGraph)) << std::format("Reply message not received. Requested at: {}\n", sourceLocation); @@ -118,7 +119,7 @@ inline void sendAndWaitMessageEmplaceEdge(gr::MsgPortOut& toGraph, gr::MsgPortIn expect(eq(getNReplyMessages(fromGraph), 0UZ)) << std::format("Input port has unconsumed messages. Requested at: {}\n", sourceLocation); gr::property_map data = {{"sourceBlock", sourceBlock}, {"sourcePort", sourcePort}, {"destinationBlock", destinationBlock}, {"destinationPort", destinationPort}, // {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; - sendMessage(toGraph, serviceName, graph::property::kEmplaceEdge /* endpoint */, data /* data */); + sendMessage(toGraph, serviceName, gr::scheduler::property::kEmplaceEdge /* endpoint */, data /* data */); expect(waitForReply(fromGraph)) << std::format("Reply message not received. Requested at: {}\n", sourceLocation); expect(eq(getNReplyMessages(fromGraph), 1UZ)) << std::format("No messages available. Requested at: {}\n", sourceLocation); diff --git a/core/test/qa_GraphMessages.cpp b/core/test/qa_GraphMessages.cpp index d377d928c..50298610f 100644 --- a/core/test/qa_GraphMessages.cpp +++ b/core/test/qa_GraphMessages.cpp @@ -17,6 +17,8 @@ using namespace std::string_literals; namespace ut = boost::ut; +// For messages that change graph topology, see qa_SchedulerMessages.cpp instead + // We don't like new, but this will ensure the object is alive // when ut starts running the tests. It runs the tests when // its static objects get destroyed, which means other static @@ -60,7 +62,7 @@ const boost::ut::suite<"Graph Formatter Tests"> graphFormatterTests = [] { }; }; -const boost::ut::suite NonRunningGraphTests = [] { +const boost::ut::suite GraphMessageTests = [] { using namespace std::string_literals; using namespace boost::ut; using namespace gr; @@ -74,208 +76,6 @@ const boost::ut::suite NonRunningGraphTests = [] { std::println(" block: {}", blockName); } - "Block addition tests"_test = [] { - gr::MsgPortOut toGraph; - gr::Graph testGraph(context->loader); - gr::MsgPortIn fromGraph; - - expect(eq(ConnectionResult::SUCCESS, toGraph.connect(testGraph.msgIn))); - expect(eq(ConnectionResult::SUCCESS, testGraph.msgOut.connect(fromGraph))); - - "Add a valid block"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceBlock /* endpoint */, // - {{"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph, graph::property::kBlockEmplaced); - if (!reply.data.has_value()) { - expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); - } - expect(eq(testGraph.blocks().size(), 1UZ)); - }; - - "Add an invalid block"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceBlock /* endpoint */, // - {{"type", "doesnt_exist::multiply"}, {"properties", property_map{}}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); - expect(!reply.data.has_value()); - expect(eq(testGraph.blocks().size(), 1UZ)); - }; - }; - - "Block removal tests"_test = [] { - gr::MsgPortOut toGraph; - gr::Graph testGraph(context->loader); - gr::MsgPortIn fromGraph; - - expect(eq(ConnectionResult::SUCCESS, toGraph.connect(testGraph.msgIn))); - expect(eq(ConnectionResult::SUCCESS, testGraph.msgOut.connect(fromGraph))); - - testGraph.emplaceBlock("gr::testing::Copy", {}); - expect(eq(testGraph.blocks().size(), 1UZ)); - expect(eq(getNReplyMessages(fromGraph), 1UZ)); // emplaceBlock emits message - consumeAllReplyMessages(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); // all messages are consumed - - "Remove a known block"_test = [&] { - auto& temporaryBlock = testGraph.emplaceBlock("gr::testing::Copy", {}); - expect(eq(testGraph.blocks().size(), 2UZ)); - expect(eq(getNReplyMessages(fromGraph), 1UZ)); // emplaceBlock emits message - consumeAllReplyMessages(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); // all messages are consumed - - sendMessage(toGraph, testGraph.unique_name, graph::property::kRemoveBlock /* endpoint */, // - {{"uniqueName", std::string(temporaryBlock.uniqueName())}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - if (!reply.data.has_value()) { - expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); - } - expect(eq(testGraph.blocks().size(), 1UZ)); - }; - - "Remove an unknown block"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kRemoveBlock /* endpoint */, // - {{"uniqueName", "this_block_is_unknown"}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(!reply.data.has_value()); - expect(eq(testGraph.blocks().size(), 1UZ)); - }; - }; - - "Block replacement tests"_test = [] { - gr::MsgPortOut toGraph; - gr::Graph testGraph(context->loader); - gr::MsgPortIn fromGraph; - - expect(eq(ConnectionResult::SUCCESS, toGraph.connect(testGraph.msgIn))); - expect(eq(ConnectionResult::SUCCESS, testGraph.msgOut.connect(fromGraph))); - - auto& block = testGraph.emplaceBlock("gr::testing::Copy", {}); - expect(eq(testGraph.blocks().size(), 1UZ)); - expect(eq(getNReplyMessages(fromGraph), 1UZ)); // emplaceBlock emits message - consumeAllReplyMessages(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); // all messages are consumed - - "Replace a known block"_test = [&] { - auto& temporaryBlock = testGraph.emplaceBlock("gr::testing::Copy", {}); - expect(eq(testGraph.blocks().size(), 2UZ)); - expect(eq(getNReplyMessages(fromGraph), 1UZ)); // emplaceBlock emits message - consumeAllReplyMessages(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); // all messages are consumed - - sendMessage(toGraph, testGraph.unique_name, graph::property::kReplaceBlock /* endpoint */, // - {{"uniqueName", std::string(temporaryBlock.uniqueName())}, // - {"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - if (!reply.data.has_value()) { - expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); - } - }; - - "Replace an unknown block"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kReplaceBlock /* endpoint */, // - {{"uniqueName", "this_block_is_unknown"}, // - {"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - - expect(!reply.data.has_value()); - }; - - "Replace with an unknown block"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kReplaceBlock /* endpoint */, // - {{"uniqueName", std::string(block.uniqueName())}, // - {"type", "doesnt_exist::multiply"}, {"properties", property_map{}}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - - expect(!reply.data.has_value()); - }; - }; - - "Edge addition tests"_test = [&] { - gr::MsgPortOut toGraph; - gr::Graph testGraph(context->loader); - gr::MsgPortIn fromGraph; - - expect(eq(ConnectionResult::SUCCESS, toGraph.connect(testGraph.msgIn))); - expect(eq(ConnectionResult::SUCCESS, testGraph.msgOut.connect(fromGraph))); - - auto& blockOut = testGraph.emplaceBlock("gr::testing::Copy", {}); - auto& blockIn = testGraph.emplaceBlock("gr::testing::Copy", {}); - auto& blockWrongType = testGraph.emplaceBlock("gr::testing::Copy", {}); - - expect(eq(getNReplyMessages(fromGraph), 3UZ)); // emplaceBlock emits message - consumeAllReplyMessages(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); // all messages are consumed - - "Add an edge"_test = [&] { - property_map data = {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "out"}, // - {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "in"}, // - {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; - - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceEdge /* endpoint */, data /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - if (!reply.data.has_value()) { - expect(false) << std::format("edge not being placed - error: {}", reply.data.error()); - } - }; - - "Fail to add an edge because source port is invalid"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceEdge /* endpoint */, // - {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "OUTPUT"}, // - {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "in"}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(!reply.data.has_value()); - }; - - "Fail to add an edge because destination port is invalid"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceEdge /* endpoint */, // - {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "in"}, // - {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "INPUT"}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(!reply.data.has_value()); - }; - - "Fail to add an edge because ports are not compatible"_test = [&] { - sendMessage(toGraph, testGraph.unique_name, graph::property::kEmplaceEdge /* endpoint */, // - {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "out"}, // - {"destinationBlock", std::string(blockWrongType.uniqueName())}, {"destinationPort", "in"}} /* data */); - expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(!reply.data.has_value()); - }; - }; - "BlockRegistry tests"_test = [] { gr::MsgPortOut toGraph; gr::Graph testGraph(context->loader); @@ -311,112 +111,4 @@ const boost::ut::suite NonRunningGraphTests = [] { }; }; -const boost::ut::suite RunningGraphTests = [] { - using namespace std::string_literals; - using namespace boost::ut; - using namespace gr; - using namespace gr::testing; - using enum gr::message::Command; - - gr::scheduler::Simple scheduler{gr::Graph(context->loader)}; - - auto& source = scheduler.graph().emplaceBlock>(); - auto& sink = scheduler.graph().emplaceBlock>(); - expect(eq(ConnectionResult::SUCCESS, scheduler.graph().connect<"out">(source).to<"in">(sink))); - expect(eq(scheduler.graph().edges().size(), 1UZ)) << "edge registered with connect"; - - gr::MsgPortOut toGraph; - gr::MsgPortIn fromGraph; - expect(eq(ConnectionResult::SUCCESS, toGraph.connect(scheduler.msgIn))); - expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromGraph))); - - std::expected schedulerRet; - auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; - - std::thread schedulerThread1(runScheduler); - - expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; - expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; - expect(eq(scheduler.graph().edges().size(), 1UZ)) << "added one edge"; - - expect(awaitCondition(1s, [&sink] { return sink.count >= 10U; })) << "sink received enough data"; - std::println("executed basic graph"); - - // Adding a few blocks - auto multiply1 = sendAndWaitMessageEmplaceBlock(toGraph, fromGraph, "gr::testing::Copy"s, property_map{}); - auto multiply2 = sendAndWaitMessageEmplaceBlock(toGraph, fromGraph, "gr::testing::Copy"s, property_map{}); - scheduler.processScheduledMessages(); - - for (const auto& block : scheduler.graph().blocks()) { - std::println("block in list: {} - state() : {}", block->name(), magic_enum::enum_name(block->state())); - } - expect(eq(scheduler.graph().blocks().size(), 4UZ)) << "should contain sink->multiply1->multiply2->sink"; - - sendAndWaitMessageEmplaceEdge(toGraph, fromGraph, source.unique_name, "out", multiply1, "in"); - sendAndWaitMessageEmplaceEdge(toGraph, fromGraph, multiply1, "out", multiply2, "in"); - sendAndWaitMessageEmplaceEdge(toGraph, fromGraph, multiply2, "out", sink.unique_name, "in"); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); - scheduler.processScheduledMessages(); - - // Get the whole graph - { - sendMessage(toGraph, "" /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); - if (!waitForReply(fromGraph)) { - expect(false) << "Reply message not received for kGraphInspect."; - } - - expect(eq(getNReplyMessages(fromGraph), 1UZ)); - const Message reply = getAndConsumeFirstReplyMessage(fromGraph); - expect(eq(getNReplyMessages(fromGraph), 0UZ)); - if (!reply.data.has_value()) { - expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); - } - - const auto& data = reply.data.value(); - const auto& children = std::get(data.at("children"s)); - expect(eq(children.size(), 4UZ)); - - const auto& edges = std::get(data.at("edges"s)); - expect(eq(edges.size(), 4UZ)); - } - scheduler.processScheduledMessages(); - - // Stopping scheduler - scheduler.requestStop(); - schedulerThread1.join(); - if (!schedulerRet.has_value()) { - expect(false) << std::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); - } - - // return to initial state - expect(scheduler.changeStateTo(lifecycle::State::INITIALISED).has_value()) << "could switch to INITIALISED?"; - expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::INITIALISED; })) << "scheduler INITIALISED w/ timeout"; - expect(scheduler.state() == lifecycle::State::INITIALISED) << std::format("scheduler INITIALISED - actual: {}\n", magic_enum::enum_name(scheduler.state())); - - std::thread schedulerThread2(runScheduler); - expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; - expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; - - for (const auto& edge : scheduler.graph().edges()) { - std::println("edge in list({}): {}", scheduler.graph().edges().size(), edge); - } - expect(eq(scheduler.graph().edges().size(), 4UZ)) << "added three new edges, one previously registered with connect"; - - // FIXME: edge->connection is not performed - // expect(awaitCondition(1s, [&sink] { - // std::this_thread::sleep_for(100ms); - // std::println("sink has received {} samples - parents: {}", sink.count, sink.in.buffer().streamBuffer.n_writers()); - // return sink.count >= 10U; - // })) << "sink received enough data"; - - scheduler.requestStop(); - - std::print("Counting sink counted to {}\n", sink.count); - - schedulerThread2.join(); - if (!schedulerRet.has_value()) { - expect(false) << std::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); - } -}; - int main() { /* tests are statically executed */ } diff --git a/core/test/qa_HierBlock.cpp b/core/test/qa_HierBlock.cpp index 08e22e156..2262865e1 100644 --- a/core/test/qa_HierBlock.cpp +++ b/core/test/qa_HierBlock.cpp @@ -98,8 +98,9 @@ const boost::ut::suite ExportPortsTests_ = [] { expect(eq(getNReplyMessages(fromScheduler), 0UZ)); // Make connections - sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, source.unique_name, "out", std::string(subGraph.uniqueName()), "in", graph.unique_name); - sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, std::string(subGraph.uniqueName()), "out", sink.unique_name, "in", graph.unique_name); + sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, source.unique_name, "out", std::string(subGraph.uniqueName()), "in", scheduler.unique_name); + sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, std::string(subGraph.uniqueName()), "out", sink.unique_name, "in", scheduler.unique_name); + expect(eq(getNReplyMessages(fromScheduler), 0UZ)); // Get the whole graph diff --git a/core/test/qa_SchedulerMessages.cpp b/core/test/qa_SchedulerMessages.cpp new file mode 100644 index 000000000..cc6c4d8d3 --- /dev/null +++ b/core/test/qa_SchedulerMessages.cpp @@ -0,0 +1,386 @@ +#include + +#include +#include + +#include +#include + +#include "TestBlockRegistryContext.hpp" + +#include "message_utils.hpp" + +using namespace std::chrono_literals; +using namespace std::string_literals; + +namespace ut = boost::ut; + +// We don't like new, but this will ensure the object is alive +// when ut starts running the tests. It runs the tests when +// its static objects get destroyed, which means other static +// objects might have been destroyed before that. +TestContext* context = new TestContext(paths{}, // plugin paths + gr::blocklib::initGrBasicBlocks, // + gr::blocklib::initGrTestingBlocks); + +class TestScheduler { +public: + TestScheduler(gr::Graph graph) : scheduler_(std::move(graph)) { + using namespace gr::testing; + + scheduler_.graph().emplaceBlock>(); + scheduler_.graph().emplaceBlock>(); + + expect(eq(ConnectionResult::SUCCESS, toScheduler.connect(scheduler_.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler_.msgOut.connect(fromScheduler))); + + schedulerThread_ = std::thread([this] { schedulerRet_ = scheduler_.runAndWait(); }); + + using namespace boost::ut; + // Wait for the scheduler to start running + expect(gr::testing::awaitCondition(1s, [this] { return scheduler_.state() == gr::lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + expect(scheduler_.state() == gr::lifecycle::State::RUNNING) << "scheduler thread up and running"; + } + + ~TestScheduler() { + using namespace boost::ut; + scheduler_.requestStop(); + + if (schedulerThread_.joinable()) { + schedulerThread_.join(); + } + + if (!schedulerRet_.has_value()) { + expect(false) << std::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet_.error()); + } + } + + TestScheduler(const TestScheduler&) = delete; + TestScheduler& operator=(const TestScheduler&) = delete; + TestScheduler(TestScheduler&&) = delete; + TestScheduler& operator=(TestScheduler&&) = delete; + + auto& scheduler() { return scheduler_; } + auto& scheduler() const { return scheduler_; } + auto& msgIn() { return scheduler_.msgIn; } + auto& msgOut() { return scheduler_.msgOut; } + + auto& graph() { return scheduler_.graph(); } + + gr::scheduler::Simple scheduler_; + + gr::MsgPortOut toScheduler; + gr::MsgPortIn fromScheduler; + +private: + std::expected schedulerRet_; + std::thread schedulerThread_; +}; + +const boost::ut::suite TopologyGraphTests = [] { + using namespace std::string_literals; + using namespace boost::ut; + using namespace gr; + using namespace gr::testing; + using namespace gr::test; + using enum gr::message::Command; + + expect(fatal(gt(context->registry.keys().size(), 0UZ))) << "didn't register any blocks"; + + "Block addition tests"_test = [] { + TestScheduler scheduler(gr::Graph(context->loader)); + + "Add a valid block"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceBlock /* endpoint */, // + {{"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); + + waitForReply(scheduler.fromScheduler); + // expect(nothrow([&] { scheduler.scheduler_.processScheduledMessages(); })) << "manually execute processing of messages"; + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler, scheduler::property::kBlockEmplaced); + if (!reply.data.has_value()) { + expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); + } + expect(eq(scheduler.graph().blocks().size(), 3UZ)); + }; + + "Add an invalid block"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceBlock /* endpoint */, // + {{"type", "doesnt_exist::multiply"}, {"properties", property_map{}}} /* data */); + + waitForReply(scheduler.fromScheduler); + + expect(nothrow([&] { scheduler.scheduler_.processScheduledMessages(); })) << "manually execute processing of messages"; + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + expect(eq(getNReplyMessages(scheduler.fromScheduler), 0UZ)); + expect(!reply.data.has_value()); + expect(eq(scheduler.graph().blocks().size(), 3UZ)); + }; + }; + + "Block removal tests"_test = [] { + TestScheduler scheduler(gr::Graph(context->loader)); + + auto& testGraph = scheduler.graph(); + + testGraph.emplaceBlock("gr::testing::Copy", {}); + expect(eq(testGraph.blocks().size(), 3UZ)); + // expect(eq(getNReplyMessages(fromScheduler), 1UZ)); // emplaceBlock emits message + consumeAllReplyMessages(scheduler.fromScheduler); + expect(eq(getNReplyMessages(scheduler.fromScheduler), 0UZ)); // all messages are consumed + + "Remove a known block"_test = [&] { + auto& temporaryBlock = testGraph.emplaceBlock("gr::testing::Copy", {}); + expect(eq(testGraph.blocks().size(), 4UZ)); + // expect(eq(getNReplyMessages(fromScheduler), 1UZ)); // emplaceBlock emits message + consumeAllReplyMessages(scheduler.fromScheduler); + expect(eq(getNReplyMessages(scheduler.fromScheduler), 0UZ)); // all messages are consumed + + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kRemoveBlock /* endpoint */, // + {{"uniqueName", std::string(temporaryBlock.uniqueName())}} /* data */); + + waitForReply(scheduler.fromScheduler); + // expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + if (!reply.data.has_value()) { + expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); + } + expect(eq(testGraph.blocks().size(), 3UZ)); + }; + + "Remove an unknown block"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kRemoveBlock /* endpoint */, // + {{"uniqueName", "this_block_is_unknown"}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + expect(!reply.data.has_value()); + expect(eq(testGraph.blocks().size(), 3UZ)); + }; + }; + + "Block replacement tests"_test = [] { + gr::Graph testGraph(context->loader); + + auto& block = testGraph.emplaceBlock("gr::testing::Copy", {}); + expect(eq(testGraph.blocks().size(), 1UZ)); + + TestScheduler scheduler(std::move(testGraph)); + + "Replace a known block"_test = [&] { + auto& temporaryBlock = scheduler.graph().emplaceBlock("gr::testing::Copy", {}); + expect(eq(scheduler.graph().blocks().size(), 4UZ)); + + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kReplaceBlock /* endpoint */, // + {{"uniqueName", std::string(temporaryBlock.uniqueName())}, // + {"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + if (!reply.data.has_value()) { + expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); + } + }; + + "Replace an unknown block"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kReplaceBlock /* endpoint */, // + {{"uniqueName", "this_block_is_unknown"}, // + {"type", "gr::testing::Copy"}, {"properties", property_map{}}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + + expect(!reply.data.has_value()); + }; + + "Replace with an unknown block"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kReplaceBlock /* endpoint */, // + {{"uniqueName", std::string(block.uniqueName())}, // + {"type", "doesnt_exist::multiply"}, {"properties", property_map{}}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + + expect(!reply.data.has_value()); + }; + }; + + "Edge addition tests"_test = [&] { + gr::Graph testGraph(context->loader); + + auto& blockOut = testGraph.emplaceBlock("gr::testing::Copy", {}); + auto& blockIn = testGraph.emplaceBlock("gr::testing::Copy", {}); + auto& blockWrongType = testGraph.emplaceBlock("gr::testing::Copy", {}); + + TestScheduler scheduler(std::move(testGraph)); + + "Add an edge"_test = [&] { + property_map data = {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "out"}, // + {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "in"}, // + {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; + + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceEdge /* endpoint */, data /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + if (!reply.data.has_value()) { + expect(false) << std::format("edge not being placed - error: {}", reply.data.error()); + } + }; + + "Fail to add an edge because source port is invalid"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceEdge /* endpoint */, // + {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "OUTPUT"}, // + {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "in"}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + expect(!reply.data.has_value()); + }; + + "Fail to add an edge because destination port is invalid"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceEdge /* endpoint */, // + {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "in"}, // + {"destinationBlock", std::string(blockIn.uniqueName())}, {"destinationPort", "INPUT"}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + expect(!reply.data.has_value()); + }; + + "Fail to add an edge because ports are not compatible"_test = [&] { + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kEmplaceEdge /* endpoint */, // + {{"sourceBlock", std::string(blockOut.uniqueName())}, {"sourcePort", "out"}, // + {"destinationBlock", std::string(blockWrongType.uniqueName())}, {"destinationPort", "in"}} /* data */); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + expect(!reply.data.has_value()); + }; + }; +}; + +/// old tests, from the time graph handled messages. They're still good +const boost::ut::suite MoreTopologyGraphTests = [] { + using namespace std::string_literals; + using namespace boost::ut; + using namespace gr; + using namespace gr::testing; + using enum gr::message::Command; + + gr::scheduler::Simple scheduler{gr::Graph(context->loader)}; + + auto& source = scheduler.graph().emplaceBlock>(); + auto& sink = scheduler.graph().emplaceBlock>(); + expect(eq(ConnectionResult::SUCCESS, scheduler.graph().connect<"out">(source).to<"in">(sink))); + expect(eq(scheduler.graph().edges().size(), 1UZ)) << "edge registered with connect"; + + gr::MsgPortOut toScheduler; + gr::MsgPortIn fromScheduler; + expect(eq(ConnectionResult::SUCCESS, toScheduler.connect(scheduler.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromScheduler))); + + std::expected schedulerRet; + auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; + + std::thread schedulerThread1(runScheduler); + + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; + expect(eq(scheduler.graph().edges().size(), 1UZ)) << "added one edge"; + + expect(awaitCondition(1s, [&sink] { return sink.count >= 10U; })) << "sink received enough data"; + std::println("executed basic graph"); + + // Adding a few blocks + auto multiply1 = sendAndWaitMessageEmplaceBlock(toScheduler, fromScheduler, "gr::testing::Copy"s, property_map{}); + auto multiply2 = sendAndWaitMessageEmplaceBlock(toScheduler, fromScheduler, "gr::testing::Copy"s, property_map{}); + scheduler.processScheduledMessages(); + + for (const auto& block : scheduler.graph().blocks()) { + std::println("block in list: {} - state() : {}", block->name(), magic_enum::enum_name(block->state())); + } + expect(eq(scheduler.graph().blocks().size(), 4UZ)) << "should contain sink->multiply1->multiply2->sink"; + + sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, source.unique_name, "out", multiply1, "in"); + sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, multiply1, "out", multiply2, "in"); + sendAndWaitMessageEmplaceEdge(toScheduler, fromScheduler, multiply2, "out", sink.unique_name, "in"); + expect(eq(getNReplyMessages(fromScheduler), 0UZ)); + scheduler.processScheduledMessages(); + + // Get the whole graph + { + sendMessage(toScheduler, "" /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); + if (!waitForReply(fromScheduler)) { + expect(false) << "Reply message not received for kGraphInspect."; + } + + expect(eq(getNReplyMessages(fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(fromScheduler); + expect(eq(getNReplyMessages(fromScheduler), 0UZ)); + if (!reply.data.has_value()) { + expect(false) << std::format("reply.data has no value:{}\n", reply.data.error()); + } + + const auto& data = reply.data.value(); + const auto& children = std::get(data.at("children"s)); + expect(eq(children.size(), 4UZ)); + + const auto& edges = std::get(data.at("edges"s)); + expect(eq(edges.size(), 4UZ)); + } + scheduler.processScheduledMessages(); + + // Stopping scheduler + scheduler.requestStop(); + schedulerThread1.join(); + if (!schedulerRet.has_value()) { + expect(false) << std::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); + } + + // return to initial state + expect(scheduler.changeStateTo(lifecycle::State::INITIALISED).has_value()) << "could switch to INITIALISED?"; + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::INITIALISED; })) << "scheduler INITIALISED w/ timeout"; + expect(scheduler.state() == lifecycle::State::INITIALISED) << std::format("scheduler INITIALISED - actual: {}\n", magic_enum::enum_name(scheduler.state())); + + std::thread schedulerThread2(runScheduler); + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; + + for (const auto& edge : scheduler.graph().edges()) { + std::println("edge in list({}): {}", scheduler.graph().edges().size(), edge); + } + expect(eq(scheduler.graph().edges().size(), 4UZ)) << "added three new edges, one previously registered with connect"; + + // FIXME: edge->connection is not performed + // expect(awaitCondition(1s, [&sink] { + // std::this_thread::sleep_for(100ms); + // std::println("sink has received {} samples - parents: {}", sink.count, sink.in.buffer().streamBuffer.n_writers()); + // return sink.count >= 10U; + // })) << "sink received enough data"; + + scheduler.requestStop(); + + std::print("Counting sink counted to {}\n", sink.count); + + schedulerThread2.join(); + if (!schedulerRet.has_value()) { + expect(false) << std::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); + } +}; + +int main() { /* tests are statically executed */ } From 5bfb21ffb9cdc3e4ebd6dd6f1c263afbbd898c42 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Mon, 12 May 2025 12:03:36 +0100 Subject: [PATCH 2/8] Introduce zombie list for stopping blocks This allows to remove blocks without having to stop the scheduler. Running blocks can't transition directly to stopped state, so we move them into a "zombie list". There's a maintenance period where we delete blocks that have stopped. Furthermore, deleted blocks are now removed from each thread's local block list. Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Block.hpp | 1 + core/include/gnuradio-4.0/Graph.hpp | 16 ++-- core/include/gnuradio-4.0/Scheduler.hpp | 114 ++++++++++++++++++++++-- 3 files changed, 119 insertions(+), 12 deletions(-) diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index a71238187..e76880e87 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -498,6 +498,7 @@ class Block : public lifecycle::StateMachine { ~Block() { // NOSONAR -- need to request the (potentially) running ioThread to stop if (lifecycle::isActive(this->state())) { + // Only happens in artificial cases likes qa_Block test. In practice blocks stay in zombie list if active emitErrorMessageIfAny("~Block()", this->changeStateTo(lifecycle::State::REQUESTED_STOP)); } if constexpr (blockingIO) { diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index e9ce199b5..c39757c49 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -493,7 +493,7 @@ class Graph : public gr::Block { return {reply}; } - void removeBlockByName(std::string_view uniqueName) { + std::unique_ptr removeBlockByName(std::string_view uniqueName) { auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; }); if (it == _blocks.end()) { @@ -504,10 +504,13 @@ class Graph : public gr::Block { return std::addressof(edge.sourceBlock()) == it->get() || std::addressof(edge.destinationBlock()) == it->get(); }); + std::unique_ptr removedBlock = std::move(*it); _blocks.erase(it); + + return removedBlock; } - gr::BlockModel* replaceBlock(const std::string& uniqueName, const std::string& type, const property_map& properties) { + std::pair, BlockModel*> replaceBlock(const std::string& uniqueName, const std::string& type, const property_map& properties) { auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; }); if (it == _blocks.end()) { throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name)); @@ -521,19 +524,20 @@ class Graph : public gr::Block { addBlock(std::move(newBlock)); - BlockModel* oldBlock = it->get(); for (auto& edge : _edges) { - if (edge._sourceBlock == oldBlock) { + if (edge._sourceBlock == it->get()) { edge._sourceBlock = newBlockRaw; } - if (edge._destinationBlock == oldBlock) { + if (edge._destinationBlock == it->get()) { edge._destinationBlock = newBlockRaw; } } + + std::unique_ptr oldBlock = std::move(*it); _blocks.erase(it); - return newBlockRaw; + return {std::move(oldBlock), newBlockRaw}; } void emplaceEdge(std::string_view sourceBlock, std::string sourcePort, std::string_view destinationBlock, // diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 12befb6c1..ed0d18850 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -57,6 +57,9 @@ class SchedulerBase : public Block { std::recursive_mutex _jobListsMutex; // only used when modifying and copying the graph->local job list JobLists _jobLists = std::make_shared>>(); + std::mutex _zombieBlocksMutex; + std::vector> _zombieBlocks; + MsgPortOutForChildren _toChildMessagePort; MsgPortInFromChildren _fromChildMessagePort; std::vector _pendingMessagesToChildren; @@ -356,6 +359,11 @@ class SchedulerBase : public Block { if (runnerID == 0UZ || _nRunningJobs.load(std::memory_order_acquire) == 0UZ) { this->processScheduledMessages(); // execute the scheduler- and Graph-specific message handler only once globally } + + // Zombies are cleaned per-thread, as we remove from the localBlockList as well. + // Cleaning zombies has low priority, so uses process_stream_to_message_ratio (a different ratio could be introduced) + cleanupZombieBlocks(localBlockList); + std::ranges::for_each(localBlockList, [](auto& block) { block->processScheduledMessages(); }); activeState = this->state(); msgToCount++; @@ -376,10 +384,6 @@ class SchedulerBase : public Block { break; } } else if (activeState == lifecycle::State::PAUSED) { - if (_graph.hasTopologyChanged()) { - // TODO: update localBlockList topology if needed - _graph.ackTopologyChange(); - } std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms)); msgToCount = 0UZ; } else { // other states @@ -416,6 +420,7 @@ class SchedulerBase : public Block { this->emitErrorMessageIfAny("forEachBlock -> stop() -> LifecycleState", block->changeStateTo(lifecycle::State::STOPPED)); } }); + this->emitErrorMessageIfAny("stop() -> LifecycleState ->STOPPED", this->changeStateTo(lifecycle::State::STOPPED)); this->emitErrorMessageIfAny("stop() -> LifecycleState ->IDLE", this->changeStateTo(lifecycle::State::IDLE)); } @@ -465,7 +470,8 @@ class SchedulerBase : public Block { const auto& data = message.data.value(); const std::string& uniqueName = std::get(data.at("uniqueName"s)); - _graph.removeBlockByName(uniqueName); + auto removedBlock = _graph.removeBlockByName(uniqueName); + makeZombie(std::move(removedBlock)); message.endpoint = scheduler::property::kBlockRemoved; return {message}; @@ -502,6 +508,101 @@ class SchedulerBase : public Block { return message; } + /* + Zombie Tutorial: + + Blocks can't be deleted unless stopped, but since stopping can take time (async) we move such blocks + to the "zombie list" and disconnect them immediately from the graph. This allows them to stop and be deleted + safely. + + Periodically, we call cleanupZombieBlocks(), which iterates the zombie list and deletes the blocks that are now stopped. + + cleanupZombieBlocks() is called *per-thread*, since we also need to update the localBlockList, i.e.: removing dangling block pointers + from the localBlockList. + + We also update the _jobLists member variable, but probably that member can be removed, seems unneeded and only used so unit-tests can + query it. + */ + void cleanupZombieBlocks(std::vector& localBlockList) { + if (localBlockList.empty()) { + return; + } + + std::lock_guard guard(_zombieBlocksMutex); + + auto it = _zombieBlocks.begin(); + + while (it != _zombieBlocks.end()) { + auto localBlockIt = std::find(localBlockList.begin(), localBlockList.end(), it->get()); + if (localBlockIt == localBlockList.end()) { + // we only care about the blocks local to our thread. + ++it; + continue; + } + + bool shouldDelete = false; + + switch ((*it)->state()) { + case lifecycle::State::IDLE: + case lifecycle::State::STOPPED: + case lifecycle::State::INITIALISED: + // This block can be deleted immediately + shouldDelete = true; + break; + case lifecycle::State::ERROR: + // Delete as well. (Separate case block, as better ideas welcome) + shouldDelete = true; + break; + case lifecycle::State::REQUESTED_STOP: + // This block will be deleted later + break; + case lifecycle::State::REQUESTED_PAUSE: + // This block will be deleted later + // There's no transition from REQUESTED_PAUSE to REQUESTED_STOP + // Will be moved to REQUESTED_STOP as soon as it's possible + break; + case lifecycle::State::PAUSED: + // This zombie was in REQUESTED_PAUSE and now finally in PAUSED. Can be stopped now. + // Will be deleted in a next zombie maintenance period + this->emitErrorMessageIfAny("cleanupZombieBlocks", (*it)->changeStateTo(lifecycle::State::REQUESTED_STOP)); + break; + case lifecycle::State::RUNNING: assert(false && "Doesn't happen: zombie blocks are never running"); break; + } + + if (shouldDelete) { + localBlockList.erase(localBlockIt); + + BlockModel* zombieRaw = it->get(); + it = _zombieBlocks.erase(it); // ~Block() runs here + + // We need to remove zombieRaw from jobLists as well, in case Scheduler ever goes to INITIALIZED + // again. + // TODO: I'd argue we should remove _jobLists to minimize having to maintain state. Instead, a job list can be + // calculated in start(). + std::lock_guard lock(_jobListsMutex); + for (auto& jobList : *this->_jobLists) { + auto job_it = std::remove(jobList.begin(), jobList.end(), zombieRaw); + if (job_it != jobList.end()) { + jobList.erase(job_it, jobList.end()); + break; + } + } + + } else { + ++it; + } + } + } + + void makeZombie(std::unique_ptr block) { + if (block->state() == lifecycle::State::PAUSED || block->state() == lifecycle::State::RUNNING) { + this->emitErrorMessageIfAny("makeZombie", block->changeStateTo(lifecycle::State::REQUESTED_STOP)); + } + + std::lock_guard guard(_zombieBlocksMutex); + _zombieBlocks.push_back(std::move(block)); + } + std::optional propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { assert(propertyName == scheduler::property::kReplaceBlock); using namespace std::string_literals; @@ -516,7 +617,8 @@ class SchedulerBase : public Block { } }(); - auto newBlockRaw = _graph.replaceBlock(uniqueName, type, properties); + auto [oldBlock, newBlockRaw] = _graph.replaceBlock(uniqueName, type, properties); + makeZombie(std::move(oldBlock)); std::optional result = gr::Message{}; result->endpoint = scheduler::property::kBlockReplaced; From 109ab478f1ed5a4670b02e492723d38e84974af6 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Wed, 14 May 2025 15:24:56 +0100 Subject: [PATCH 3/8] Remove unused "topology changed" mechanism This was never used. It was set but never read and the job list was never updated. With the "zombie list" there's no need for it anymore. Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Graph.hpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index c39757c49..6cb08ce47 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -188,7 +188,6 @@ class Graph : public gr::Block { private: std::shared_ptr _progress = std::make_shared(); std::shared_ptr _ioThreadPool = std::make_shared("graph_thread_pool", gr::thread_pool::TaskType::IO_BOUND, 2UZ, std::numeric_limits::max()); - std::atomic_bool _topologyChanged{false}; std::vector _edges; std::vector> _blocks; @@ -338,17 +337,12 @@ class Graph : public gr::Block { } _progress = std::move(other._progress); _ioThreadPool = std::move(other._ioThreadPool); - _topologyChanged.store(other._topologyChanged.load(std::memory_order_acquire), std::memory_order_release); - _edges = std::move(other._edges); - _blocks = std::move(other._blocks); + _edges = std::move(other._edges); + _blocks = std::move(other._blocks); return *this; } - void setTopologyChanged() noexcept { _topologyChanged.store(true, std::memory_order_release); } - [[nodiscard]] bool hasTopologyChanged() const noexcept { return _topologyChanged; } - void ackTopologyChange() noexcept { _topologyChanged.store(false, std::memory_order_release); } - [[nodiscard]] std::span> blocks() noexcept { return {_blocks}; } [[nodiscard]] std::span edges() noexcept { return {_edges}; } From 26b4636d5a64ea69a1604757f6a5010326d90a83 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Tue, 20 May 2025 15:47:04 +0100 Subject: [PATCH 4/8] Add scheduler message to retrieve the graph as YAML Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Scheduler.hpp | 23 +++++++++++++++++ core/test/qa_SchedulerMessages.cpp | 33 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index ed0d18850..9cfde0aa8 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,8 @@ inline static const char* kBlockRemoved = "BlockRemoved"; inline static const char* kBlockReplaced = "BlockReplaced"; inline static const char* kEdgeEmplaced = "EdgeEmplaced"; inline static const char* kEdgeRemoved = "EdgeRemoved"; + +inline static const char* kGraphGRC = "GraphGRC"; } // namespace property enum class ExecutionPolicy { @@ -110,6 +113,7 @@ class SchedulerBase : public Block { this->propertyCallbacks[scheduler::property::kRemoveEdge] = std::mem_fn(&SchedulerBase::propertyCallbackRemoveEdge); this->propertyCallbacks[scheduler::property::kEmplaceEdge] = std::mem_fn(&SchedulerBase::propertyCallbackEmplaceEdge); this->propertyCallbacks[scheduler::property::kReplaceBlock] = std::mem_fn(&SchedulerBase::propertyCallbackReplaceBlock); + this->propertyCallbacks[scheduler::property::kGraphGRC] = std::mem_fn(&SchedulerBase::propertyCallbackGraphGRC); } ~SchedulerBase() { @@ -603,6 +607,25 @@ class SchedulerBase : public Block { _zombieBlocks.push_back(std::move(block)); } + std::optional propertyCallbackGraphGRC([[maybe_unused]] std::string_view propertyName, Message message) { + assert(propertyName == scheduler::property::kGraphGRC); + + auto& pluginLoader = gr::globalPluginLoader(); + if (message.cmd == message::Command::Get) { + message.data = property_map{{"value", gr::saveGrc(pluginLoader, _graph)}}; + } else if (message.cmd == message::Command::Set) { + // const auto& data = message.data.value(); + // auto yamlContent = std::get(data.at("value"s)); + // Graph = gr::loadGrc(pluginLoader, yamlContent); + // We need to stop the scheduler first + throw gr::exception(std::format("Not implemented {}", message.cmd)); + } else { + throw gr::exception(std::format("Unexpected command type {}", message.cmd)); + } + + return message; + } + std::optional propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) { assert(propertyName == scheduler::property::kReplaceBlock); using namespace std::string_literals; diff --git a/core/test/qa_SchedulerMessages.cpp b/core/test/qa_SchedulerMessages.cpp index cc6c4d8d3..914f06597 100644 --- a/core/test/qa_SchedulerMessages.cpp +++ b/core/test/qa_SchedulerMessages.cpp @@ -272,6 +272,39 @@ const boost::ut::suite TopologyGraphTests = [] { expect(!reply.data.has_value()); }; }; + + "Get GRC Yaml tests"_test = [] { + gr::Graph testGraph(context->loader); + testGraph.emplaceBlock("gr::testing::Copy", {}); + testGraph.emplaceBlock("gr::testing::Copy", {}); + + TestScheduler scheduler(std::move(testGraph)); + + sendMessage(scheduler.toScheduler, scheduler.scheduler_.unique_name, scheduler::property::kGraphGRC, {}); + waitForReply(scheduler.fromScheduler); + + expect(eq(getNReplyMessages(scheduler.fromScheduler), 1UZ)); + const Message reply = getAndConsumeFirstReplyMessage(scheduler.fromScheduler); + + expect(reply.data.has_value()) << "Reply should contain data"; + if (reply.data.has_value()) { + const auto& data = reply.data.value(); + expect(data.contains("value")) << "Reply should contain 'value' field"; + const auto& yaml = std::get(data.at("value")); + expect(!yaml.empty()) << "YAML string should not be empty"; + std::println("YAML content:\n{}", yaml); + + // verify well formed by loading from yaml + auto graphFromYaml = gr::loadGrc(context->loader, yaml); + expect(eq(graphFromYaml.blocks().size(), 4UZ)) << std::format("Expected 4 blocks in loaded graph, got {} blocks", graphFromYaml.blocks().size()); + + // "Set GRC YAML"_test = [&] { + // sendMessage(toGraph, scheduler.scheduler_.unique_name, scheduler::property::kGraphGRC, {{"value", yaml}}); + // expect(nothrow([&] { testGraph.processScheduledMessages(); })) << "manually execute processing of messages"; + // expect(eq(testGraph.blocks().size(), 2UZ)) << "Expected 2 blocks after reloading GRC"; + // }; + } + }; }; /// old tests, from the time graph handled messages. They're still good From e3730171414d45b1187e02c5809ae2e6940eb228 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Tue, 27 May 2025 11:25:49 +0100 Subject: [PATCH 5/8] Support adding blocks into a running scheduler During the maintenance period, when we cleanup zombies, we also check if a thread should adopt blocks that were added meanwhile Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Scheduler.hpp | 65 +++++++++++++++++++++++++ core/test/qa_Scheduler.cpp | 61 +++++++++++++++++++++++ 2 files changed, 126 insertions(+) diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 9cfde0aa8..ae6df5d78 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -63,6 +63,11 @@ class SchedulerBase : public Block { std::mutex _zombieBlocksMutex; std::vector> _zombieBlocks; + // for blocks that were added while scheduler was running. They need to be adopted by a thread + std::mutex _adoptionBlocksMutex; + // fixed-sized vector indexed by runnerId. Cheaper than a map. + std::vector> _adoptionBlocks; + MsgPortOutForChildren _toChildMessagePort; MsgPortInFromChildren _fromChildMessagePort; std::vector _pendingMessagesToChildren; @@ -368,6 +373,8 @@ class SchedulerBase : public Block { // Cleaning zombies has low priority, so uses process_stream_to_message_ratio (a different ratio could be introduced) cleanupZombieBlocks(localBlockList); + adoptBlocks(runnerID, localBlockList); + std::ranges::for_each(localBlockList, [](auto& block) { block->processScheduledMessages(); }); activeState = this->state(); msgToCount++; @@ -462,6 +469,36 @@ class SchedulerBase : public Block { auto& newBlock = _graph.emplaceBlock(type, properties); + if (lifecycle::isActive(this->state())) { + // Block is being added while scheduler is running. Will be adopted by a thread. + const auto nBatches = _adoptionBlocks.size(); + if (nBatches > 0) { + std::lock_guard guard(_adoptionBlocksMutex); + // pseudo-randomize which thread gets it + auto blockAddress = reinterpret_cast(&newBlock); + auto runnerIndex = (blockAddress / sizeof(void*)) % nBatches; + _adoptionBlocks[runnerIndex].push_back(&newBlock); + + switch (newBlock.state()) { + case lifecycle::State::STOPPED: + case lifecycle::State::IDLE: // + this->emitErrorMessageIfAny("adoptBlocks -> INITIALIZED", newBlock.changeStateTo(lifecycle::State::INITIALISED)); + this->emitErrorMessageIfAny("adoptBlocks -> INITIALIZED", newBlock.changeStateTo(lifecycle::State::RUNNING)); + break; + case lifecycle::State::INITIALISED: // + this->emitErrorMessageIfAny("adoptBlocks -> INITIALIZED", newBlock.changeStateTo(lifecycle::State::RUNNING)); + break; + case lifecycle::State::RUNNING: + case lifecycle::State::REQUESTED_PAUSE: + case lifecycle::State::PAUSED: + case lifecycle::State::REQUESTED_STOP: + case lifecycle::State::ERROR: // + this->emitErrorMessage("propertyCallbackEmplaceBlock", std::format("Unexpected block state during emplacement: {}", magic_enum::enum_name(newBlock.state()))); + break; + } + } + } + this->emitMessage(scheduler::property::kBlockEmplaced, Graph::serializeBlock(std::addressof(newBlock))); // Message is sent as a reaction to emplaceBlock, no need for a separate one @@ -598,11 +635,35 @@ class SchedulerBase : public Block { } } + void adoptBlocks(std::size_t runnerID, std::vector& localBlockList) { + std::lock_guard guard(_adoptionBlocksMutex); + + assert(_adoptionBlocks.size() > runnerID); + auto& newBlocks = _adoptionBlocks[runnerID]; + + localBlockList.reserve(localBlockList.size() + newBlocks.size()); + localBlockList.insert(localBlockList.end(), newBlocks.begin(), newBlocks.end()); + newBlocks.clear(); + } + void makeZombie(std::unique_ptr block) { if (block->state() == lifecycle::State::PAUSED || block->state() == lifecycle::State::RUNNING) { this->emitErrorMessageIfAny("makeZombie", block->changeStateTo(lifecycle::State::REQUESTED_STOP)); } + { + // Handle edge case: If we receive two consecutive "Add Block X" "Remove Block X" messages + // it would be zombie before being adopted, so we need to remove it from adoption list + std::lock_guard guard(_adoptionBlocksMutex); + for (auto& adoptionList : _adoptionBlocks) { + auto it = std::find(adoptionList.begin(), adoptionList.end(), block.get()); + if (it != adoptionList.end()) { + adoptionList.erase(it); + break; + } + } + } + std::lock_guard guard(_zombieBlocksMutex); _zombieBlocks.push_back(std::move(block)); } @@ -686,6 +747,8 @@ class Simple : public SchedulerBase, execution, TPr allBlocks.reserve(blockCount); this->forAllUnmanagedBlocks([&allBlocks](auto&& block) { allBlocks.push_back(block.get()); }); + this->_adoptionBlocks.clear(); + this->_adoptionBlocks.resize(n_batches); this->_jobLists->reserve(n_batches); for (std::size_t i = 0; i < n_batches; i++) { // create job-set for thread @@ -763,6 +826,8 @@ detecting cycles and blocks which can be reached from several source blocks.)""> case ExecutionPolicy::multiThreaded: n_batches = std::min(static_cast(this->_pool->maxThreads()), _blocklist.size()); break; } + this->_adoptionBlocks.clear(); + this->_adoptionBlocks.resize(n_batches); std::lock_guard lock(base_t::_jobListsMutex); this->_jobLists->reserve(n_batches); for (std::size_t i = 0; i < n_batches; i++) { diff --git a/core/test/qa_Scheduler.cpp b/core/test/qa_Scheduler.cpp index 5c085fa13..cdaf3835f 100644 --- a/core/test/qa_Scheduler.cpp +++ b/core/test/qa_Scheduler.cpp @@ -1,5 +1,7 @@ +#include "message_utils.hpp" #include +#include #include #include #include @@ -731,6 +733,65 @@ const boost::ut::suite<"SchedulerTests"> SchedulerTests = [] { expect(schedulerResult.has_value()) << errorMsg; }; + "add block while scheduler is running"_test = [] { + auto threadPool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); + using namespace gr; + using namespace gr::testing; + using TScheduler = scheduler::Simple<>; + + Graph flow; + auto& source = flow.emplaceBlock>(); + auto& sink = flow.emplaceBlock>(); + expect(eq(gr::ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(sink))); + + auto scheduler = TScheduler{std::move(flow), threadPool}; + + MsgPortOut toScheduler; + MsgPortIn fromScheduler; + expect(eq(ConnectionResult::SUCCESS, toScheduler.connect(scheduler.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromScheduler))); + + std::expected schedulerResult; + auto schedulerThread = std::thread([&scheduler, &schedulerResult] { schedulerResult = scheduler.runAndWait(); }); + + expect(awaitCondition(2s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler is running"; + + auto initialBlockCount = scheduler.graph().blocks().size(); + std::println("Initial block count: {}", initialBlockCount); + + sendMessage(toScheduler, scheduler.unique_name, scheduler::property::kEmplaceBlock, property_map{{"type", std::string("good::cout_sink")}, {"properties", property_map{{"disconnect_on_done", false}}}}); + gr::testing::waitForReply(fromScheduler); + + auto messages = fromScheduler.streamReader().get(); + expect(gt(messages.size(), 0UZ)) << "received block emplaced message"; + auto message = messages[0]; + expect(eq(message.endpoint, scheduler::property::kBlockEmplaced)) << "correct message endpoint"; + expect(message.data.has_value()) << "message has data"; + auto consumed = messages.consume(1UZ); + expect(consumed) << "failed to consume message"; + + expect(awaitCondition(2s, [&scheduler, initialBlockCount] { return scheduler.graph().blocks().size() > initialBlockCount; })) << "waiting for block to be added to graph"; + + auto finalBlockCount = scheduler.graph().blocks().size(); + std::println("Final block count: {}", finalBlockCount); + expect(eq(finalBlockCount, initialBlockCount + 1)) << "block was added"; + + expect(awaitCondition(2s, [&scheduler] { + for (const auto& block : scheduler.graph().blocks()) { + if (block->name() == "good::cout_sink" && block->state() == lifecycle::State::RUNNING) { + return true; + } + } + return false; + })) << "waiting for new block to reach running state"; + + scheduler.requestStop(); + schedulerThread.join(); + expect(schedulerResult.has_value()) << "scheduler executed successfully"; + }; + std::println("N.B. test-suite finished"); }; From b32452342e740633cc858df3f8151b1975fb5073 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Tue, 27 May 2025 15:24:25 +0100 Subject: [PATCH 6/8] Remove invalid STOPPED->IDLE transition Lifecycle doesn't support it, so besides being a no-op it also spammed the logs with errors. The only permitted transition is the user resuming the scheduler, which should put it in initialized state Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Scheduler.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index ae6df5d78..462aa8189 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -433,7 +433,6 @@ class SchedulerBase : public Block { }); this->emitErrorMessageIfAny("stop() -> LifecycleState ->STOPPED", this->changeStateTo(lifecycle::State::STOPPED)); - this->emitErrorMessageIfAny("stop() -> LifecycleState ->IDLE", this->changeStateTo(lifecycle::State::IDLE)); } void pause() { From 5d06c75406b5be40e3ad7f140a5854c5a24333a9 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Thu, 29 May 2025 15:00:08 +0100 Subject: [PATCH 7/8] Implement handling "set graph YAML" message We move all the existing blocks to the zombie list. They'll get deleted when stopped. The scheduler needs to be restarted, so its init() method runs again to construct the job list and threads. The message is being processed by a worker thread, therefore we can't restart the scheduler here as that would crash. Scheduler is restarted automatically by OD when it receives the message reply. Also, the scheduler didn't support being restarted at all, the reset() method implementation was missing. Signed-off-by: Sergio Martins --- core/include/gnuradio-4.0/Graph.hpp | 5 ++ core/include/gnuradio-4.0/Scheduler.hpp | 88 +++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 5 deletions(-) diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index 6cb08ce47..4eced7d43 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -346,6 +346,11 @@ class Graph : public gr::Block { [[nodiscard]] std::span> blocks() noexcept { return {_blocks}; } [[nodiscard]] std::span edges() noexcept { return {_edges}; } + void clear() { + _blocks.clear(); + _edges.clear(); + } + /** * @return atomic sequence counter that indicates if any block could process some data or messages */ diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 462aa8189..994c19e41 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -667,6 +667,36 @@ class SchedulerBase : public Block { _zombieBlocks.push_back(std::move(block)); } + // Moves all blocks into the zombie list + // Useful for bulk operations such as "set grc yaml" message + void makeAllZombies() { + std::lock_guard guard(_zombieBlocksMutex); + + for (auto& block : this->_graph.blocks()) { + switch (block->state()) { + case lifecycle::State::RUNNING: + case lifecycle::State::REQUESTED_PAUSE: + case lifecycle::State::PAUSED: // + this->emitErrorMessageIfAny("makeAllZombies", block->changeStateTo(lifecycle::State::REQUESTED_STOP)); + break; + + case lifecycle::State::INITIALISED: // + this->emitErrorMessageIfAny("makeAllZombies", block->changeStateTo(lifecycle::State::STOPPED)); + break; + case lifecycle::State::IDLE: + case lifecycle::State::STOPPED: + case lifecycle::State::ERROR: + case lifecycle::State::REQUESTED_STOP: + // Can go into the zombie list and deleted + break; + } + + _zombieBlocks.push_back(std::move(block)); + } + + this->_graph.clear(); + } + std::optional propertyCallbackGraphGRC([[maybe_unused]] std::string_view propertyName, Message message) { assert(propertyName == scheduler::property::kGraphGRC); @@ -674,11 +704,47 @@ class SchedulerBase : public Block { if (message.cmd == message::Command::Get) { message.data = property_map{{"value", gr::saveGrc(pluginLoader, _graph)}}; } else if (message.cmd == message::Command::Set) { - // const auto& data = message.data.value(); - // auto yamlContent = std::get(data.at("value"s)); - // Graph = gr::loadGrc(pluginLoader, yamlContent); - // We need to stop the scheduler first - throw gr::exception(std::format("Not implemented {}", message.cmd)); + const auto& data = message.data.value(); + auto yamlContent = std::get(data.at("value"s)); + + try { + Graph newGraph = gr::loadGrc(pluginLoader, yamlContent); + + makeAllZombies(); + + const auto originalState = this->state(); + + switch (originalState) { + case lifecycle::State::RUNNING: + case lifecycle::State::REQUESTED_PAUSE: + case lifecycle::State::PAUSED: // + this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP)); + this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> STOPPED", this->changeStateTo(lifecycle::State::STOPPED)); + break; + case lifecycle::State::REQUESTED_STOP: + case lifecycle::State::INITIALISED: // + this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::STOPPED)); + break; + case lifecycle::State::IDLE: + assert(false); // doesn't happen + break; + case lifecycle::State::STOPPED: + case lifecycle::State::ERROR: break; + } + + _graph = std::move(newGraph); + + // Now ideally we'd just restart the Scheduler, but we can't since we're processing a message inside a working thread. + // When the scheduler starts running it asserts that _nRunningJobs is 0, so we can't start it now, we're in the job. + // We need to let poolWorker() unwind, decrement _nRunningJobs and then move scheduler to its original value. + // Alternatively, we could forbid kGraphGRC unless Scheduler was in STOPPED state. That would simplify logic, but + // put more burden on the client. + + message.data = property_map{{"originalSchedulerState", int(originalState)}}; + } catch (const std::exception& e) { + message.data = std::unexpected(Error{std::format("Error parsing YAML: {}", e.what())}); + } + } else { throw gr::exception(std::format("Unexpected command type {}", message.cmd)); } @@ -748,6 +814,7 @@ class Simple : public SchedulerBase, execution, TPr this->_adoptionBlocks.clear(); this->_adoptionBlocks.resize(n_batches); + this->_jobLists->clear(); this->_jobLists->reserve(n_batches); for (std::size_t i = 0; i < n_batches; i++) { // create job-set for thread @@ -758,6 +825,11 @@ class Simple : public SchedulerBase, execution, TPr } } } + + void reset() { + base_t::reset(); + init(); + } }; template @@ -828,6 +900,7 @@ detecting cycles and blocks which can be reached from several source blocks.)""> this->_adoptionBlocks.clear(); this->_adoptionBlocks.resize(n_batches); std::lock_guard lock(base_t::_jobListsMutex); + this->_jobLists->clear(); this->_jobLists->reserve(n_batches); for (std::size_t i = 0; i < n_batches; i++) { // create job-set for thread @@ -838,6 +911,11 @@ detecting cycles and blocks which can be reached from several source blocks.)""> } } } + + void reset() { + base_t::reset(); + init(); + } }; } // namespace gr::scheduler From 24d2c8cc2098078a3171313079bdbd8e923214d9 Mon Sep 17 00:00:00 2001 From: Sergio Martins Date: Fri, 30 May 2025 12:39:29 +0100 Subject: [PATCH 8/8] Move "add block to running scheduler test" to qa_SchedulerMessages.cpp As that's where all message based scheduler tests are Signed-off-by: Sergio Martins --- core/test/qa_Scheduler.cpp | 59 -------------------------- core/test/qa_SchedulerMessages.cpp | 66 ++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 59 deletions(-) diff --git a/core/test/qa_Scheduler.cpp b/core/test/qa_Scheduler.cpp index cdaf3835f..c106e0cfb 100644 --- a/core/test/qa_Scheduler.cpp +++ b/core/test/qa_Scheduler.cpp @@ -733,65 +733,6 @@ const boost::ut::suite<"SchedulerTests"> SchedulerTests = [] { expect(schedulerResult.has_value()) << errorMsg; }; - "add block while scheduler is running"_test = [] { - auto threadPool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); - using namespace gr; - using namespace gr::testing; - using TScheduler = scheduler::Simple<>; - - Graph flow; - auto& source = flow.emplaceBlock>(); - auto& sink = flow.emplaceBlock>(); - expect(eq(gr::ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(sink))); - - auto scheduler = TScheduler{std::move(flow), threadPool}; - - MsgPortOut toScheduler; - MsgPortIn fromScheduler; - expect(eq(ConnectionResult::SUCCESS, toScheduler.connect(scheduler.msgIn))); - expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromScheduler))); - - std::expected schedulerResult; - auto schedulerThread = std::thread([&scheduler, &schedulerResult] { schedulerResult = scheduler.runAndWait(); }); - - expect(awaitCondition(2s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; - - expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler is running"; - - auto initialBlockCount = scheduler.graph().blocks().size(); - std::println("Initial block count: {}", initialBlockCount); - - sendMessage(toScheduler, scheduler.unique_name, scheduler::property::kEmplaceBlock, property_map{{"type", std::string("good::cout_sink")}, {"properties", property_map{{"disconnect_on_done", false}}}}); - gr::testing::waitForReply(fromScheduler); - - auto messages = fromScheduler.streamReader().get(); - expect(gt(messages.size(), 0UZ)) << "received block emplaced message"; - auto message = messages[0]; - expect(eq(message.endpoint, scheduler::property::kBlockEmplaced)) << "correct message endpoint"; - expect(message.data.has_value()) << "message has data"; - auto consumed = messages.consume(1UZ); - expect(consumed) << "failed to consume message"; - - expect(awaitCondition(2s, [&scheduler, initialBlockCount] { return scheduler.graph().blocks().size() > initialBlockCount; })) << "waiting for block to be added to graph"; - - auto finalBlockCount = scheduler.graph().blocks().size(); - std::println("Final block count: {}", finalBlockCount); - expect(eq(finalBlockCount, initialBlockCount + 1)) << "block was added"; - - expect(awaitCondition(2s, [&scheduler] { - for (const auto& block : scheduler.graph().blocks()) { - if (block->name() == "good::cout_sink" && block->state() == lifecycle::State::RUNNING) { - return true; - } - } - return false; - })) << "waiting for new block to reach running state"; - - scheduler.requestStop(); - schedulerThread.join(); - expect(schedulerResult.has_value()) << "scheduler executed successfully"; - }; - std::println("N.B. test-suite finished"); }; diff --git a/core/test/qa_SchedulerMessages.cpp b/core/test/qa_SchedulerMessages.cpp index 914f06597..0298aa5a8 100644 --- a/core/test/qa_SchedulerMessages.cpp +++ b/core/test/qa_SchedulerMessages.cpp @@ -8,6 +8,7 @@ #include "TestBlockRegistryContext.hpp" +#include "magic_enum.hpp" #include "message_utils.hpp" using namespace std::chrono_literals; @@ -122,6 +123,71 @@ const boost::ut::suite TopologyGraphTests = [] { }; }; + "add block while scheduler is running"_test = [] { + auto threadPool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); + using namespace gr; + using namespace gr::testing; + using TScheduler = scheduler::Simple<>; + + Graph flow(context->loader); + auto& source = flow.emplaceBlock>(); + auto& sink = flow.emplaceBlock>(); + expect(eq(gr::ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(sink))); + + auto scheduler = TScheduler{std::move(flow), threadPool}; + + MsgPortOut toScheduler; + MsgPortIn fromScheduler; + expect(eq(ConnectionResult::SUCCESS, toScheduler.connect(scheduler.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromScheduler))); + + std::expected schedulerResult; + auto schedulerThread = std::thread([&scheduler, &schedulerResult] { schedulerResult = scheduler.runAndWait(); }); + + expect(awaitCondition(2s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler is running"; + + auto initialBlockCount = scheduler.graph().blocks().size(); + std::println("Initial block count: {}", initialBlockCount); + + for (const auto& block : gr::globalBlockRegistry().keys()) { + std::println("Block {}", block); + } + + sendMessage(toScheduler, scheduler.unique_name, scheduler::property::kEmplaceBlock, property_map{{"type", std::string("builtin_counter")}, {"properties", property_map{{"disconnect_on_done", false}}}}); + gr::testing::waitForReply(fromScheduler); + + auto messages = fromScheduler.streamReader().get(); + expect(gt(messages.size(), 0UZ)) << "received block emplaced message"; + auto message = messages[0]; + + std::println("Got a message {}", message); + expect(eq(message.endpoint, scheduler::property::kBlockEmplaced)) << "correct message endpoint"; + expect(message.data.has_value()) << "message has data"; + auto consumed = messages.consume(1UZ); + expect(consumed) << "failed to consume message"; + + expect(awaitCondition(2s, [&scheduler, initialBlockCount] { return scheduler.graph().blocks().size() > initialBlockCount; })) << "waiting for block to be added to graph"; + + auto finalBlockCount = scheduler.graph().blocks().size(); + std::println("Final block count: {}", finalBlockCount); + expect(eq(finalBlockCount, initialBlockCount + 1)) << "block was added"; + + expect(awaitCondition(2s, [&scheduler] { + for (const auto& block : scheduler.graph().blocks()) { + if (block->name() == "builtin_counter" && block->state() == lifecycle::State::RUNNING) { + return true; + } + } + return false; + })) << "waiting for new block to reach running state"; + + scheduler.requestStop(); + schedulerThread.join(); + expect(schedulerResult.has_value()) << "scheduler executed successfully"; + }; + "Block removal tests"_test = [] { TestScheduler scheduler(gr::Graph(context->loader));