Skip to content

Commit 09f0ffa

Browse files
authored
Move topology changing messages to Scheduler and introduce Zombie list (#570)
* Move graph topology changing messages to Scheduler For fair-acc/opendigitizer#246. OD will need to be adapted to send messages to scheduler instead of graph. Adapted tests. Introduced qa_SchedulerMessages.cpp for topology changing messages. * Introduce zombie list for stopping blocks This allows to remove blocks without having to stop the scheduler. Running blocks can't transition directly to stopped state, so we move them into a "zombie list". There's a maintenance period where we delete blocks that have stopped. Furthermore, deleted blocks are now removed from each thread's local block list. * Remove unused "topology changed" mechanism This was never used. It was set but never read and the job list was never updated. With the "zombie list" there's no need for it anymore. * Add scheduler message to retrieve the graph as YAML * Support adding blocks into a running scheduler During the maintenance period, when we cleanup zombies, we also check if a thread should adopt blocks that were added meanwhile * Remove invalid STOPPED->IDLE transition Lifecycle doesn't support it, so besides being a no-op it also spammed the logs with errors. The only permitted transition is the user resuming the scheduler, which should put it in initialized state * Implement handling "set graph YAML" message We move all the existing blocks to the zombie list. They'll get deleted when stopped. The scheduler needs to be restarted, so its init() method runs again to construct the job list and threads. The message is being processed by a worker thread, therefore we can't restart the scheduler here as that would crash. Scheduler is restarted automatically by OD when it receives the message reply. Also, the scheduler didn't support being restarted at all, the reset() method implementation was missing. * Move "add block to running scheduler test" to qa_SchedulerMessages.cpp As that's where all message-based scheduler tests are Signed-off-by: Sergio Martins <[email protected]>
1 parent cf5ce69 commit 09f0ffa

File tree

9 files changed

+912
-440
lines changed

9 files changed

+912
-440
lines changed

core/include/gnuradio-4.0/Block.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ class Block : public lifecycle::StateMachine<Derived> {
498498

499499
~Block() { // NOSONAR -- need to request the (potentially) running ioThread to stop
500500
if (lifecycle::isActive(this->state())) {
501+
// Only happens in artificial cases likes qa_Block test. In practice blocks stay in zombie list if active
501502
emitErrorMessageIfAny("~Block()", this->changeStateTo(lifecycle::State::REQUESTED_STOP));
502503
}
503504
if constexpr (blockingIO) {

core/include/gnuradio-4.0/Graph.hpp

Lines changed: 28 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,8 @@
3333
namespace gr {
3434

3535
namespace graph::property {
36-
inline static const char* kEmplaceBlock = "EmplaceBlock";
37-
inline static const char* kRemoveBlock = "RemoveBlock";
38-
inline static const char* kReplaceBlock = "ReplaceBlock";
39-
inline static const char* kInspectBlock = "InspectBlock";
40-
41-
inline static const char* kBlockEmplaced = "BlockEmplaced";
42-
inline static const char* kBlockRemoved = "BlockRemoved";
43-
inline static const char* kBlockReplaced = "BlockReplaced";
36+
inline static const char* kInspectBlock = "InspectBlock";
4437
inline static const char* kBlockInspected = "BlockInspected";
45-
46-
inline static const char* kEmplaceEdge = "EmplaceEdge";
47-
inline static const char* kRemoveEdge = "RemoveEdge";
48-
49-
inline static const char* kEdgeEmplaced = "EdgeEmplaced";
50-
inline static const char* kEdgeRemoved = "EdgeRemoved";
51-
5238
inline static const char* kGraphInspect = "GraphInspect";
5339
inline static const char* kGraphInspected = "GraphInspected";
5440

@@ -202,7 +188,6 @@ class Graph : public gr::Block<Graph> {
202188
private:
203189
std::shared_ptr<gr::Sequence> _progress = std::make_shared<gr::Sequence>();
204190
std::shared_ptr<gr::thread_pool::BasicThreadPool> _ioThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>("graph_thread_pool", gr::thread_pool::TaskType::IO_BOUND, 2UZ, std::numeric_limits<uint32_t>::max());
205-
std::atomic_bool _topologyChanged{false};
206191
std::vector<Edge> _edges;
207192
std::vector<std::unique_ptr<BlockModel>> _blocks;
208193

@@ -335,12 +320,8 @@ class Graph : public gr::Block<Graph> {
335320

336321
Graph(property_map settings = {}) : gr::Block<Graph>(std::move(settings)) {
337322
_blocks.reserve(100); // TODO: remove
338-
propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock);
339-
propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock);
323+
340324
propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock);
341-
propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock);
342-
propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge);
343-
propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge);
344325
propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect);
345326
propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes);
346327
}
@@ -356,33 +337,29 @@ class Graph : public gr::Block<Graph> {
356337
}
357338
_progress = std::move(other._progress);
358339
_ioThreadPool = std::move(other._ioThreadPool);
359-
_topologyChanged.store(other._topologyChanged.load(std::memory_order_acquire), std::memory_order_release);
360-
_edges = std::move(other._edges);
361-
_blocks = std::move(other._blocks);
340+
_edges = std::move(other._edges);
341+
_blocks = std::move(other._blocks);
362342

363343
return *this;
364344
}
365345

366-
void setTopologyChanged() noexcept { _topologyChanged.store(true, std::memory_order_release); }
367-
[[nodiscard]] bool hasTopologyChanged() const noexcept { return _topologyChanged; }
368-
void ackTopologyChange() noexcept { _topologyChanged.store(false, std::memory_order_release); }
369-
370346
[[nodiscard]] std::span<std::unique_ptr<BlockModel>> blocks() noexcept { return {_blocks}; }
371347
[[nodiscard]] std::span<Edge> edges() noexcept { return {_edges}; }
372348

349+
void clear() {
350+
_blocks.clear();
351+
_edges.clear();
352+
}
353+
373354
/**
374355
* @return atomic sequence counter that indicates if any block could process some data or messages
375356
*/
376357
[[nodiscard]] const Sequence& progress() noexcept { return *_progress.get(); }
377358

378-
BlockModel& addBlock(std::unique_ptr<BlockModel> block, bool doEmitMessage = true) {
359+
BlockModel& addBlock(std::unique_ptr<BlockModel> block) {
379360
auto& newBlock = _blocks.emplace_back(std::move(block));
380361
newBlock->init(_progress, _ioThreadPool);
381362
// TODO: Should we connectChildMessagePorts for these blocks as well?
382-
setTopologyChanged();
383-
if (doEmitMessage) {
384-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get()));
385-
}
386363
return *newBlock.get();
387364
}
388365

@@ -393,18 +370,12 @@ class Graph : public gr::Block<Graph> {
393370
auto& newBlock = _blocks.emplace_back(std::make_unique<BlockWrapper<TBlock>>(std::move(initialSettings)));
394371
auto* rawBlockRef = static_cast<TBlock*>(newBlock->raw());
395372
rawBlockRef->init(_progress, _ioThreadPool);
396-
setTopologyChanged();
397-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(newBlock.get()));
398373
return *rawBlockRef;
399374
}
400375

401376
[[maybe_unused]] auto& emplaceBlock(std::string_view type, property_map initialSettings) {
402377
if (auto block_load = _pluginLoader->instantiate(type, std::move(initialSettings)); block_load) {
403-
setTopologyChanged();
404-
auto& newBlock = addBlock(std::move(block_load), false); // false == do not emit message
405-
406-
this->emitMessage(graph::property::kBlockEmplaced, serializeBlock(std::addressof(newBlock)));
407-
378+
auto& newBlock = addBlock(std::move(block_load));
408379
return newBlock;
409380
}
410381
throw gr::exception(std::format("Can not create block {}", type));
@@ -503,25 +474,6 @@ class Graph : public gr::Block<Graph> {
503474
return result;
504475
}
505476

506-
std::optional<Message> propertyCallbackEmplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
507-
assert(propertyName == graph::property::kEmplaceBlock);
508-
using namespace std::string_literals;
509-
const auto& data = message.data.value();
510-
const std::string& type = std::get<std::string>(data.at("type"s));
511-
const property_map& properties = [&] {
512-
if (auto it = data.find("properties"s); it != data.end()) {
513-
return std::get<property_map>(it->second);
514-
} else {
515-
return property_map{};
516-
}
517-
}();
518-
519-
emplaceBlock(type, properties);
520-
521-
// Message is sent as a reaction to emplaceBlock, no need for a separate one
522-
return {};
523-
}
524-
525477
std::optional<Message> propertyCallbackInspectBlock([[maybe_unused]] std::string_view propertyName, Message message) {
526478
assert(propertyName == graph::property::kInspectBlock);
527479
using namespace std::string_literals;
@@ -540,12 +492,8 @@ class Graph : public gr::Block<Graph> {
540492
return {reply};
541493
}
542494

543-
std::optional<Message> propertyCallbackRemoveBlock([[maybe_unused]] std::string_view propertyName, Message message) {
544-
assert(propertyName == graph::property::kRemoveBlock);
545-
using namespace std::string_literals;
546-
const auto& data = message.data.value();
547-
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
548-
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
495+
std::unique_ptr<BlockModel> removeBlockByName(std::string_view uniqueName) {
496+
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
549497

550498
if (it == _blocks.end()) {
551499
throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name));
@@ -554,26 +502,14 @@ class Graph : public gr::Block<Graph> {
554502
std::erase_if(_edges, [&it](const Edge& edge) { //
555503
return std::addressof(edge.sourceBlock()) == it->get() || std::addressof(edge.destinationBlock()) == it->get();
556504
});
505+
506+
std::unique_ptr<BlockModel> removedBlock = std::move(*it);
557507
_blocks.erase(it);
558-
message.endpoint = graph::property::kBlockRemoved;
559508

560-
return {message};
509+
return removedBlock;
561510
}
562511

563-
std::optional<Message> propertyCallbackReplaceBlock([[maybe_unused]] std::string_view propertyName, Message message) {
564-
assert(propertyName == graph::property::kReplaceBlock);
565-
using namespace std::string_literals;
566-
const auto& data = message.data.value();
567-
const std::string& uniqueName = std::get<std::string>(data.at("uniqueName"s));
568-
const std::string& type = std::get<std::string>(data.at("type"s));
569-
const property_map& properties = [&] {
570-
if (auto it = data.find("properties"s); it != data.end()) {
571-
return std::get<property_map>(it->second);
572-
} else {
573-
return property_map{};
574-
}
575-
}();
576-
512+
std::pair<std::unique_ptr<BlockModel>, BlockModel*> replaceBlock(const std::string& uniqueName, const std::string& type, const property_map& properties) {
577513
auto it = std::ranges::find_if(_blocks, [&uniqueName](const auto& block) { return block->uniqueName() == uniqueName; });
578514
if (it == _blocks.end()) {
579515
throw gr::exception(std::format("Block {} was not found in {}", uniqueName, this->unique_name));
@@ -585,41 +521,26 @@ class Graph : public gr::Block<Graph> {
585521
throw gr::exception(std::format("Can not create block {}", type));
586522
}
587523

588-
addBlock(std::move(newBlock), false); // false == do not emit message
524+
addBlock(std::move(newBlock));
589525

590-
BlockModel* oldBlock = it->get();
591526
for (auto& edge : _edges) {
592-
if (edge._sourceBlock == oldBlock) {
527+
if (edge._sourceBlock == it->get()) {
593528
edge._sourceBlock = newBlockRaw;
594529
}
595530

596-
if (edge._destinationBlock == oldBlock) {
531+
if (edge._destinationBlock == it->get()) {
597532
edge._destinationBlock = newBlockRaw;
598533
}
599534
}
600-
_blocks.erase(it);
601-
602-
std::optional<Message> result = gr::Message{};
603-
result->endpoint = graph::property::kBlockReplaced;
604-
result->data = serializeBlock(newBlockRaw);
605535

606-
(*result->data)["replacedBlockUniqueName"s] = uniqueName;
536+
std::unique_ptr<BlockModel> oldBlock = std::move(*it);
537+
_blocks.erase(it);
607538

608-
return result;
539+
return {std::move(oldBlock), newBlockRaw};
609540
}
610541

611-
std::optional<Message> propertyCallbackEmplaceEdge([[maybe_unused]] std::string_view propertyName, Message message) {
612-
assert(propertyName == graph::property::kEmplaceEdge);
613-
using namespace std::string_literals;
614-
const auto& data = message.data.value();
615-
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
616-
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
617-
const std::string& destinationBlock = std::get<std::string>(data.at("destinationBlock"s));
618-
const std::string& destinationPort = std::get<std::string>(data.at("destinationPort"s));
619-
[[maybe_unused]] const std::size_t minBufferSize = std::get<gr::Size_t>(data.at("minBufferSize"s));
620-
[[maybe_unused]] const std::int32_t weight = std::get<std::int32_t>(data.at("weight"s));
621-
const std::string edgeName = std::get<std::string>(data.at("edgeName"s));
622-
542+
void emplaceEdge(std::string_view sourceBlock, std::string sourcePort, std::string_view destinationBlock, //
543+
std::string destinationPort, [[maybe_unused]] const std::size_t minBufferSize, [[maybe_unused]] const std::int32_t weight, std::string_view edgeName) {
623544
auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; });
624545
if (sourceBlockIt == _blocks.end()) {
625546
throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name));
@@ -645,31 +566,20 @@ class Graph : public gr::Block<Graph> {
645566

646567
const bool isArithmeticLike = sourcePortRef.portInfo().isValueTypeArithmeticLike;
647568
const std::size_t sanitizedMinBufferSize = minBufferSize == undefined_size ? graph::defaultMinBufferSize(isArithmeticLike) : minBufferSize;
648-
_edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, edgeName);
649-
650-
message.endpoint = graph::property::kEdgeEmplaced;
651-
return message;
569+
_edges.emplace_back(sourceBlockIt->get(), sourcePort, destinationBlockIt->get(), destinationPort, sanitizedMinBufferSize, weight, std::string(edgeName));
652570
}
653571

654-
std::optional<Message> propertyCallbackRemoveEdge([[maybe_unused]] std::string_view propertyName, Message message) {
655-
assert(propertyName == graph::property::kRemoveEdge);
656-
using namespace std::string_literals;
657-
const auto& data = message.data.value();
658-
const std::string& sourceBlock = std::get<std::string>(data.at("sourceBlock"s));
659-
const std::string& sourcePort = std::get<std::string>(data.at("sourcePort"s));
660-
572+
void removeEdgeBySourcePort(std::string_view sourceBlock, std::string_view sourcePort) {
661573
auto sourceBlockIt = std::ranges::find_if(_blocks, [&sourceBlock](const auto& block) { return block->uniqueName() == sourceBlock; });
662574
if (sourceBlockIt == _blocks.end()) {
663575
throw gr::exception(std::format("Block {} was not found in {}", sourceBlock, this->unique_name));
664576
}
665577

666-
auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(sourcePort);
578+
auto& sourcePortRef = (*sourceBlockIt)->dynamicOutputPort(std::string(sourcePort));
667579

668580
if (sourcePortRef.disconnect() == ConnectionResult::FAILED) {
669581
throw gr::exception(std::format("Block {} sourcePortRef could not be disconnected {}", sourceBlock, this->unique_name));
670582
}
671-
message.endpoint = graph::property::kEdgeRemoved;
672-
return message;
673583
}
674584

675585
std::optional<Message> propertyCallbackGraphInspect([[maybe_unused]] std::string_view propertyName, Message message) {

0 commit comments

Comments
 (0)