Skip to content

Commit 0675ff6

Browse files
authored
feat: enhance destroy_subscribers behavior (#499)
1 parent 891f357 commit 0675ff6

File tree

3 files changed

+112
-13
lines changed

3 files changed

+112
-13
lines changed

src/rai_core/rai/communication/ros2/api.py

+49-8
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ class ROS2TopicAPI:
164164
_publishers: Dictionary mapping topic names to their publisher instances
165165
"""
166166

167-
def __init__(self, node: rclpy.node.Node) -> None:
167+
def __init__(
168+
self, node: rclpy.node.Node, destroy_subscribers: bool = False
169+
) -> None:
168170
"""Initialize the ROS2 topic API.
169171
170172
Args:
@@ -181,7 +183,7 @@ def __init__(self, node: rclpy.node.Node) -> None:
181183
# preventing node crashes.
182184
self._last_msg: Dict[str, Tuple[float, Any]] = {}
183185
self._subscriptions: Dict[str, rclpy.node.Subscription] = {}
184-
self._destroy_subscriptions: bool = False
186+
self._destroy_subscribers: bool = destroy_subscribers
185187

186188
def get_topic_names_and_types(
187189
self, no_demangle: bool = False
@@ -298,7 +300,35 @@ def receive(
298300
def _generic_callback(self, topic: str, msg: Any) -> None:
299301
self._last_msg[topic] = (time.time(), msg)
300302

301-
def _wait_for_message(
303+
def _wait_for_message_once(
304+
self,
305+
msg_cls: Type[Any],
306+
node: rclpy.node.Node,
307+
topic: str,
308+
qos_profile: QoSProfile,
309+
timeout_sec: float,
310+
) -> Tuple[bool, Any]:
311+
ts = time.time()
312+
success = False
313+
msg = None
314+
315+
def callback(received_msg: Any):
316+
nonlocal success, msg
317+
success = True
318+
msg = received_msg
319+
320+
sub = node.create_subscription(
321+
msg_cls,
322+
topic,
323+
callback,
324+
qos_profile=qos_profile,
325+
)
326+
while not success and time.time() - ts < timeout_sec:
327+
time.sleep(0.01)
328+
node.destroy_subscription(sub)
329+
return success, msg
330+
331+
def _wait_for_message_persistent(
302332
self,
303333
msg_cls: Type[Any],
304334
node: rclpy.node.Node,
@@ -313,19 +343,30 @@ def _wait_for_message(
313343
partial(self._generic_callback, topic),
314344
qos_profile=qos_profile,
315345
)
316-
317346
ts = time.time()
318347
while time.time() - ts < timeout_sec:
319348
if topic in self._last_msg:
320349
if self._last_msg[topic][0] + timeout_sec > time.time():
321-
if self._destroy_subscriptions:
322-
node.destroy_subscription(self._subscriptions.pop(topic))
323350
return True, self._last_msg[topic][1]
324351
time.sleep(0.01)
325-
if self._destroy_subscriptions:
326-
node.destroy_subscription(self._subscriptions.pop(topic))
327352
return False, None
328353

354+
def _wait_for_message(
355+
self,
356+
msg_cls: Type[Any],
357+
node: rclpy.node.Node,
358+
topic: str,
359+
qos_profile: QoSProfile,
360+
timeout_sec: float,
361+
) -> Tuple[bool, Any]:
362+
if self._destroy_subscribers:
363+
return self._wait_for_message_once(
364+
msg_cls, node, topic, qos_profile, timeout_sec
365+
)
366+
return self._wait_for_message_persistent(
367+
msg_cls, node, topic, qos_profile, timeout_sec
368+
)
369+
329370
def _is_topic_available(self, topic: str, timeout_sec: float) -> bool:
330371
ts = time.time()
331372
topic = topic if topic.startswith("/") else f"/{topic}"

src/rai_core/rai/communication/ros2/connectors/ari_connector.py

+57-4
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,69 @@
3939

4040

4141
class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMessage]):
42+
"""ROS2-specific implementation of the ARIConnector.
43+
44+
This connector provides functionality for ROS2 communication through topics,
45+
services, and actions, as well as TF (Transform) operations.
46+
47+
Parameters
48+
----------
49+
node_name : str, optional
50+
Name of the ROS2 node. If not provided, generates a unique name with UUID.
51+
destroy_subscribers : bool, optional
52+
Whether to destroy subscribers after receiving a message, by default False.
53+
54+
Methods
55+
-------
56+
get_topics_names_and_types()
57+
Get list of available topics and their message types.
58+
get_services_names_and_types()
59+
Get list of available services and their types.
60+
get_actions_names_and_types()
61+
Get list of available actions and their types.
62+
send_message(message, target, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs)
63+
Send a message to a specified topic.
64+
receive_message(source, timeout_sec=1.0, msg_type=None, auto_topic_type=True, **kwargs)
65+
Receive a message from a specified topic.
66+
wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0)
67+
Wait for a transform to become available.
68+
get_transform(target_frame, source_frame, timeout_sec=5.0)
69+
Get the transform between two frames.
70+
create_service(service_name, on_request, on_done=None, service_type, **kwargs)
71+
Create a ROS2 service.
72+
create_action(action_name, generate_feedback_callback, action_type, **kwargs)
73+
Create a ROS2 action server.
74+
shutdown()
75+
Clean up resources and shut down the connector.
76+
77+
Notes
78+
-----
79+
Threading Model:
80+
The connector creates a MultiThreadedExecutor that runs in a dedicated thread.
81+
This executor processes all ROS2 callbacks and operations asynchronously.
82+
83+
Subscriber Lifecycle:
84+
The `destroy_subscribers` parameter controls subscriber cleanup behavior:
85+
- True: Subscribers are destroyed after receiving a message
86+
- Pros: Better resource utilization
87+
- Cons: Known stability issues (see: https://github.com/ros2/rclpy/issues/1142)
88+
- False (default): Subscribers remain active after message reception
89+
- Pros: More stable operation, avoids potential crashes
90+
- Cons: May lead to memory/performance overhead from inactive subscribers
91+
"""
92+
4293
def __init__(
43-
self, node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}"
94+
self,
95+
node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}",
96+
destroy_subscribers: bool = False,
4497
):
4598
super().__init__()
4699
self._node = Node(node_name)
47-
self._topic_api = ROS2TopicAPI(self._node)
100+
self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers)
48101
self._service_api = ROS2ServiceAPI(self._node)
49102
self._actions_api = ROS2ActionAPI(self._node)
50103
self._tf_buffer = Buffer(node=self._node)
51-
self.tf_listener = TransformListener(self._tf_buffer, self._node)
104+
self._tf_listener = TransformListener(self._tf_buffer, self._node)
52105

53106
self._executor = MultiThreadedExecutor()
54107
self._executor.add_node(self._node)
@@ -173,7 +226,7 @@ def node(self) -> Node:
173226
return self._node
174227

175228
def shutdown(self):
176-
self.tf_listener.unregister()
229+
self._tf_listener.unregister()
177230
self._node.destroy_node()
178231
self._actions_api.shutdown()
179232
self._topic_api.shutdown()

src/rai_core/rai/communication/ros2/connectors/hri_connector.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import logging
1516
import threading
1617
import uuid
1718
from typing import Any, Callable, List, Literal, Optional, Tuple, Union
1819

1920
from rclpy.executors import MultiThreadedExecutor
2021
from rclpy.node import Node
2122

22-
import rai_interfaces.msg
2323
from rai.communication import HRIConnector
2424
from rai.communication.ros2.api import (
2525
ConfigurableROS2TopicAPI,
@@ -31,6 +31,11 @@
3131
from rai.communication.ros2.connectors.service_mixin import ROS2ServiceMixin
3232
from rai.communication.ros2.messages import ROS2HRIMessage
3333

34+
try:
35+
import rai_interfaces.msg
36+
except ImportError:
37+
logging.warning("rai_interfaces is not installed, ROS 2 HRIMessage will not work.")
38+
3439

3540
class ROS2HRIConnector(ROS2ActionMixin, ROS2ServiceMixin, HRIConnector[ROS2HRIMessage]):
3641
def __init__(

0 commit comments

Comments
 (0)