Skip to content

Commit 377ceac

Browse files
committed
Move graph topology changing messages to Scheduler
For fair-acc/opendigitizer#246. OD will need to be adapted to send messages to scheduler instead of graph. Adapted tests. Introduced qa_SchedulerMessages.cpp for topology changing messages. Signed-off-by: Sergio Martins <[email protected]>
1 parent a3e29f0 commit 377ceac

File tree

7 files changed

+525
-425
lines changed

7 files changed

+525
-425
lines changed

core/include/gnuradio-4.0/Graph.hpp

Lines changed: 15 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,8 @@
3333
namespace gr {
3434

3535
namespace graph::property {
36-
inline static const char* kEmplaceBlock = "EmplaceBlock";
37-
inline static const char* kRemoveBlock = "RemoveBlock";
38-
inline static const char* kReplaceBlock = "ReplaceBlock";
39-
inline static const char* kInspectBlock = "InspectBlock";
40-
41-
inline static const char* kBlockEmplaced = "BlockEmplaced";
42-
inline static const char* kBlockRemoved = "BlockRemoved";
43-
inline static const char* kBlockReplaced = "BlockReplaced";
36+
inline static const char* kInspectBlock = "InspectBlock";
4437
inline static const char* kBlockInspected = "BlockInspected";
45-
46-
inline static const char* kEmplaceEdge = "EmplaceEdge";
47-
inline static const char* kRemoveEdge = "RemoveEdge";
48-
49-
inline static const char* kEdgeEmplaced = "EdgeEmplaced";
50-
inline static const char* kEdgeRemoved = "EdgeRemoved";
51-
5238
inline static const char* kGraphInspect = "GraphInspect";
5339
inline static const char* kGraphInspected = "GraphInspected";
5440

@@ -335,12 +321,8 @@ class Graph : public gr::Block<Graph> {
335321

336322
Graph(property_map settings = {}) : gr::Block<Graph>(std::move(settings)) {
337323
_blocks.reserve(100); // TODO: remove
338-
propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock);
339-
propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock);
324+
340325
propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock);
341-
propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock);
342-
propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge);
343-
propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge);
344326
propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect);
345327
propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes);
346328
}
@@ -375,14 +357,10 @@ class Graph : public gr::Block<Graph> {
375357
*/
376358
[[nodiscard]] const Sequence& progress() noexcept { return *_progress.get(); }
377359

378-
BlockModel& addBlock(std::unique_ptr<BlockModel> block, bool doEmitMessage = true) {
360+
BlockModel& addBlock(std::unique_ptr<BlockModel> block) {
379361
auto& newBlock = _blocks.emplace_back(std::move(block));
380362
newBlock->init(_progress, _ioThreadPool);
381363
// TODO: Should we connectChildMessagePorts for these blocks as well?
382-
setTopologyChanged();
383-
if (doEmitMessage) {
384-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get()));
385-
}
386364
return *newBlock.get();
387365
}
388366

@@ -393,18 +371,12 @@ class Graph : public gr::Block<Graph> {
393371
auto& newBlock = _blocks.emplace_back(std::make_unique<BlockWrapper<TBlock>>(std::move(initialSettings)));
394372
auto* rawBlockRef = static_cast<TBlock*>(newBlock->raw());
395373
rawBlockRef->init(_progress, _ioThreadPool);
396-
setTopologyChanged();
397-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get()));
398374
return *rawBlockRef;
399375
}
400376

401377
[[maybe_unused]] auto& emplaceBlock(std::string_view type, property_map initialSettings) {
402378
if (auto block_load = _pluginLoader->instantiate(type, std::move(initialSettings)); block_load) {
403-
setTopologyChanged();
404-
auto& newBlock = addBlock(std::move(block_load), false); // false == do not emit message
405-
406-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(std::addressof(newBlock)));
407-
379+
auto& newBlock = addBlock(std::move(block_load));
408380
return newBlock;
409381
}
410382
throw gr::exception(std::format("Can not create block {}", type));
@@ -503,25 +475,6 @@ class Graph : public gr::Block<Graph> {
503475
return result;
504476
}
505477

506-
std::optional<Message> propertyCallbackEmplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
507-
assert(propertyName == graph::property::kEmplaceBlock);
508-
using namespace std::string_literals;
509-
const auto& data = message.data.value();
510-
const std::string& type = std::get<std::string>(data.at("type"s));
511-
const property_map& properties = [&] {
512-
if (auto it = data.find("properties"s); it != data.end()) {
513-
return std::get<property_map>(it->second);
514-
} else {
515-
return property_map{};
516-
}
517-
}();
518-
519-
emplaceBlock(type, properties);
520-
521-
// Message is sent as a reaction to emplaceBlock, no need for a separate one
522-
return {};
523-
}
524-
525478
std::optional<Message> propertyCallbackInspectBlock([[maybe_unused]] std::string_view propertyName, Message message) {
526479
assert(propertyName == graph::property::kInspectBlock);
527480
using namespace std::string_literals;
@@ -540,12 +493,8 @@ class Graph : public gr::Block<Graph> {
540493
return {reply};
541494
}
542495

543-
std::optional<Message> propertyCallbackRemoveBlock([[maybe_unused]] std::string_view propertyName, Message message) {
544-
assert(propertyName == graph::property::kRemoveBlock);
545-
using namespace std::string_literals;
546-
const auto& data = message.data.value();
547-
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
548-
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
496+
void removeBlockByName(std::string_view uniqueName) {
497+
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
549498

550499
if (it == _blocks.end()) {
551500
throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name));
@@ -554,26 +503,11 @@ class Graph : public gr::Block<Graph> {
554503
std::erase_if(_edges, [&it](const Edge& edge) { //
555504
return std::addressof(edge.sourceBlock()) == it->get() || std::addressof(edge.destinationBlock()) == it->get();
556505
});
557-
_blocks.erase(it);
558-
message.endpoint = graph::property::kBlockRemoved;
559506

560-
return {message};
507+
_blocks.erase(it);
561508
}
562509

563-
std::optional<Message> propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
564-
assert(propertyName == graph::property::kReplaceBlock);
565-
using namespace std::string_literals;
566-
const auto& data = message.data.value();
567-
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
568-
const std::string& type = std::get<std::string>(data.at("type"s));
569-
const property_map& properties = [&] {
570-
if (auto it = data.find("properties"s); it != data.end()) {
571-
return std::get<property_map>(it->second);
572-
} else {
573-
return property_map{};
574-
}
575-
}();
576-
510+
gr::BlockModel* replaceBlock(const std::string& uniqueName, const std::string& type, const property_map& properties) {
577511
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
578512
if (it == _blocks.end()) {
579513
throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name));
@@ -585,7 +519,7 @@ class Graph : public gr::Block<Graph> {
585519
throw gr::exception(std::format("Can not create block {}", type));
586520
}
587521

588-
addBlock(std::move(newBlock), false); // false == do not emit message
522+
addBlock(std::move(newBlock));
589523

590524
BlockModel* oldBlock = it->get();
591525
for (auto& edge : _edges) {
@@ -599,27 +533,11 @@ class Graph : public gr::Block<Graph> {
599533
}
600534
_blocks.erase(it);
601535

602-
std::optional<Message> result = gr::Message{};
603-
result->endpoint = graph::property::kBlockReplaced;
604-
result->data = serializeBlock(newBlockRaw);
605-
606-
(*result->data)["replacedBlockUniqueName"s] = uniqueName;
607-
608-
return result;
536+
return newBlockRaw;
609537
}
610538

611-
std::optional<Message> propertyCallbackEmplaceEdge([[maybe_unused]] std::string_view propertyName, Message message) {
612-
assert(propertyName == graph::property::kEmplaceEdge);
613-
using namespace std::string_literals;
614-
const auto& data = message.data.value();
615-
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
616-
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
617-
const std::string& destinationBlock = std::get<std::string>(data.at("destinationBlock"s));
618-
const std::string& destinationPort = std::get<std::string>(data.at("destinationPort"s));
619-
[[maybe_unused]] const std::size_t minBufferSize = std::get<gr::Size_t>(data.at("minBufferSize"s));
620-
[[maybe_unused]] const std::int32_t weight = std::get<std::int32_t>(data.at("weight"s));
621-
const std::string edgeName = std::get<std::string>(data.at("edgeName"s));
622-
539+
void emplaceEdge(std::string_view sourceBlock, std::string sourcePort, std::string_view destinationBlock, //
540+
std::string destinationPort, [[maybe_unused]] const std::size_t minBufferSize, [[maybe_unused]] const std::int32_t weight, std::string_view edgeName) {
623541
auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; });
624542
if (sourceBlockIt == _blocks.end()) {
625543
throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name));
@@ -645,31 +563,20 @@ class Graph : public gr::Block<Graph> {
645563

646564
const bool isArithmeticLike = sourcePortRef.portInfo().isValueTypeArithmeticLike;
647565
const std::size_t sanitizedMinBufferSize = minBufferSize == undefined_size ? graph::defaultMinBufferSize(isArithmeticLike) : minBufferSize;
648-
_edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, edgeName);
649-
650-
message.endpoint = graph::property::kEdgeEmplaced;
651-
return message;
566+
_edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, std::string(edgeName));
652567
}
653568

654-
std::optional<Message> propertyCallbackRemoveEdge([[maybe_unused]] std::string_view propertyName, Message message) {
655-
assert(propertyName == graph::property::kRemoveEdge);
656-
using namespace std::string_literals;
657-
const auto& data = message.data.value();
658-
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
659-
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
660-
569+
void removeEdgeBySourcePort(std::string_view sourceBlock, std::string_view sourcePort) {
661570
auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; });
662571
if (sourceBlockIt == _blocks.end()) {
663572
throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name));
664573
}
665574

666-
auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(sourcePort);
575+
auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(std::string(sourcePort));
667576

668577
if (sourcePortRef.disconnect() == ConnectionResult::FAILED) {
669578
throw gr::exception(std::format("Block {} sourcePortRef could not be disconnected {}", sourceBlock, this->unique_name));
670579
}
671-
message.endpoint = graph::property::kEdgeRemoved;
672-
return message;
673580
}
674581

675582
std::optional<Message> propertyCallbackGraphInspect([[maybe_unused]] std::string_view propertyName, Message message) {

core/include/gnuradio-4.0/Scheduler.hpp

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include <mutex>
77
#include <queue>
88
#include <set>
9-
#include <source_location>
9+
1010
#include <thread>
1111
#include <utility>
1212

@@ -22,6 +22,21 @@ namespace gr::scheduler {
2222
using gr::thread_pool::BasicThreadPool;
2323
using namespace gr::message;
2424

25+
namespace property {
26+
27+
inline static const char* kEmplaceBlock = "EmplaceBlock";
28+
inline static const char* kRemoveBlock = "RemoveBlock";
29+
inline static const char* kReplaceBlock = "ReplaceBlock";
30+
inline static const char* kEmplaceEdge = "EmplaceEdge";
31+
inline static const char* kRemoveEdge = "RemoveEdge";
32+
33+
inline static const char* kBlockEmplaced = "BlockEmplaced";
34+
inline static const char* kBlockRemoved = "BlockRemoved";
35+
inline static const char* kBlockReplaced = "BlockReplaced";
36+
inline static const char* kEdgeEmplaced = "EdgeEmplaced";
37+
inline static const char* kEdgeRemoved = "EdgeRemoved";
38+
} // namespace property
39+
2540
enum class ExecutionPolicy {
2641
singleThreaded, ///
2742
multiThreaded, ///
@@ -86,7 +101,13 @@ class SchedulerBase : public Block<Derived> {
86101
explicit SchedulerBase(gr::Graph&& graph, //
87102
std::shared_ptr<BasicThreadPool> thread_pool = std::make_shared<BasicThreadPool>("simple-scheduler-pool", thread_pool::CPU_BOUND), //
88103
const profiling::Options& profiling_options = {}) //
89-
: _graph(std::move(graph)), _profiler{profiling_options}, _profilerHandler{_profiler.forThisThread()}, _pool(std::move(thread_pool)) {}
104+
: _graph(std::move(graph)), _profiler{profiling_options}, _profilerHandler{_profiler.forThisThread()}, _pool(std::move(thread_pool)) {
105+
this->propertyCallbacks[scheduler::property::kEmplaceBlock] = std::mem_fn(&SchedulerBase::propertyCallbackEmplaceBlock);
106+
this->propertyCallbacks[scheduler::property::kRemoveBlock] = std::mem_fn(&SchedulerBase::propertyCallbackRemoveBlock);
107+
this->propertyCallbacks[scheduler::property::kRemoveEdge] = std::mem_fn(&SchedulerBase::propertyCallbackRemoveEdge);
108+
this->propertyCallbacks[scheduler::property::kEmplaceEdge] = std::mem_fn(&SchedulerBase::propertyCallbackEmplaceEdge);
109+
this->propertyCallbacks[scheduler::property::kReplaceBlock] = std::mem_fn(&SchedulerBase::propertyCallbackReplaceBlock);
110+
}
90111

91112
~SchedulerBase() {
92113
if (this->state() == lifecycle::RUNNING) {
@@ -416,6 +437,95 @@ class SchedulerBase : public Block<Derived> {
416437
}
417438
forAllUnmanagedBlocks([this](auto& block) { this->emitErrorMessageIfAny("resume() -> LifecycleState", block->changeStateTo(lifecycle::RUNNING)); });
418439
}
440+
441+
std::optional<Message> propertyCallbackEmplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
442+
assert(propertyName == scheduler::property::kEmplaceBlock);
443+
using namespace std::string_literals;
444+
const auto& data = message.data.value();
445+
const std::string& type = std::get<std::string>(data.at("type"s));
446+
const property_map& properties = [&] {
447+
if (auto it = data.find("properties"s); it != data.end()) {
448+
return std::get<property_map>(it->second);
449+
} else {
450+
return property_map{};
451+
}
452+
}();
453+
454+
auto& newBlock = _graph.emplaceBlock(type, properties);
455+
456+
this->emitMessage(scheduler::property::kBlockEmplaced, Graph::serializeBlock(std::addressof(newBlock)));
457+
458+
// Message is sent as a reaction to emplaceBlock, no need for a separate one
459+
return {};
460+
}
461+
462+
std::optional<Message> propertyCallbackRemoveBlock([[maybe_unused]] std::string_view propertyName, Message message) {
463+
assert(propertyName == scheduler::property::kRemoveBlock);
464+
using namespace std::string_literals;
465+
const auto& data = message.data.value();
466+
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
467+
468+
_graph.removeBlockByName(uniqueName);
469+
470+
message.endpoint = scheduler::property::kBlockRemoved;
471+
return {message};
472+
}
473+
474+
std::optional<Message> propertyCallbackRemoveEdge([[maybe_unused]] std::string_view propertyName, Message message) {
475+
assert(propertyName == scheduler::property::kRemoveEdge);
476+
using namespace std::string_literals;
477+
const auto& data = message.data.value();
478+
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
479+
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
480+
481+
_graph.removeEdgeBySourcePort(sourceBlock, sourcePort);
482+
483+
message.endpoint = scheduler::property::kEdgeRemoved;
484+
return message;
485+
}
486+
487+
std::optional<Message> propertyCallbackEmplaceEdge([[maybe_unused]] std::string_view propertyName, Message message) {
488+
assert(propertyName == scheduler::property::kEmplaceEdge);
489+
using namespace std::string_literals;
490+
const auto& data = message.data.value();
491+
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
492+
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
493+
const std::string& destinationBlock = std::get<std::string>(data.at("destinationBlock"s));
494+
const std::string& destinationPort = std::get<std::string>(data.at("destinationPort"s));
495+
[[maybe_unused]] const std::size_t minBufferSize = std::get<gr::Size_t>(data.at("minBufferSize"s));
496+
[[maybe_unused]] const std::int32_t weight = std::get<std::int32_t>(data.at("weight"s));
497+
const std::string edgeName = std::get<std::string>(data.at("edgeName"s));
498+
499+
_graph.emplaceEdge(sourceBlock, sourcePort, destinationBlock, destinationPort, minBufferSize, weight, edgeName);
500+
501+
message.endpoint = scheduler::property::kEdgeEmplaced;
502+
return message;
503+
}
504+
505+
std::optional<Message> propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
506+
assert(propertyName == scheduler::property::kReplaceBlock);
507+
using namespace std::string_literals;
508+
const auto& data = message.data.value();
509+
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
510+
const std::string& type = std::get<std::string>(data.at("type"s));
511+
const property_map& properties = [&] {
512+
if (auto it = data.find("properties"s); it != data.end()) {
513+
return std::get<property_map>(it->second);
514+
} else {
515+
return property_map{};
516+
}
517+
}();
518+
519+
auto newBlockRaw = _graph.replaceBlock(uniqueName, type, properties);
520+
521+
std::optional<Message> result = gr::Message{};
522+
result->endpoint = scheduler::property::kBlockReplaced;
523+
result->data = Graph::serializeBlock(newBlockRaw);
524+
525+
(*result->data)["replacedBlockUniqueName"s] = uniqueName;
526+
527+
return result;
528+
}
419529
};
420530

421531
template<ExecutionPolicy execution = ExecutionPolicy::singleThreaded, profiling::ProfilerLike TProfiler = profiling::null::Profiler>

core/test/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ if(GR_ENABLE_BLOCK_REGISTRY AND INTERNAL_ENABLE_BLOCK_PLUGINS)
8181
add_ut_test(qa_GraphMessages)
8282
target_link_libraries(qa_GraphMessages PUBLIC GrBasicBlocksShared GrTestingBlocksShared)
8383

84+
add_ut_test(qa_SchedulerMessages)
85+
target_link_libraries(qa_SchedulerMessages PUBLIC GrBasicBlocksShared GrTestingBlocksShared)
86+
8487
add_subdirectory(plugins)
8588
add_app_test(qa_plugins_test)
8689
target_link_libraries(qa_plugins_test PUBLIC GrBasicBlocksShared GrTestingBlocksShared)

0 commit comments

Comments
 (0)