Commit 5d06c75

Implement handling "set graph YAML" message
We move all the existing blocks to the zombie list; they'll get deleted once stopped. The scheduler needs to be restarted so that its init() method runs again to construct the job list and threads. The message is processed by a worker thread, so we can't restart the scheduler here, as that would crash. The scheduler is instead restarted automatically by OD when it receives the message reply. Also, the scheduler didn't support being restarted at all: the reset() method implementation was missing.

Signed-off-by: Sergio Martins <[email protected]>

1 parent: b324523

2 files changed (+88, -5 lines changed)

core/include/gnuradio-4.0/Graph.hpp

Lines changed: 5 additions & 0 deletions

```diff
@@ -346,6 +346,11 @@ class Graph : public gr::Block<Graph> {
     [[nodiscard]] std::span<std::unique_ptr<BlockModel>> blocks() noexcept { return {_blocks}; }
     [[nodiscard]] std::span<Edge> edges() noexcept { return {_edges}; }
 
+    void clear() {
+        _blocks.clear();
+        _edges.clear();
+    }
+
     /**
      * @return atomic sequence counter that indicates if any block could process some data or messages
      */
```
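The new clear() simply empties the graph's owning containers. A minimal illustration of the effect (hypothetical driver code, not part of this commit):

```cpp
#include <cassert>
#include <gnuradio-4.0/Graph.hpp>

int main() {
    gr::Graph graph;
    // ... blocks and edges would be added here. After the scheduler has moved
    // every block into its zombie list (see makeAllZombies() below), the graph
    // must not keep the moved-from owners around, so it is emptied wholesale.
    graph.clear();                  // drops all BlockModel pointers and all edges
    assert(graph.blocks().empty()); // the graph is now an empty shell
    assert(graph.edges().empty());
}
```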

core/include/gnuradio-4.0/Scheduler.hpp

Lines changed: 83 additions & 5 deletions

```diff
@@ -667,18 +667,84 @@ class SchedulerBase : public Block<Derived> {
         _zombieBlocks.push_back(std::move(block));
     }
 
+    // Moves all blocks into the zombie list.
+    // Useful for bulk operations such as the "set grc yaml" message.
+    void makeAllZombies() {
+        std::lock_guard guard(_zombieBlocksMutex);
+
+        for (auto& block : this->_graph.blocks()) {
+            switch (block->state()) {
+            case lifecycle::State::RUNNING:
+            case lifecycle::State::REQUESTED_PAUSE:
+            case lifecycle::State::PAUSED: //
+                this->emitErrorMessageIfAny("makeAllZombies", block->changeStateTo(lifecycle::State::REQUESTED_STOP));
+                break;
+
+            case lifecycle::State::INITIALISED: //
+                this->emitErrorMessageIfAny("makeAllZombies", block->changeStateTo(lifecycle::State::STOPPED));
+                break;
+            case lifecycle::State::IDLE:
+            case lifecycle::State::STOPPED:
+            case lifecycle::State::ERROR:
+            case lifecycle::State::REQUESTED_STOP:
+                // Can go into the zombie list and be deleted.
+                break;
+            }
+
+            _zombieBlocks.push_back(std::move(block));
+        }
+
+        this->_graph.clear();
+    }
+
     std::optional<Message> propertyCallbackGraphGRC([[maybe_unused]] std::string_view propertyName, Message message) {
         assert(propertyName == scheduler::property::kGraphGRC);
 
         auto& pluginLoader = gr::globalPluginLoader();
         if (message.cmd == message::Command::Get) {
             message.data = property_map{{"value", gr::saveGrc(pluginLoader, _graph)}};
         } else if (message.cmd == message::Command::Set) {
-            // const auto& data        = message.data.value();
-            // auto        yamlContent = std::get<std::string>(data.at("value"s));
-            // Graph       = gr::loadGrc(pluginLoader, yamlContent);
-            // We need to stop the scheduler first
-            throw gr::exception(std::format("Not implemented {}", message.cmd));
+            const auto& data        = message.data.value();
+            auto        yamlContent = std::get<std::string>(data.at("value"s));
+
+            try {
+                Graph newGraph = gr::loadGrc(pluginLoader, yamlContent);
+
+                makeAllZombies();
+
+                const auto originalState = this->state();
+
+                switch (originalState) {
+                case lifecycle::State::RUNNING:
+                case lifecycle::State::REQUESTED_PAUSE:
+                case lifecycle::State::PAUSED: //
+                    this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP));
+                    this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> STOPPED", this->changeStateTo(lifecycle::State::STOPPED));
+                    break;
+                case lifecycle::State::REQUESTED_STOP:
+                case lifecycle::State::INITIALISED: //
+                    this->emitErrorMessageIfAny("propertyCallbackGraphGRC -> STOPPED", this->changeStateTo(lifecycle::State::STOPPED));
+                    break;
+                case lifecycle::State::IDLE:
+                    assert(false); // doesn't happen
+                    break;
+                case lifecycle::State::STOPPED:
+                case lifecycle::State::ERROR: break;
+                }
+
+                _graph = std::move(newGraph);
+
+                // Now ideally we'd just restart the scheduler, but we can't, since we're processing a message inside a worker thread.
+                // When the scheduler starts running it asserts that _nRunningJobs is 0, so we can't start it now: we're inside a job.
+                // We need to let poolWorker() unwind, decrement _nRunningJobs, and then move the scheduler back to its original state.
+                // Alternatively, we could forbid kGraphGRC unless the scheduler is in the STOPPED state. That would simplify the
+                // logic, but put more burden on the client.
+
+                message.data = property_map{{"originalSchedulerState", int(originalState)}};
+            } catch (const std::exception& e) {
+                message.data = std::unexpected(Error{std::format("Error parsing YAML: {}", e.what())});
+            }
+
         } else {
             throw gr::exception(std::format("Unexpected command type {}", message.cmd));
         }
```
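The reply's originalSchedulerState entry lets the client finish the restart that the handler cannot perform from inside its own worker thread. A rough client-side sketch of that flow; readFile, sendAndAwaitReply, and restart are placeholder helpers, and the Message field usage mirrors the handler above rather than any confirmed client API:

```cpp
// Hypothetical client-side flow (e.g. what OD does): send the new graph as
// YAML, then drive the scheduler back to its previous state once the reply
// arrives, since the scheduler cannot restart itself mid-message.
const std::string yamlContent = readFile("new-graph.grc.yml"); // placeholder helper

gr::Message request;
request.cmd      = gr::message::Command::Set;
request.endpoint = gr::scheduler::property::kGraphGRC;
request.data     = gr::property_map{{"value", yamlContent}};

gr::Message reply = sendAndAwaitReply(scheduler, request); // placeholder transport helper
if (reply.data.has_value()) {
    const auto originalState = static_cast<gr::lifecycle::State>(std::get<int>(reply.data.value().at("originalSchedulerState")));
    if (originalState == gr::lifecycle::State::RUNNING) {
        restart(scheduler); // placeholder: reset() rebuilds the job lists, then run again
    }
} else {
    // loadGrc() threw before any block was zombified, so the old graph is intact;
    // reply.data.error() describes the YAML problem.
}
```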
```diff
@@ -748,6 +814,7 @@ class Simple : public SchedulerBase<Simple<execution, TProfiler>, execution, TPr
 
         this->_adoptionBlocks.clear();
         this->_adoptionBlocks.resize(n_batches);
+        this->_jobLists->clear();
         this->_jobLists->reserve(n_batches);
         for (std::size_t i = 0; i < n_batches; i++) {
             // create job-set for thread
@@ -758,6 +825,11 @@ class Simple : public SchedulerBase<Simple<execution, TProfiler>, execution, TPr
             }
         }
     }
+
+    void reset() {
+        base_t::reset();
+        init();
+    }
 };
 
 template<ExecutionPolicy execution = ExecutionPolicy::singleThreaded, profiling::ProfilerLike TProfiler = profiling::null::Profiler>
```
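With reset() now forwarding to init(), a scheduler instance can be reused after a stop; the same reset() is added to the other scheduler in the hunk below. A minimal usage sketch, assuming the move-construction and runAndWait() pattern used elsewhere in gnuradio-4.0 (the explicit second run is illustrative, not code from this commit):

```cpp
// Sketch: run, stop (e.g. after a kGraphGRC Set), then run again.
gr::Graph flowGraph;
// ... flowGraph populated with blocks and edges ...
gr::scheduler::Simple<> scheduler{std::move(flowGraph)};
scheduler.runAndWait();  // first run; returns once the scheduler is stopped

scheduler.reset();       // previously missing; now re-runs init(), clearing and
                         // rebuilding _jobLists so stale jobs don't linger
scheduler.runAndWait();  // second run over the (possibly newly set) graph
```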
```diff
@@ -828,6 +900,7 @@ detecting cycles and blocks which can be reached from several source blocks.)"">
         this->_adoptionBlocks.clear();
         this->_adoptionBlocks.resize(n_batches);
         std::lock_guard lock(base_t::_jobListsMutex);
+        this->_jobLists->clear();
         this->_jobLists->reserve(n_batches);
         for (std::size_t i = 0; i < n_batches; i++) {
             // create job-set for thread
@@ -838,6 +911,11 @@ detecting cycles and blocks which can be reached from several source blocks.)"">
             }
         }
     }
+
+    void reset() {
+        base_t::reset();
+        init();
+    }
 };
 } // namespace gr::scheduler
 
```
