fix(cie): upgrade executor when agnocast entities arrive after callback group registration#1267
fix(cie): upgrade executor when agnocast entities arrive after callback group registration#1267atsushi421 wants to merge 3 commits intomainfrom
Conversation
…ck group registration When a callback group is detected by the monitoring loop before any agnocast entities are added to it, a plain SingleThreadedExecutor is spawned. If agnocast subscriptions are subsequently added, they never execute because the wrong executor type was selected. Fix this by periodically re-checking callback groups that were assigned a SingleThreadedExecutor. When agnocast topics appear, the old executor is stopped and replaced with a SingleThreadedAgnocastExecutor. This avoids always using SingleThreadedAgnocastExecutor (which adds 50ms latency per spin for ROS-only groups) while ensuring late-arriving agnocast entities are handled correctly. Closes #1263 Signed-off-by: atsushi421 <atsushi.yano.2@tier4.jp>
- Prevent potential deadlock by limiting upgrade loop to pre-existing entries (skip just-spawned executors that may not have entered spin()) - Change ExecutorWrapper fields to WeakPtr for consistent ownership model - Add RCLCPP_WARN when node expires during executor upgrade - Unify log severity to RCLCPP_WARN across both component container copies Signed-off-by: atsushi421 <atsushi.yano.2@tier4.jp>
There was a problem hiding this comment.
Pull request overview
This PR fixes a race in callback-isolated execution where callback groups that initially look “ROS-only” can later receive agnocast entities, requiring a switch to SingleThreadedAgnocastExecutor without paying the agnocast spin penalty for all groups.
Changes:
- Track node ownership per child executor / wrapper so groups can be re-spawned with the correct executor type later.
- Add monitoring-time “upgrade” logic: if agnocast topics appear for a group currently served by a plain
SingleThreadedExecutor, cancel/join that executor thread and replace it withSingleThreadedAgnocastExecutor. - Apply the same upgrade behavior in both
agnocast_componentsand deprecatedagnocastlibcomponent-container implementations.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/agnocastlib/src/agnocast_component_container_cie.cpp | Adds callback-group/node tracking to executor wrappers and upgrades ROS-only executors to agnocast-aware ones when topics appear. |
| src/agnocastlib/src/agnocast_callback_isolated_executor.cpp | Tracks child nodes and adds upgrade flow in the monitoring loop to replace ROS-only child executors when agnocast entities appear later. |
| src/agnocastlib/include/agnocast/agnocast_callback_isolated_executor.hpp | Adds child_nodes_ to keep node↔group↔executor vectors aligned for upgrade/stop operations. |
| src/agnocast_components/src/agnocast_component_container_cie.cpp | Same upgrade mechanism as the deprecated copy, with wrapper fields for callback group and node. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| if (node) { | ||
| start_executor_for_callback_group(node_id, callback_group, node); |
There was a problem hiding this comment.
Fixed in c2f9c76. Same fix as the non-deprecated variant: added explicit callback_group->get_associated_with_executor_atomic().store(false) after erase and before re-spawning.
| auto agnocast_topics = agnocast::get_agnocast_topics_by_group(group); | ||
| if (agnocast_topics.empty()) { | ||
| ++i; | ||
| continue; | ||
| } | ||
|
|
||
| // Agnocast entities appeared — schedule upgrade | ||
| RCLCPP_INFO( | ||
| logger, | ||
| "Agnocast topics detected in callback group previously assigned a ROS-only executor. " | ||
| "Upgrading to SingleThreadedAgnocastExecutor."); | ||
|
|
||
| auto node = child_nodes_[i].lock(); | ||
| executor->cancel(); | ||
|
|
||
| UpgradeInfo info; | ||
| info.group = group; | ||
| info.node = node; | ||
| info.thread = std::move(child_threads_[i]); | ||
| upgrades.push_back(std::move(info)); | ||
|
|
||
| auto idx = static_cast<std::ptrdiff_t>(i); | ||
| child_callback_groups_.erase(child_callback_groups_.begin() + idx); | ||
| child_nodes_.erase(child_nodes_.begin() + idx); | ||
| weak_child_executors_.erase(weak_child_executors_.begin() + idx); | ||
| child_threads_.erase(child_threads_.begin() + idx); | ||
| --pre_existing_count; | ||
| // Don't increment i; the next element shifted into this position | ||
| } | ||
| } | ||
|
|
||
| std::lock_guard<std::mutex> guard{child_resources_mutex_}; | ||
| if (!spinning.load() || !rclcpp::ok()) { | ||
| break; | ||
| // Join old threads outside the lock to avoid deadlock | ||
| for (auto & upgrade : upgrades) { | ||
| if (upgrade.thread.joinable()) { | ||
| upgrade.thread.join(); | ||
| } | ||
| } | ||
| for (auto & [group, node] : new_groups) { | ||
| if (group->get_associated_with_executor_atomic().load()) { | ||
| continue; | ||
|
|
||
| // Re-spawn upgraded groups with SingleThreadedAgnocastExecutor | ||
| if (!upgrades.empty()) { | ||
| std::lock_guard<std::mutex> guard{child_resources_mutex_}; | ||
| if (!spinning.load() || !rclcpp::ok()) { | ||
| break; | ||
| } | ||
| for (auto & upgrade : upgrades) { | ||
| if (upgrade.node) { | ||
| spawn_child_executor(upgrade.group, upgrade.node); | ||
| } else { | ||
| RCLCPP_WARN( | ||
| logger, | ||
| "Node expired during executor upgrade; callback group will no longer be served."); | ||
| } |
There was a problem hiding this comment.
Fixed in c2f9c76. Added explicit get_associated_with_executor_atomic().store(false) after thread join and before re-spawning. The rclcpp::Executor destructor already clears this flag when the old executor is destroyed, but the explicit clear serves as defensive programming and makes the intent self-documenting.
| // Upgrade ROS-only executors that now have agnocast topics | ||
| struct UpgradeInfo | ||
| { | ||
| rclcpp::CallbackGroup::SharedPtr group; | ||
| rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node; | ||
| std::thread thread; | ||
| }; | ||
| std::vector<UpgradeInfo> upgrades; | ||
|
|
||
| { | ||
| std::lock_guard<std::mutex> guard{child_resources_mutex_}; | ||
| if (!spinning.load() || !rclcpp::ok()) { | ||
| break; | ||
| } | ||
|
|
||
| // Record the count before spawning so the upgrade loop only checks pre-existing entries. | ||
| // Newly spawned executors may not have entered spin() yet, so cancelling them could | ||
| // deadlock (cancel sets spinning=false, but spin() resets it to true). | ||
| auto pre_existing_count = child_callback_groups_.size(); | ||
|
|
||
| // Spawn executors for newly discovered callback groups | ||
| for (auto & [group, node] : new_groups) { | ||
| if (group->get_associated_with_executor_atomic().load()) { | ||
| continue; | ||
| } | ||
| spawn_child_executor(group, node); | ||
| } | ||
|
|
||
| // Check existing ROS-only executors for late-arriving agnocast entities | ||
| for (size_t i = 0; i < pre_existing_count;) { | ||
| auto group = child_callback_groups_[i].lock(); | ||
| if (!group) { | ||
| ++i; | ||
| continue; | ||
| } | ||
|
|
||
| auto executor = weak_child_executors_[i].lock(); | ||
| if (!executor) { | ||
| ++i; | ||
| continue; | ||
| } | ||
|
|
||
| // Only check groups running under a plain SingleThreadedExecutor | ||
| if (!std::dynamic_pointer_cast<rclcpp::executors::SingleThreadedExecutor>(executor)) { | ||
| ++i; | ||
| continue; | ||
| } | ||
|
|
||
| auto agnocast_topics = agnocast::get_agnocast_topics_by_group(group); | ||
| if (agnocast_topics.empty()) { | ||
| ++i; | ||
| continue; | ||
| } | ||
|
|
||
| // Agnocast entities appeared — schedule upgrade | ||
| RCLCPP_INFO( | ||
| logger, | ||
| "Agnocast topics detected in callback group previously assigned a ROS-only executor. " | ||
| "Upgrading to SingleThreadedAgnocastExecutor."); | ||
|
|
||
| auto node = child_nodes_[i].lock(); | ||
| executor->cancel(); | ||
|
|
||
| UpgradeInfo info; | ||
| info.group = group; | ||
| info.node = node; | ||
| info.thread = std::move(child_threads_[i]); | ||
| upgrades.push_back(std::move(info)); | ||
|
|
||
| auto idx = static_cast<std::ptrdiff_t>(i); | ||
| child_callback_groups_.erase(child_callback_groups_.begin() + idx); | ||
| child_nodes_.erase(child_nodes_.begin() + idx); | ||
| weak_child_executors_.erase(weak_child_executors_.begin() + idx); | ||
| child_threads_.erase(child_threads_.begin() + idx); | ||
| --pre_existing_count; | ||
| // Don't increment i; the next element shifted into this position | ||
| } | ||
| } | ||
|
|
||
| std::lock_guard<std::mutex> guard{child_resources_mutex_}; | ||
| if (!spinning.load() || !rclcpp::ok()) { | ||
| break; | ||
| // Join old threads outside the lock to avoid deadlock | ||
| for (auto & upgrade : upgrades) { | ||
| if (upgrade.thread.joinable()) { | ||
| upgrade.thread.join(); | ||
| } | ||
| } | ||
| for (auto & [group, node] : new_groups) { | ||
| if (group->get_associated_with_executor_atomic().load()) { | ||
| continue; | ||
|
|
||
| // Re-spawn upgraded groups with SingleThreadedAgnocastExecutor | ||
| if (!upgrades.empty()) { | ||
| std::lock_guard<std::mutex> guard{child_resources_mutex_}; | ||
| if (!spinning.load() || !rclcpp::ok()) { | ||
| break; | ||
| } | ||
| for (auto & upgrade : upgrades) { | ||
| if (upgrade.node) { | ||
| spawn_child_executor(upgrade.group, upgrade.node); |
There was a problem hiding this comment.
The upgrade path requires the agnocast kernel module to register agnocast entities at runtime, which is not available in the unit test environment. The existing integration tests (test_agnocast_only_callback_isolated_executor) also require the kernel module. The fix has been verified through code review and build testing.
| cancel_executor(wrapper); | ||
| it = executor_wrappers.erase(it); | ||
|
|
||
| if (node) { | ||
| start_executor_for_callback_group(node_id, callback_group, node); |
There was a problem hiding this comment.
Fixed in c2f9c76. Added explicit callback_group->get_associated_with_executor_atomic().store(false) after erase(it) destroys the old executor wrapper and before calling start_executor_for_callback_group().
The rclcpp::Executor destructor already clears this flag, but adding explicit store(false) makes the intent self-documenting and guards against edge cases in executor lifetime ordering. Signed-off-by: atsushi421 <atsushi.yano.2@tier4.jp>
Description
In
CallbackIsolatedAgnocastExecutor::spawn_child_executorand bothComponentManagerCallbackIsolated::start_executor_for_callback_groupimplementations, the executor type (SingleThreadedExecutorvsSingleThreadedAgnocastExecutor) is decided at the moment a callback group is first detected. If agnocast entities are added to a callback group after the executor has already been spawned, theSingleThreadedExecutoris never replaced, and the agnocast entities silently fail to execute.This PR adds a monitoring-time upgrade mechanism:
SingleThreadedExecutorusingdynamic_pointer_cast.get_agnocast_topics_by_group()returns non-empty for such a group, the old executor is cancelled, its thread is joined, and the group is re-spawned with aSingleThreadedAgnocastExecutor.SingleThreadedAgnocastExecutor(which adds ~50ms latency per spin iteration for ROS-only groups due toget_next_agnocast_executableblocking).Key design decisions:
CallbackIsolatedAgnocastExecutor, the upgrade loop only checks pre-existing entries (recorded before spawning new groups). Newly spawned executors may not have enteredspin()yet, so cancelling them could deadlock becausecancel()setsspinning=falsebutspin()resets it viaspinning.exchange(true).CallbackIsolatedAgnocastExecutor, threads are joined outsidechild_resources_mutex_to avoid deadlock (child threads' callbacks may acquire this mutex). In the component containers, the existingcancel_executorpattern (join under lock) is reused since the child threads don't acquireexecutor_wrappers_mutex_.ExecutorWrapperfields (callback_group_,node_) useWeakPtrto avoid extending object lifetimes, consistent withCallbackIsolatedAgnocastExecutor'schild_nodes_pattern.get_associated_with_executor_atomic().store(false)is called explicitly after the old executor is destroyed, as a defensive measure (therclcpp::Executordestructor also clears this flag).Related links
How was this PR tested?
bash scripts/test/e2e_test_1to1.bash(required)bash scripts/test/e2e_test_2to2.bash(required)bash scripts/test/run_requires_kernel_module_tests.bash(required)Notes for reviewers
Version Update Label (Required)
Please add exactly one of the following labels to this PR:
need-major-update: User API breaking changesneed-minor-update: Internal API breaking changes (heaphook/kmod/agnocastlib compatibility)need-patch-update: Bug fixes and other changesImportant notes:
need-major-updateorneed-minor-update, please include this in the PR title as well.fix(foo)[needs major version update]: barorfeat(baz)[needs minor version update]: quxrun-build-testlabel. The PR can only be merged after the build tests pass.See CONTRIBUTING.md for detailed versioning rules.