Skip to content

Commit 3e25416

Browse files
author
David Conner
committed
streamline pub/sub for mirror; cleanup on behavior shutdown
1 parent 0f8c427 commit 3e25416

File tree

4 files changed

+46
-23
lines changed

4 files changed

+46
-23
lines changed

flexbe_mirror/flexbe_mirror/behavior_mirror_sm.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def main(args=None):
4242

4343
mirror = FlexbeMirror()
4444

45-
# Use at least 2 threads to stay on top of pub/sub requirements
4645
executor = rclpy.executors.SingleThreadedExecutor()
4746
executor.add_node(mirror)
4847

flexbe_mirror/flexbe_mirror/flexbe_mirror.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,6 @@ def __init__(self):
6464
PreemptableStateMachine.initialize_ros(self)
6565
LockableStateMachine.initialize_ros(self)
6666

67-
# set up proxys for sm <--> GUI communication
68-
# publish topics
69-
self._pub = ProxyPublisher({'flexbe/behavior_update': String,
70-
'flexbe/request_mirror_structure': Int32})
71-
self._heartbeat_pub = self.create_publisher(Int32, 'flexbe/mirror/heartbeat', 2)
72-
7367
self._timing_event = threading.Event() # Used for wait timer
7468
self._running = False
7569
self._stopping = False
@@ -80,20 +74,30 @@ def __init__(self):
8074
self._sync_lock = threading.Lock()
8175
self._state_checksums = {}
8276

77+
# set up pub/sub sm <--> GUI communication of mirror
78+
# publish topics
79+
self._heartbeat_pub = self.create_publisher(Int32, 'flexbe/mirror/heartbeat', 2)
80+
self._request_struct_pub = self.create_publisher(Int32, 'flexbe/request_mirror_structure', 2)
81+
82+
# listen for mirror control messages using standard subscriptions
83+
self._status_sub = self.create_subscription(BEStatus, 'flexbe/status', self._status_callback, 10)
84+
self._struct_sub = self.create_subscription(ContainerStructure, 'flexbe/mirror/structure',
85+
self._mirror_structure_callback, 10)
86+
self._sync_sub = self.create_subscription(BehaviorSync, 'flexbe/mirror/sync', self._sync_callback, 10)
87+
self._preempt_sub = self.create_subscription(Empty, 'flexbe/mirror/preempt', self._preempt_callback, 10)
88+
self._onboard_heartbeat_sub = self.create_subscription(BehaviorSync, 'flexbe/heartbeat', self._heartbeat_callback, 10)
89+
self._sync_heartbeat_mismatch_counter = 0
90+
8391
self._outcome_topic = 'flexbe/mirror/outcome'
92+
self._update_topic = 'flexbe/behavior_update'
93+
# Use proxy publisher/subscriber for access in states
94+
# but just initialize here once for all
95+
self._beh_update_pub = ProxyPublisher({self._update_topic: String})
8496

85-
# listen for mirror message
8697
self._sub = ProxySubscriberCached()
8798
self._sub.subscribe(self._outcome_topic, UInt8, inst_id=id(self))
8899
self._sub.enable_buffer(self._outcome_topic)
89100

90-
self._sub.subscribe('flexbe/status', BEStatus, self._status_callback, inst_id=id(self))
91-
self._sub.subscribe('flexbe/mirror/structure', ContainerStructure, self._mirror_structure_callback, inst_id=id(self))
92-
self._sub.subscribe('flexbe/mirror/sync', BehaviorSync, self._sync_callback, inst_id=id(self))
93-
self._sub.subscribe('flexbe/mirror/preempt', Empty, self._preempt_callback, inst_id=id(self))
94-
self._sub.subscribe('flexbe/heartbeat', BehaviorSync, self._heartbeat_callback, inst_id=id(self))
95-
self._sync_heartbeat_mismatch_counter = 0
96-
97101
# no clean way to wait for publisher to be ready...
98102
Logger.loginfo('--> Mirror - setting up publishers and subscribers ...')
99103
self._timing_event.wait(1.0) # Give publishers time to initialize
@@ -184,7 +188,7 @@ def _activate_mirror(self, struct_msg):
184188
else:
185189
Logger.localwarn(f'Error processing mirror structure for behavior checksum id = {struct_msg.behavior_id}')
186190
Logger.logwarn('Requesting a new mirror structure from onboard ...')
187-
self._pub.publish('flexbe/request_mirror_structure', Int32(data=struct_msg.behavior_id))
191+
self._request_struct_pub.publish(Int32(data=struct_msg.behavior_id))
188192
self._active_id = struct_msg.behavior_id
189193
return
190194

@@ -249,7 +253,7 @@ def _start_mirror(self, msg):
249253
if self._sm is None:
250254
Logger.localwarn(f'Missing correct mirror structure for starting behavior checksum id ={msg.behavior_id}')
251255
Logger.logwarn('Requesting mirror structure from onboard ...')
252-
self._pub.publish('flexbe/request_mirror_structure', Int32(data=msg.behavior_id))
256+
self._request_struct_pub.publish(Int32(data=msg.behavior_id))
253257
self._active_id = msg.behavior_id
254258
return
255259

@@ -271,19 +275,21 @@ def _stop_mirror(self, msg):
271275
if self._sm is not None and self._running:
272276
if msg is not None and msg.code == BEStatus.FINISHED:
273277
Logger.loginfo('Onboard behavior finished successfully.')
274-
self._pub.publish('flexbe/behavior_update', String())
278+
self._beh_update_pub.publish(self._update_topic, String())
275279
elif msg is not None and msg.code == BEStatus.SWITCHING:
276280
self._starting_path = None
277281
Logger.loginfo('Onboard performing behavior switch.')
278282
elif msg is not None and msg.code == BEStatus.READY:
279283
Logger.loginfo('Onboard engine just started, stopping currently running mirror.')
280-
self._pub.publish('flexbe/behavior_update', String())
284+
self._beh_update_pub.publish(self._update_topic, String())
281285
elif msg is not None:
282286
Logger.logwarn('Onboard behavior failed!')
283-
self._pub.publish('flexbe/behavior_update', String())
287+
self._beh_update_pub.publish(self._update_topic, String())
284288

285289
self._wait_stop_running()
286290

291+
self._sm.destroy()
292+
287293
else:
288294
Logger.localinfo('No onboard behavior is active.')
289295

@@ -398,7 +404,7 @@ def _restart_mirror(self, msg):
398404
else:
399405
Logger.localwarn(f'Missing correct mirror structure for restarting behavior checksum id ={msg.behavior_id}')
400406
Logger.logwarn('Requesting mirror structure from onboard ...')
401-
self._pub.publish('flexbe/request_mirror_structure', Int32(data=msg.behavior_id))
407+
self._request_struct_pub.publish(Int32(data=msg.behavior_id))
402408
self._active_id = msg.behavior_id
403409
return
404410

@@ -426,6 +432,7 @@ def _execute_mirror(self):
426432
try:
427433
result = self._sm.spin()
428434
Logger.loginfo(f"Mirror for active id = {self._active_id} finished with result '{result}'")
435+
self._sm.destroy()
429436
except Exception as exc:
430437
try:
431438
Logger.logerr('\n(_execute_mirror Traceback): Caught exception on preempt:\n%s' % str(exc))

flexbe_mirror/flexbe_mirror/mirror_state.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
"""Simplified state for use with FlexBE UI State machine mirror."""
3131

32-
from std_msgs.msg import String, UInt8
32+
from std_msgs.msg import String
3333

3434
from flexbe_core import EventState
3535
from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
@@ -52,8 +52,9 @@ def __init__(self, target_name, target_path, given_outcomes, outcome_autonomy):
5252

5353
self._outcome_topic = 'flexbe/mirror/outcome'
5454

55+
# Allow access to standard proxies
5556
self._pub = ProxyPublisher()
56-
self._sub = ProxySubscriberCached({self._outcome_topic: UInt8}, inst_id=id(self))
57+
self._sub = ProxySubscriberCached()
5758

5859
def execute(self, userdata):
5960
if self._sub.has_buffered(self._outcome_topic):
@@ -65,3 +66,6 @@ def execute(self, userdata):
6566
def on_enter(self, userdata):
6667
self._pub.publish('flexbe/behavior_update',
6768
String(data="/" + "/".join(self._target_path.split("/")[1:])))
69+
70+
def on_stop(self):
71+
"""Call when mirror SM stops."""

flexbe_mirror/flexbe_mirror/mirror_state_machine.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from flexbe_core import Logger
3838
from flexbe_core.core import PreemptableStateMachine
3939
from flexbe_msgs.msg import BehaviorSync
40+
from flexbe_mirror.mirror_state import MirrorState
4041

4142

4243
class MirrorStateMachine(PreemptableStateMachine):
@@ -74,6 +75,18 @@ def spin(self, userdata=None):
7475

7576
return outcome
7677

78+
def destroy(self):
79+
Logger.localinfo(f'Destroy mirror state machine {self.name} ...')
80+
self._notify_stop()
81+
82+
def _notify_stop(self):
83+
self.on_stop()
84+
for state in self._states:
85+
if isinstance(state, MirrorState):
86+
state.on_stop()
87+
if isinstance(state, MirrorStateMachine):
88+
state._notify_stop()
89+
7790
def get_latest_status(self):
7891
"""
7992
Return the latest execution information as a BehaviorSync message.

0 commit comments

Comments
 (0)