Skip to content

Commit 5694566

Browse files
committed
cras_bag_tools: bag_filter: Added possibility to implement message generators alongside message filters.
1 parent aee8df7 commit 5694566

File tree

2 files changed

+70
-8
lines changed

2 files changed

+70
-8
lines changed

cras_bag_tools/src/cras_bag_tools/bag_filter.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55

66
import copy
77
import sys
8-
from collections import deque
98

109
import genpy
1110
import rosbag.bag
11+
import rospy
12+
from cras import Heap
1213

1314
from .bag_utils import BagWrapper, MultiBag
1415
from .message_filter import MessageFilter, Passthrough, filter_message
@@ -66,14 +67,18 @@ def filter_bag(bags, out, bag_filter=Passthrough(), params=None, start_time=None
6667
# apply connection filters
6768
topics = [c.topic for c in bags._get_connections(topics, bag_filter.connection_filter)] # noqa
6869

69-
queue = deque()
70+
get_stamp_fn = lambda x: x[2] # get stamp from the tuple
71+
heap = Heap(initial=list(bag_filter.extra_initial_messages()), key=get_stamp_fn)
72+
7073
connection_filter = bag_filter.connection_filter
74+
_stamp = rospy.Time(0)
7175
for topic, msg, stamp, connection_header in bags.read_messages(
7276
topics=topics, start_time=time_ranges, end_time=extra_time_ranges, return_connection_header=True,
7377
raw=bag_filter.is_raw, connection_filter=connection_filter):
74-
queue.append((topic, msg, stamp, connection_header))
75-
while len(queue) > 0:
76-
_topic, _msg, _stamp, _connection_header = queue.popleft()
78+
heap.push((topic, msg, stamp, connection_header))
79+
# For each bag message, process the whole heap up to the stamp of the bag message
80+
while len(heap) > 0 and _stamp <= stamp:
81+
_topic, _msg, _stamp, _connection_header = heap.pop()
7782
_connection_header = copy.copy(_connection_header) # Prevent modifying connection records from in bag
7883
is_from_extra_time_ranges = False
7984
if extra_time_ranges is not None and time_ranges is not None:
@@ -82,10 +87,31 @@ def filter_bag(bags, out, bag_filter=Passthrough(), params=None, start_time=None
8287
if not isinstance(ret, list):
8388
ret = [ret]
8489
for data in ret[1:]:
85-
queue.append(data)
90+
heap.push(data)
8691
if ret[0] is None:
8792
continue
8893
_topic, _raw_msg, _stamp, _connection_header = ret[0]
8994
out.write(_topic, _raw_msg, _stamp, connection_header=_connection_header, raw=True)
9095

96+
# Push all final messages before processing the rest of the heap
97+
for m in bag_filter.extra_final_messages():
98+
heap.push(m)
99+
100+
# Finish the rest of the heap if there is something left (messages with stamp higher than last message from bag)
101+
while len(heap) > 0:
102+
_topic, _msg, _stamp, _connection_header = heap.pop()
103+
_connection_header = copy.copy(_connection_header) # Prevent modifying connection records from in bag
104+
is_from_extra_time_ranges = False
105+
if extra_time_ranges is not None and time_ranges is not None:
106+
is_from_extra_time_ranges = _stamp in extra_time_ranges and _stamp not in time_ranges
107+
ret = filter_message(_topic, _msg, _stamp, _connection_header, bag_filter, True, is_from_extra_time_ranges)
108+
if not isinstance(ret, list):
109+
ret = [ret]
110+
for data in ret[1:]:
111+
heap.push(data)
112+
if ret[0] is None:
113+
continue
114+
_topic, _raw_msg, _stamp, _connection_header = ret[0]
115+
out.write(_topic, _raw_msg, _stamp, connection_header=_connection_header, raw=True)
116+
91117
bag_filter.reset()

cras_bag_tools/src/cras_bag_tools/message_filter.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
from __future__ import absolute_import, division, print_function
77

88
import sys
9-
from typing import List, Optional, Tuple, Union
9+
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
1010

1111
import genpy
1212
import rospy
1313
from cras.message_utils import raw_to_msg, msg_to_raw
1414
from cras.plugin_utils import get_plugin_implementations
15-
from cras.string_utils import to_str
15+
from cras.string_utils import to_str, STRING_TYPE
1616

1717
from .bag_utils import MultiBag
1818
from .time_range import TimeRange, TimeRanges
@@ -239,6 +239,32 @@ def topic_filter(self, topic):
239239
"""
240240
return True
241241

242+
def extra_initial_messages(self):
243+
# type: () -> Iterable[Tuple[STRING_TYPE, Any, rospy.Time, Optional[Dict[STRING_TYPE, STRING_TYPE]]]]
244+
"""Get extra messages that should be passed to the filter before the iteration over bag messages starts.
245+
246+
This can be used e.g. by filters that are more generators the actual filters (i.e. they do not operate on
247+
existing messages, but instead create new ones based on other information).
248+
249+
:note: :py:meth:`set_bag` should be called before calling this method.
250+
:note: Do not generate very large data. All initial messages will be stored in RAM at once.
251+
:return: A list or iterator of the message 4-tuples `(topic, message, stamp, connection_header)`.
252+
"""
253+
return []
254+
255+
def extra_final_messages(self):
256+
# type: () -> Iterable[Tuple[STRING_TYPE, Any, rospy.Time, Optional[Dict[STRING_TYPE, STRING_TYPE]]]]
257+
"""Get extra messages that should be passed to the filter after the iteration over bag messages stops.
258+
259+
This can be used e.g. by filters that are more generators the actual filters (i.e. they do not operate on
260+
existing messages, but instead create new ones based on other information).
261+
262+
:note: :py:meth:`set_bag` should be called before calling this method.
263+
:note: Do not generate very large data. All final messages will be stored in RAM at once.
264+
:return: A list or iterator of the message 4-tuples `(topic, message, stamp, connection_header)`.
265+
"""
266+
return []
267+
242268
def extra_time_ranges(self, bags):
243269
"""If this filter requires that certain time ranges are read from the bagfiles in any case (like the static
244270
TFs at the beginning), it should return the requested time range here. Such parts of the bag will be always
@@ -550,6 +576,16 @@ def extra_time_ranges(self, bags):
550576
time_ranges.append(extra_ranges.ranges)
551577
return time_ranges
552578

579+
def extra_initial_messages(self):
580+
for f in self.filters:
581+
for m in f.extra_initial_messages():
582+
yield m
583+
584+
def extra_final_messages(self):
585+
for f in self.filters:
586+
for m in f.extra_final_messages():
587+
yield m
588+
553589
def reset(self):
554590
for f in self.filters:
555591
f.reset()

0 commit comments

Comments
 (0)