@@ -356,6 +356,11 @@ class SchedulerBase : public Block<Derived> {
356
356
if (runnerID == 0UZ || _nRunningJobs.load (std::memory_order_acquire) == 0UZ) {
357
357
this ->processScheduledMessages (); // execute the scheduler- and Graph-specific message handler only once globally
358
358
}
359
+
360
+ // Zombies are cleaned per-thread, as we remove from the localBlockList as well.
361
+ // Cleaning zombies has low priority, so uses process_stream_to_message_ratio (a different ratio could be introduced)
362
+ cleanupZombieBlocks (localBlockList);
363
+
359
364
std::ranges::for_each (localBlockList, [](auto & block) { block->processScheduledMessages (); });
360
365
activeState = this ->state ();
361
366
msgToCount++;
@@ -376,10 +381,6 @@ class SchedulerBase : public Block<Derived> {
376
381
break ;
377
382
}
378
383
} else if (activeState == lifecycle::State::PAUSED) {
379
- if (_graph.hasTopologyChanged ()) {
380
- // TODO: update localBlockList topology if needed
381
- _graph.ackTopologyChange ();
382
- }
383
384
std::this_thread::sleep_for (std::chrono::milliseconds (timeout_ms));
384
385
msgToCount = 0UZ;
385
386
} else { // other states
@@ -416,6 +417,7 @@ class SchedulerBase : public Block<Derived> {
416
417
this ->emitErrorMessageIfAny (" forEachBlock -> stop() -> LifecycleState" , block->changeStateTo (lifecycle::State::STOPPED));
417
418
}
418
419
});
420
+
419
421
this ->emitErrorMessageIfAny (" stop() -> LifecycleState ->STOPPED" , this ->changeStateTo (lifecycle::State::STOPPED));
420
422
this ->emitErrorMessageIfAny (" stop() -> LifecycleState ->IDLE" , this ->changeStateTo (lifecycle::State::IDLE));
421
423
}
@@ -502,6 +504,93 @@ class SchedulerBase : public Block<Derived> {
502
504
return message;
503
505
}
504
506
507
+ /*
508
+ Zombie Tutorial:
509
+
510
+ Blocks can't be deleted unless stopped, but since stopping can take time (async) we move such blocks
511
+ to the "zombie list" and disconnect them immediately from the graph. This allows them to stop and be deleted
512
+ safely.
513
+
514
+ Periodically, we call cleanupZombieBlocks(), which iterates the zombie list and deletes the blocks that are now stopped.
515
+
516
+ cleanupZombieBlocks() is called *per-thread*, since we also need to update the localBlockList, i.e.: removing dangling block pointers
517
+ from the localBlockList.
518
+
519
+ We also update the _jobLists member variable, but probably that member can be removed, seems unneeded and only used so unit-tests can
520
+ query it.
521
+ */
522
+ void cleanupZombieBlocks (std::vector<BlockModel*>& localBlockList) {
523
+ if (localBlockList.empty ()) {
524
+ return ;
525
+ }
526
+
527
+ std::lock_guard guard (_graph._zombieBlocksMutex );
528
+
529
+ auto & zombieBlocks = _graph._zombieBlocks ;
530
+ auto it = zombieBlocks.begin ();
531
+
532
+ while (it != zombieBlocks.end ()) {
533
+ auto localBlockIt = std::find (localBlockList.begin (), localBlockList.end (), it->get ());
534
+ if (localBlockIt == localBlockList.end ()) {
535
+ // we only care about the blocks local to our thread.
536
+ ++it;
537
+ continue ;
538
+ }
539
+
540
+ bool shouldDelete = false ;
541
+
542
+ switch ((*it)->state ()) {
543
+ case lifecycle::State::IDLE:
544
+ case lifecycle::State::STOPPED:
545
+ case lifecycle::State::INITIALISED:
546
+ // This block can be deleted immediately
547
+ shouldDelete = true ;
548
+ break ;
549
+ case lifecycle::State::ERROR:
550
+ // Delete as well. (Separate case block, as better ideas welcome)
551
+ shouldDelete = true ;
552
+ break ;
553
+ case lifecycle::State::REQUESTED_STOP:
554
+ // This block will be deleted later
555
+ break ;
556
+ case lifecycle::State::REQUESTED_PAUSE:
557
+ // This block will be deleted later
558
+ // There's no transition from REQUESTED_PAUSE to REQUESTED_STOP
559
+ // Will be moved to REQUESTED_STOP as soon as it's possible
560
+ break ;
561
+ case lifecycle::State::PAUSED:
562
+ // This zombie was in REQUESTED_PAUSE and now finally in PAUSED. Can be stopped now.
563
+ // Will be deleted in a next zombie maintenance period
564
+ this ->emitErrorMessageIfAny (" cleanupZombieBlocks" , (*it)->changeStateTo (lifecycle::State::REQUESTED_STOP));
565
+ break ;
566
+ case lifecycle::State::RUNNING: assert (false && " Doesn't happen: zombie blocks are never running" ); break ;
567
+ }
568
+
569
+ if (shouldDelete) {
570
+ localBlockList.erase (localBlockIt);
571
+
572
+ BlockModel* zombieRaw = it->get ();
573
+ it = zombieBlocks.erase (it); // ~Block() runs here
574
+
575
+ // We need to remove zombieRaw from jobLists as well, in case Scheduler ever goes to INITIALIZED
576
+ // again.
577
+ // TODO: I'd argue we should remove _jobLists to minimize having to maintain state. Instead, a job list can be
578
+ // calculated in start().
579
+ std::lock_guard lock (_jobListsMutex);
580
+ for (auto & jobList : *this ->_jobLists ) {
581
+ auto job_it = std::remove (jobList.begin (), jobList.end (), zombieRaw);
582
+ if (job_it != jobList.end ()) {
583
+ jobList.erase (job_it, jobList.end ());
584
+ break ;
585
+ }
586
+ }
587
+
588
+ } else {
589
+ ++it;
590
+ }
591
+ }
592
+ }
593
+
505
594
std::optional<Message> propertyCallbackReplaceBlock ([[maybe_unused]] std::string_view propertyName, Message message) {
506
595
assert (propertyName == scheduler::property::kReplaceBlock );
507
596
using namespace std ::string_literals;
0 commit comments