@@ -46,6 +46,7 @@ StandardBridgeManager::~StandardBridgeManager()
4646 }
4747
4848 active_bridges_.clear ();
49+ active_r2a_service_bridges_.clear ();
4950 container_node_.reset ();
5051 executor_.reset ();
5152
@@ -74,6 +75,7 @@ void StandardBridgeManager::run()
7475 check_parent_alive ();
7576 check_managed_bridges ();
7677 check_active_bridges ();
78+ check_active_service_bridges ();
7779 check_should_exit ();
7880 }
7981}
@@ -110,6 +112,9 @@ void StandardBridgeManager::on_mq_request(mqd_t fd)
110112 if (req.is_service ) {
111113 // TODO(bdm-k): For debugging purposes. Remove this later.
112114 RCLCPP_INFO (logger_, " Received service bridge request for name='%s'" , req.srv_target .name );
115+
116+ activate_service_bridge (req);
117+
113118 return ;
114119 }
115120
@@ -132,12 +137,12 @@ void StandardBridgeManager::register_request(const MqMsgBridge & req)
132137{
133138 // Locally, unique keys include the direction. However, we register the raw topic name (without
134139 // direction) to the kernel to enforce single-process ownership for the entire topic.
135- const auto [topic_name, topic_name_with_direction] = extract_topic_info (req);
136- if (active_bridges_.count (topic_name_with_direction ) != 0U ) {
140+ const auto bridge_target_key = derive_bridge_target_key (req);
141+ if (active_bridges_.count (bridge_target_key ) != 0U ) {
137142 return ;
138143 }
139144
140- auto & info = managed_bridges_[topic_name ];
145+ auto & info = managed_bridges_[derive_topic_or_service_name (req) ];
141146 bool is_r2a = (req.direction == BridgeDirection::ROS2_TO_AGNOCAST);
142147 (is_r2a ? info.req_r2a : info.req_a2r ) = req;
143148}
@@ -177,24 +182,24 @@ void StandardBridgeManager::rollback_bridge_from_kernel(const std::string & topi
177182 }
178183}
179184
180- bool StandardBridgeManager::activate_bridge (const MqMsgBridge & req, const std::string & topic_name )
185+ bool StandardBridgeManager::activate_bridge (const MqMsgBridge & req)
181186{
182187 bool is_r2a = (req.direction == BridgeDirection::ROS2_TO_AGNOCAST);
183- std::string_view suffix = is_r2a ? SUFFIX_R2A : SUFFIX_A2R ;
184- std::string topic_name_with_direction = topic_name + std::string (suffix );
188+ std::string topic_name = derive_topic_or_service_name (req) ;
189+ std::string bridge_target_key = derive_bridge_target_key (req );
185190
186- if (active_bridges_.count (topic_name_with_direction ) != 0U ) {
191+ if (active_bridges_.count (bridge_target_key ) != 0U ) {
187192 return true ;
188193 }
189194
190195 try {
191196 rclcpp::QoS target_qos = is_r2a ? get_subscriber_qos (topic_name, req.pubsub_target .target_id )
192197 : get_publisher_qos (topic_name, req.pubsub_target .target_id );
193198
194- auto bridge = loader_.create (req, topic_name_with_direction , container_node_, target_qos);
199+ auto bridge = loader_.create (req, bridge_target_key , container_node_, target_qos);
195200
196201 if (!bridge) {
197- RCLCPP_ERROR (logger_, " Failed to create bridge for '%s'" , topic_name_with_direction .c_str ());
202+ RCLCPP_ERROR (logger_, " Failed to create bridge for '%s'" , bridge_target_key .c_str ());
198203 if (ioctl (agnocast_fd, AGNOCAST_NOTIFY_BRIDGE_SHUTDOWN_CMD) < 0 ) {
199204 RCLCPP_ERROR (logger_, " Failed to notify bridge shutdown: %s" , strerror (errno));
200205 }
@@ -213,7 +218,7 @@ bool StandardBridgeManager::activate_bridge(const MqMsgBridge & req, const std::
213218 logger_, " Failed to update ROS 2 subscriber count for topic '%s'." , topic_name.c_str ());
214219 }
215220 }
216- active_bridges_[topic_name_with_direction ] = bridge;
221+ active_bridges_[bridge_target_key ] = bridge;
217222
218223 auto cast_bridge = std::static_pointer_cast<agnocast::BridgeBase>(bridge);
219224
@@ -227,7 +232,44 @@ bool StandardBridgeManager::activate_bridge(const MqMsgBridge & req, const std::
227232
228233 } catch (const std::exception & e) {
229234 RCLCPP_ERROR (
230- logger_, " Failed to activate bridge for topic '%s': %s" , topic_name_with_direction.c_str (),
235+ logger_, " Failed to activate bridge for topic '%s': %s" , bridge_target_key.c_str (), e.what ());
236+ return false ;
237+ }
238+ }
239+
240+ bool StandardBridgeManager::activate_service_bridge (const MqMsgBridge & req)
241+ {
242+ std::string bridge_target_key = derive_bridge_target_key (req);
243+
244+ if (active_r2a_service_bridges_.count (bridge_target_key) != 0U ) {
245+ return true ;
246+ }
247+
248+ try {
249+ auto bridge = loader_.create_service (req, bridge_target_key, container_node_);
250+
251+ if (!bridge) {
252+ RCLCPP_ERROR (logger_, " Failed to create service bridge for '%s'" , bridge_target_key.c_str ());
253+ if (ioctl (agnocast_fd, AGNOCAST_NOTIFY_BRIDGE_SHUTDOWN_CMD) < 0 ) {
254+ RCLCPP_ERROR (logger_, " Failed to notify bridge shutdown: %s" , strerror (errno));
255+ }
256+ shutdown_requested_ = true ;
257+ return false ;
258+ }
259+
260+ active_r2a_service_bridges_[bridge_target_key] = bridge;
261+
262+ auto cast_bridge = std::static_pointer_cast<agnocast::ServiceBridgeBase>(bridge);
263+
264+ auto [cb0, cb1] = cast_bridge->get_callback_groups ();
265+ executor_->add_callback_group (cb0, container_node_->get_node_base_interface (), true );
266+ executor_->add_callback_group (cb1, container_node_->get_node_base_interface (), true );
267+
268+ return true ;
269+
270+ } catch (const std::exception & e) {
271+ RCLCPP_ERROR (
272+ logger_, " Failed to activate service bridge for '%s': %s" , bridge_target_key.c_str (),
231273 e.what ());
232274 return false ;
233275 }
@@ -283,7 +325,7 @@ void StandardBridgeManager::process_managed_bridge(
283325
284326 switch (status) {
285327 case AddBridgeResult::SUCCESS:
286- if (!activate_bridge (*req, topic_name )) {
328+ if (!activate_bridge (*req)) {
287329 // Rollback: remove bridge from kernel if activation failed
288330 rollback_bridge_from_kernel (topic_name, is_r2a);
289331 }
@@ -363,6 +405,54 @@ void StandardBridgeManager::check_active_bridges()
363405 }
364406}
365407
408+ void StandardBridgeManager::check_active_service_bridges ()
409+ {
410+ auto it = active_r2a_service_bridges_.begin ();
411+ while (it != active_r2a_service_bridges_.end ()) {
412+ const std::string & key = it->first ;
413+
414+ std::string_view key_view (key);
415+ std::string_view service_name_view = key_view.substr (0 , key_view.size () - SUFFIX_LEN);
416+
417+ std::string service_name (service_name_view);
418+
419+ auto result = get_agnocast_subscriber_count (create_service_request_topic_name (service_name));
420+ if (result.count == -1 ) {
421+ RCLCPP_ERROR (
422+ logger_, " Failed to get subscriber count for topic '%s'. Requesting shutdown." ,
423+ create_service_request_topic_name (service_name).c_str ());
424+ if (ioctl (agnocast_fd, AGNOCAST_NOTIFY_BRIDGE_SHUTDOWN_CMD) < 0 ) {
425+ RCLCPP_ERROR (logger_, " Failed to notify bridge shutdown: %s" , strerror (errno));
426+ }
427+ shutdown_requested_ = true ;
428+ return ;
429+ }
430+
431+ if (result.count <= 0 ) {
432+ // TODO(bdm-k): For debugging purposes. Remove this later.
433+ RCLCPP_INFO (
434+ logger_, " Removing service bridge for '%s' because the service is down." ,
435+ service_name.c_str ());
436+
437+ // struct ioctl_remove_service_bridge_args args
438+ // {
439+ // };
440+ // args.service_name = {service_name_view.data(), service_name_view.size()};
441+ // args.is_r2a = true;
442+
443+ // if (ioctl(agnocast_fd, AGNOCAST_REMOVE_SERVICE_BRIDGE_CMD, &args) != 0) {
444+ // RCLCPP_ERROR(
445+ // logger_, "AGNOCAST_REMOVE_SERVICE_BRIDGE_CMD failed for service '%s': %s",
446+ // std::string(service_name_view).c_str(), strerror(errno));
447+ // }
448+
449+ it = active_r2a_service_bridges_.erase (it);
450+ } else {
451+ ++it;
452+ }
453+ }
454+ }
455+
366456void StandardBridgeManager::check_managed_bridges ()
367457{
368458 for (auto it = managed_bridges_.begin (); it != managed_bridges_.end ();) {
@@ -407,17 +497,17 @@ void StandardBridgeManager::check_should_exit()
407497}
408498
409499void StandardBridgeManager::remove_active_bridge (
410- const std::string & topic_name_with_direction , bool keep_managed)
500+ const std::string & bridge_target_key , bool keep_managed)
411501{
412- if (topic_name_with_direction .size () <= SUFFIX_LEN) {
502+ if (bridge_target_key .size () <= SUFFIX_LEN) {
413503 return ;
414504 }
415505
416- if (active_bridges_.count (topic_name_with_direction ) == 0 ) {
506+ if (active_bridges_.count (bridge_target_key ) == 0 ) {
417507 return ;
418508 }
419509
420- std::string_view key_view (topic_name_with_direction );
510+ std::string_view key_view (bridge_target_key );
421511 std::string_view suffix = key_view.substr (key_view.size () - SUFFIX_LEN);
422512 std::string_view topic_name_view = key_view.substr (0 , key_view.size () - SUFFIX_LEN);
423513
@@ -435,7 +525,7 @@ void StandardBridgeManager::remove_active_bridge(
435525 std::string (topic_name_view).c_str (), strerror (errno));
436526 }
437527
438- active_bridges_.erase (topic_name_with_direction );
528+ active_bridges_.erase (bridge_target_key );
439529
440530 if (keep_managed) {
441531 return ;
@@ -457,17 +547,21 @@ void StandardBridgeManager::remove_active_bridge(
457547 }
458548}
459549
460- std::pair<std::string, std::string> StandardBridgeManager::extract_topic_info (
461- const MqMsgBridge & req)
550+ std::string StandardBridgeManager::derive_topic_or_service_name (const MqMsgBridge & req)
462551{
463- std::string raw_name (
464- &req.pubsub_target .topic_name [0 ],
465- strnlen (&req.pubsub_target .topic_name [0 ], sizeof (req.pubsub_target .topic_name )));
466-
467- std::string_view suffix =
468- (req.direction == BridgeDirection::ROS2_TO_AGNOCAST) ? SUFFIX_R2A : SUFFIX_A2R;
552+ std::string raw_name{
553+ static_cast <const char *>(req.is_service ? req.srv_target .name : req.pubsub_target .topic_name )};
554+ return raw_name;
555+ }
469556
470- return {raw_name, raw_name + std::string (suffix)};
557+ std::string StandardBridgeManager::derive_bridge_target_key (const MqMsgBridge & req)
558+ {
559+ std::string raw_name{
560+ static_cast <const char *>(req.is_service ? req.srv_target .name : req.pubsub_target .topic_name )};
561+ const bool is_r2a = (req.direction == BridgeDirection::ROS2_TO_AGNOCAST);
562+ const std::string_view suffix = req.is_service ? (is_r2a ? SUFFIX_SRV_R2A : SUFFIX_SRV_A2R)
563+ : (is_r2a ? SUFFIX_R2A : SUFFIX_A2R);
564+ return raw_name + std::string (suffix);
471565}
472566
473567} // namespace agnocast
0 commit comments