Skip to content

Commit 0f8c427

Browse files
author
David Conner
committed
destroy sub/pub/client in executor thread; use SingleThreadedExecutor without callback groups; use basic pub/sub for onboard; cleanup
1 parent e5c8977 commit 0f8c427

File tree

12 files changed

+145
-96
lines changed

12 files changed

+145
-96
lines changed

flexbe_core/flexbe_core/behavior_library.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ def parse_packages(self):
6464
pkg = parse_package(os.path.join(pkg_path, 'share', pkg_name))
6565
for export in pkg.exports:
6666
if export.tagname == "flexbe_behaviors":
67-
self._add_behavior_manifests(os.path.join(pkg_path, 'lib', pkg_name, 'manifest'), pkg_name)
67+
try:
68+
self._add_behavior_manifests(os.path.join(pkg_path, 'lib', pkg_name, 'manifest'), pkg_name)
69+
except KeyError as exc:
70+
print(f"Error : duplicate behavior name found in {pkg_name} \n {exc}", flush=True)
71+
raise exc
6872

6973
def _add_behavior_manifests(self, path, pkg=None):
7074
"""
@@ -130,7 +134,6 @@ def find_behavior(self, be_identifier):
130134
131135
@return Tuple (be_key, be_entry) corresponding to the name or (None, None) if not found.
132136
"""
133-
134137
if "/" in be_identifier:
135138
# Identifier in the form of package/Name
136139
# where only first slash delineates package

flexbe_core/flexbe_core/core/operatable_state_machine.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,10 +287,12 @@ def _notify_start(self):
287287
state._notify_start()
288288

289289
def _notify_stop(self):
290+
self.on_stop()
290291
for state in self._states:
291292
if isinstance(state, OperatableState):
292293
state.on_stop()
293294
if isinstance(state, OperatableStateMachine):
295+
state.on_stop()
294296
state._notify_stop()
295297
if state._is_controlled:
296298
state._disable_ros_control()

flexbe_core/flexbe_core/core/preemptable_state_machine.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ def _preempt_cb(self, msg):
6767
Logger.localinfo(f'Preempting {self.name}!')
6868
PreemptableState.preempt = True
6969

70+
def on_stop(self):
71+
# No other SM classes have on_stop, so no call to super().on_stop
72+
self._sub.unsubscribe_topic(self._preempt_topic, inst_id=id(self))
73+
7074
@staticmethod
7175
def add(label, state, transitions=None, remapping=None):
7276
transitions[PreemptableState._preempted_name] = PreemptableStateMachine._preempted_name

flexbe_core/flexbe_core/proxy/proxy_action_client.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,9 @@ def setupClient(cls, topic, action_type, wait_duration=10):
115115
Logger.localinfo(f'Existing action client for {topic}'
116116
f' with same action type name, but different instance - re-create client!')
117117

118+
# Destroy the existing client in executor thread
118119
client = ProxyActionClient._clients[topic]
119-
try:
120-
client.destroy()
121-
except Exception as exc:
122-
Logger.localinfo(f'Exception destroying old client for {topic}'
123-
f'{type(exc)} - {exc}')
120+
ProxyActionClient._node.executor.create_task(ProxyActionClient.destroy_client, client, topic)
124121

125122
ProxyActionClient._clients[topic] = ActionClient(ProxyActionClient._node, action_type, topic)
126123
ProxyActionClient._check_topic_available(topic, wait_duration)
@@ -346,3 +343,16 @@ def _check_topic_available(cls, topic, wait_duration=0.1):
346343
@classmethod
347344
def _print_wait_warning(cls, topic):
348345
Logger.logwarn(f"Waiting for action client/server for '{topic}'")
346+
347+
@classmethod
348+
def destroy_client(cls, client, topic):
349+
"""Handle client destruction from within the executor threads."""
350+
try:
351+
if ProxyActionClient._node.destroy_client(client):
352+
Logger.localinfo(f'Destroyed the proxy action client for {topic} ({id(client)})!')
353+
else:
354+
Logger.localwarn(f'Some issue destroying the proxy action client for {topic}!')
355+
del client
356+
except Exception as exc: # pylint: disable=W0703
357+
Logger.error("Something went wrong destroying proxy action client"
358+
f" for {topic}!\n {type(exc)} - {str(exc)}")

flexbe_core/flexbe_core/proxy/proxy_publisher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ def createPublisher(cls, topic, msg_type, qos=None, **kwargs):
114114
# Same message type name, so likely due to reloading Python module on behavior change
115115
Logger.localinfo(f'Existing publisher for {topic} with same message type name,'
116116
' but different instance - re-create publisher!')
117-
# crashes Humble - ProxyPublisher._node.destroy_publisher(ProxyPublisher._topics[topic])
117+
ProxyPublisher._node.executor.create_task(ProxyPublisher.destroy_publisher,
118+
ProxyPublisher._topics[topic], topic)
118119
qos = qos or QOS_DEFAULT
119120
ProxyPublisher._topics[topic] = ProxyPublisher._node.create_publisher(msg_type, topic, qos)
120121
else:
@@ -243,3 +244,16 @@ def _wait_for_subscribers(cls, pub, timeout=5.0):
243244
rate.wait(polling_rate) # Use system time for polling, not ROS possibly sim_time
244245
del rate
245246
return False
247+
248+
@classmethod
249+
def destroy_publisher(cls, pub, topic):
250+
"""Handle publisher destruction from within the executor threads."""
251+
try:
252+
if ProxyPublisher._node.destroy_publisher(pub):
253+
Logger.localinfo(f'Destroyed the proxy publisher for {topic} ({id(pub)})!')
254+
else:
255+
Logger.localwarn(f'Some issue destroying the proxy publisher for {topic}!')
256+
del pub
257+
except Exception as exc: # pylint: disable=W0703
258+
Logger.error("Something went wrong destroying proxy publisher"
259+
f" for {topic}!\n {type(exc)} - {str(exc)}")

flexbe_core/flexbe_core/proxy/proxy_service_caller.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,8 @@ def setupService(cls, topic, srv_type, wait_duration=10):
115115
if srv_type.__name__ == ProxyServiceCaller._services[topic].srv_type.__name__:
116116
Logger.localinfo(f'Existing service for {topic} with same message type name,'
117117
f' but different instance - re-create service!')
118-
try:
119-
ProxyServiceCaller._node.destroy_client(ProxyServiceCaller._services[topic])
120-
except Exception as exc: # pylint: disable=W0703
121-
Logger.error("Something went wrong destroying service client"
122-
f" for {topic}!\n {type(exc)} - {exc}")
118+
ProxyServiceCaller._node.executor.create_task(ProxyServiceCaller.destroy_service,
119+
ProxyServiceCaller._services[topic], topic)
123120

124121
ProxyServiceCaller._services[topic] = ProxyServiceCaller._node.create_client(srv_type, topic)
125122
if isinstance(wait_duration, float):
@@ -294,3 +291,16 @@ def _check_service_available(cls, topic, wait_duration=1):
294291
@classmethod
295292
def _print_wait_warning(cls, topic):
296293
Logger.warning("Waiting for service %s..." % (topic))
294+
295+
@classmethod
296+
def destroy_service(cls, srv, topic):
297+
"""Handle service client destruction from within the executor threads."""
298+
try:
299+
if ProxyServiceCaller._node.destroy_client(srv):
300+
Logger.localinfo(f'Destroyed the proxy service caller for {topic} ({id(srv)})!')
301+
else:
302+
Logger.localwarn(f'Some issue destroying the proxy service caller for {topic}!')
303+
del srv
304+
except Exception as exc: # pylint: disable=W0703
305+
Logger.error("Something went wrong destroying service caller"
306+
f" for {topic}!\n {type(exc)} - {str(exc)}")

flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -338,23 +338,38 @@ def unsubscribe_topic(cls, topic, inst_id=-1):
338338
if topic in ProxySubscriberCached._topics:
339339

340340
try:
341-
if inst_id in ProxySubscriberCached._topics[topic]['subscribers']:
342-
ProxySubscriberCached._topics[topic]['subscribers'].remove(inst_id)
341+
topic_dict = ProxySubscriberCached._topics[topic]
342+
if inst_id in topic_dict['subscribers']:
343+
topic_dict['subscribers'].remove(inst_id)
343344
Logger.localinfo(f"Unsubscribed {topic} from proxy! "
344-
f"({len(ProxySubscriberCached._topics[topic]['subscribers'])} remaining)")
345+
f"({len(topic_dict['subscribers'])} remaining)")
345346

346-
if inst_id in ProxySubscriberCached._topics[topic]['callbacks']:
347-
ProxySubscriberCached._topics[topic]['callbacks'].pop(inst_id)
347+
if inst_id in topic_dict['callbacks']:
348+
topic_dict['callbacks'].pop(inst_id)
348349
Logger.localinfo(f"Removed callback from proxy subscription for {topic} "
349-
f"from proxy! ({len(ProxySubscriberCached._topics[topic]['callbacks'])} remaining)")
350-
351-
if len(ProxySubscriberCached._topics[topic]['subscribers']) == 0:
352-
Logger.localinfo(f"Proxy subscriber has no remaining customers for {topic}\n"
353-
f" Destroying subscription at node causes crash in Humble "
354-
f"for both Cyclone and Fast DDS,\n"
355-
f" and recreating subscription later causes multiple callbacks per "
356-
f"published message,\n so just leave existing subscription in place for now!")
357-
# This has potential to leave topics from old behaviors active
358-
ProxySubscriberCached.disable_buffer(topic) # Stop buffering until someone asks for it again
350+
f"from proxy! ({len(topic_dict['callbacks'])} remaining)")
351+
352+
remaining_subscribers = len(topic_dict['subscribers'])
353+
remaining_callbacks = len(topic_dict['callbacks'])
354+
if remaining_subscribers == 0:
355+
assert remaining_callbacks == 0, "Must have at least one subscriber tracked for every callback!"
356+
sub = topic_dict['subscription']
357+
ProxySubscriberCached._topics.pop(topic)
358+
ProxySubscriberCached._node.executor.create_task(ProxySubscriberCached.destroy_subscription,
359+
sub, topic)
360+
359361
except Exception as exc: # pylint: disable=W0703
360362
Logger.error(f'Something went wrong unsubscribing {topic} of proxy subscriber!\n%s', str(exc))
363+
364+
@classmethod
365+
def destroy_subscription(cls, sub, topic):
366+
"""Handle subscription destruction from within the executor threads."""
367+
try:
368+
if ProxySubscriberCached._node.destroy_subscription(sub):
369+
Logger.localinfo(f'Destroyed the proxy subscription for {topic} ({id(sub)})!')
370+
else:
371+
Logger.localwarn(f'Some issue destroying the proxy subscription for {topic}!')
372+
del sub
373+
except Exception as exc: # pylint: disable=W0703
374+
Logger.error("Something went wrong destroying subscription"
375+
f" for {topic}!\n {type(exc)} - {str(exc)}")

flexbe_input/flexbe_input/input_gui.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class InputGUI(QMainWindow):
3636
3737
Instances of this class should be created in input_action_server.py.
3838
"""
39+
3940
def __init__(self, prompt):
4041
QMainWindow.__init__(self)
4142

flexbe_mirror/flexbe_mirror/behavior_mirror_sm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def main(args=None):
4343
mirror = FlexbeMirror()
4444

4545
# Use at least 2 threads to stay on top of pub/sub requirements
46-
executor = rclpy.executors.MultiThreadedExecutor(num_threads=2)
46+
executor = rclpy.executors.SingleThreadedExecutor()
4747
executor.add_node(mirror)
4848

4949
print("Begin behavior mirror processing ...", flush=True)

0 commit comments

Comments
 (0)