Skip to content

Commit 8095940

Browse files
author
David Conner
committed
update to protect against executor calls after unsubscribe/shutdown
1 parent 93253f0 commit 8095940

File tree

5 files changed

+47
-37
lines changed

5 files changed

+47
-37
lines changed

flexbe_core/flexbe_core/proxy/proxy_action_client.py

-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ def shutdown():
7272
ProxyActionClient._cancel_current_goal.clear()
7373
ProxyActionClient._has_active_goal.clear()
7474
ProxyActionClient._current_goal.clear()
75-
ProxyActionClient._node = None
7675
except Exception as exc: # pylint: disable=W0703
7776
print(f'Something went wrong during shutdown of proxy action clients!\n{ str(exc)}')
7877

flexbe_core/flexbe_core/proxy/proxy_publisher.py

-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ def shutdown():
6666

6767
print("Shutdown proxy publisher ...")
6868
ProxyPublisher._topics.clear()
69-
ProxyPublisher._node = None
7069

7170
except Exception as exc: # pylint: disable=W0703
7271
Logger.error(f'Something went wrong during shutdown of proxy publisher !\n{str(exc)}')

flexbe_core/flexbe_core/proxy/proxy_service_caller.py

-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ def shutdown():
7171

7272
print("Shutdown proxy service caller ...")
7373
ProxyServiceCaller._results.clear()
74-
ProxyServiceCaller._node = None
7574

7675
except Exception as exc: # pylint: disable=W0703
7776
print(f'Something went wrong during shutdown of proxy service caller !\n{str(exc)}')

flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py

+45-33
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ def shutdown():
7070
print("Shutdown proxy subscriber ...")
7171
ProxySubscriberCached._topics.clear()
7272
ProxySubscriberCached._persistant_topics.clear()
73-
ProxySubscriberCached._node = None
7473

7574
except Exception as exc: # pylint: disable=W0703
7675
print(f'Something went wrong during shutdown of proxy subscriber !\n{str(exc)}')
@@ -156,7 +155,8 @@ def subscribe(cls, topic, msg_type, callback=None, buffered=False, qos=None, ins
156155
f"({len(ProxySubscriberCached._topics[topic]['subscribers'])})")
157156

158157
# Register the local callback for topic message
159-
cls.set_callback(topic, callback, inst_id)
158+
if callback is not None:
159+
cls.set_callback(topic, callback, inst_id)
160160

161161
@classmethod
162162
def _callback(cls, msg, topic):
@@ -173,16 +173,18 @@ def _callback(cls, msg, topic):
173173
Logger.localinfo(f"-- invalid topic={topic} for callback!")
174174
return
175175

176-
ProxySubscriberCached._topics[topic]['last_msg'] = msg
177-
if ProxySubscriberCached._topics[topic]['buffered']:
178-
ProxySubscriberCached._topics[topic]['msg_queue'].append(msg)
179-
180-
for inst_id, callback in ProxySubscriberCached._topics[topic]['callbacks'].items():
181-
try:
182-
callback(msg)
183-
except Exception as exc: # pylint: disable=W0703
184-
Logger.error(f"Exception in callback for {topic}: "
185-
f"{callback.__module__} {callback.__name__} @ {inst_id} \n {exc} ")
176+
try:
177+
ProxySubscriberCached._topics[topic]['last_msg'] = msg
178+
if ProxySubscriberCached._topics[topic]['buffered']:
179+
ProxySubscriberCached._topics[topic]['msg_queue'].append(msg)
180+
for inst_id, callback in ProxySubscriberCached._topics[topic]['callbacks'].items():
181+
try:
182+
callback(msg)
183+
except Exception as exc: # pylint: disable=W0703
184+
Logger.error(f"Exception in callback for {topic}: "
185+
f"{callback.__module__} {callback.__name__} @ {inst_id} \n {exc} ")
186+
except KeyError:
187+
Logger.error(f"Error: {topic} is no longer available for processing callback! ")
186188

187189
@classmethod
188190
def set_callback(cls, topic, callback, inst_id=-1):
@@ -203,16 +205,24 @@ def set_callback(cls, topic, callback, inst_id=-1):
203205
return
204206

205207
if callback is not None:
208+
ProxySubscriberCached._node.executor.create_task(cls.__set_callback, topic, callback, inst_id)
209+
210+
@classmethod
211+
def __set_callback(cls, topic, callback, inst_id):
212+
"""Set callback in executor thread."""
213+
try:
206214
if inst_id not in ProxySubscriberCached._topics[topic]['callbacks']:
207215
ProxySubscriberCached._topics[topic]['callbacks'][inst_id] = callback
208216
Logger.localinfo(f" Set local callback {callback.__name__} of "
209-
f"{len(ProxySubscriberCached._topics[topic]['callbacks'])} for {topic}!")
217+
f"{len(ProxySubscriberCached._topics[topic]['callbacks'])} for {topic}!")
210218
else:
211219
Logger.localinfo("Update existing callback "
212-
f"{ProxySubscriberCached._topics[topic]['callbacks'][inst_id].__name__} with "
213-
f"{callback.__name__} of {len(ProxySubscriberCached._topics[topic]['callbacks'])}"
214-
f" for {topic}!")
220+
f"{ProxySubscriberCached._topics[topic]['callbacks'][inst_id].__name__} with "
221+
f"{callback.__name__} of {len(ProxySubscriberCached._topics[topic]['callbacks'])}"
222+
f" for {topic}!")
215223
ProxySubscriberCached._topics[topic]['callbacks'][inst_id] = callback
224+
except KeyError:
225+
Logger.localwarn("Error: topic {topic} is not longer available - cannot set callback!")
216226

217227
@classmethod
218228
def enable_buffer(cls, topic):
@@ -307,9 +317,13 @@ def remove_last_msg(cls, topic, clear_buffer=False):
307317
"""
308318
if topic in ProxySubscriberCached._persistant_topics:
309319
return
310-
ProxySubscriberCached._topics[topic]['last_msg'] = None
311-
if clear_buffer:
312-
ProxySubscriberCached._topics[topic]['msg_queue'] = []
320+
321+
try:
322+
ProxySubscriberCached._topics[topic]['last_msg'] = None
323+
if clear_buffer:
324+
ProxySubscriberCached._topics[topic]['msg_queue'] = []
325+
except KeyError:
326+
Logger.localwarn(f"remove_last_msg: {topic} is not available!")
313327

314328
@classmethod
315329
def make_persistant(cls, topic):
@@ -343,20 +357,18 @@ def unsubscribe_topic(cls, topic, inst_id=-1):
343357
topic_dict['subscribers'].remove(inst_id)
344358
Logger.localinfo(f"Unsubscribed {topic} from proxy! "
345359
f"({len(topic_dict['subscribers'])} remaining)")
346-
347-
if inst_id in topic_dict['callbacks']:
348-
topic_dict['callbacks'].pop(inst_id)
349-
Logger.localinfo(f"Removed callback from proxy subscription for {topic} "
350-
f"from proxy! ({len(topic_dict['callbacks'])} remaining)")
351-
352-
remaining_subscribers = len(topic_dict['subscribers'])
353-
remaining_callbacks = len(topic_dict['callbacks'])
354-
if remaining_subscribers == 0:
355-
assert remaining_callbacks == 0, "Must have at least one subscriber tracked for every callback!"
356-
sub = topic_dict['subscription']
357-
ProxySubscriberCached._topics.pop(topic)
358-
ProxySubscriberCached._node.executor.create_task(ProxySubscriberCached.destroy_subscription,
359-
sub, topic)
360+
361+
remaining_subscribers = len(topic_dict['subscribers'])
362+
if remaining_subscribers == 0:
363+
ProxySubscriberCached._topics.pop(topic) # remove from list of valid topics
364+
sub = topic_dict['subscription']
365+
ProxySubscriberCached._node.executor.create_task(ProxySubscriberCached.destroy_subscription,
366+
sub, topic)
367+
elif inst_id in topic_dict['callbacks']:
368+
# Remove callback in executor thread to avoid changing size during callback
369+
ProxySubscriberCached._node.executor.create_task(topic_dict['callbacks'].pop, inst_id)
370+
Logger.localinfo(f"Removed callback from proxy subscription for {topic} "
371+
f"from proxy! ({len(topic_dict['callbacks'])} remaining)")
360372

361373
except Exception as exc: # pylint: disable=W0703
362374
Logger.error(f'Something went wrong unsubscribing {topic} of proxy subscriber!\n%s', str(exc))

flexbe_mirror/flexbe_mirror/flexbe_mirror.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ def _start_mirror(self, msg):
268268

269269
def _stop_mirror(self, msg):
270270
self.get_logger().info('--> Mirror - request to stop mirror for '
271-
f'checksum id={msg.behavior_id} - waiting for sync lock ...')
271+
f'checksum id={msg.behavior_id if isinstance(msg, BEStatus) else None} '
272+
f'- waiting for sync lock ...')
272273
with self._sync_lock:
273274
# self.get_logger().info(f'--> Mirror - stopping mirror for checksum id={msg.behavior_id} with sync lock ...')
274275
self._stopping = True

0 commit comments

Comments
 (0)