Skip to content

Commit e114148

Browse files
committed
cras_bag_tools: Support filters that need to read extra time ranges.
1 parent cce91fa commit e114148

File tree

8 files changed

+248
-58
lines changed

8 files changed

+248
-58
lines changed

cras_bag_tools/README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ Process bag files with a configured set of filters.
8686
#### Usage
8787

8888
usage: filter_bag [-h] [-c CONFIG [CONFIG ...]] [-o OUT_FORMAT] [--lz4] [--bz2] [--no-copy-params] [--list-yaml-keys] [--list-filters] [--merge-initial-static-tf [DURATION]]
89+
[--start-time START_TIME] [--end-time END_TIME] [--time-ranges START_TIME END_TIME [START_TIME END_TIME ...]] [-l]
8990
[--throttle TOPIC RATE [TOPIC RATE ...]] [-i INCLUDE_TOPICS [INCLUDE_TOPICS ...]] [-e EXCLUDE_TOPICS [EXCLUDE_TOPICS ...]] [--include-types INCLUDE_TYPES [INCLUDE_TYPES ...]]
9091
[--exclude-types EXCLUDE_TYPES [EXCLUDE_TYPES ...]] [--include-tf-parents INCLUDE_TF_PARENTS [INCLUDE_TF_PARENTS ...]]
9192
[--exclude-tf-parents EXCLUDE_TF_PARENTS [EXCLUDE_TF_PARENTS ...]] [--include-tf-children INCLUDE_TF_CHILDREN [INCLUDE_TF_CHILDREN ...]]
@@ -95,12 +96,20 @@ Process bag files with a configured set of filters.
9596

9697
Positional arguments:
9798

98-
* `bags`: The list of bags to process.
99+
* `bags`: The list of bags to process. Each bag can be a colon-delimited list of bags (*multibag*), in which case all of
100+
these bags will be read at the same time. One of the items can also be a path to a `.yaml` or `.params` file from
101+
which the ROS parameters will be read. If no params file is given, each bag is tried with `.yaml` and `.params`
102+
extensions appended. The first YAML file found in this way will be loaded as the parameters file. The first bag file
103+
in each *multibag* has a special meaning which can be triggered by `--limit-to-first-bag`.
99104

100105
Optional arguments:
101106
* `--list-yaml-keys`: Print a list of all available YAML top-level keys provided by filters.
102107
* `--list-filters`: Print a list of all available filters.
103108
* `--no-copy-params`: If set, no .params file will be copied
109+
* `--start-time START_TIME`: Time from which the bag filtering should be started.
110+
* `--end-time END_TIME`: Time to which the bag filtering should be stopped.
111+
* `--time-ranges START_TIME END_TIME [START_TIME END_TIME ...]`: Time ranges of bags that should be processed.
112+
* `-l`, `--limit-to-first-bag`: Read duration only from the first bag of each multibag.
104113
* `-i INCLUDE_TOPICS [INCLUDE_TOPICS ...]`, `--include-topics INCLUDE_TOPICS [INCLUDE_TOPICS ...]`: Retain only
105114
these topics
106115
* `-e EXCLUDE_TOPICS [EXCLUDE_TOPICS ...]`, `--exclude-topics EXCLUDE_TOPICS [EXCLUDE_TOPICS ...]`: Remove these topics

cras_bag_tools/scripts/filter_bag

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ from argparse import ArgumentParser
1717
from glob import glob
1818
from shutil import copyfile
1919

20+
import genpy
2021
import rosbag
2122
from cras import pretty_file_size
2223
from cras_bag_tools.bag_filter import filter_bag
@@ -59,7 +60,7 @@ def copy_params_if_any(param_file, out_bag_path):
5960

6061

6162
def filter_bags(bags, out_format, compression, copy_params, filter, default_params_file=None,
62-
start_time=None, end_time=None, time_ranges=None):
63+
start_time=None, end_time=None, time_ranges=None, limit_to_first_bag=False):
6364
"""Filter all given bags using the given filter.
6465
6566
:param list bags: The bags to filter. If multiple bags should be read at once, add them as a single list item with
@@ -75,6 +76,9 @@ def filter_bags(bags, out_format, compression, copy_params, filter, default_para
7576
:param TimeRanges time_ranges: Time ranges of the bag files to process. If start_time and end_time are specified,
7677
they are merged with these ranges. Relative time ranges will be evaluated relative
7778
to each individual bag.
79+
:param bool limit_to_first_bag: If True, each multibag will report its start and end to be equal to the
80+
first open bag. If False, the start and end correspond to the earliest and latest
81+
stamp in all bags.
7882
"""
7983
i = 0
8084
for bags_path in bags:
@@ -115,7 +119,7 @@ def filter_bags(bags, out_format, compression, copy_params, filter, default_para
115119
continue
116120

117121
try:
118-
with TqdmMultiBag(bags_path, skip_index=True) as bag:
122+
with TqdmMultiBag(bags_path, skip_index=True, limit_to_first_bag=limit_to_first_bag) as bag:
119123
print('- Size: %s' % (pretty_file_size(bag.size),))
120124

121125
out_bag_path = out_path(bag_path, out_format)
@@ -157,7 +161,7 @@ class TimeRangesAction(argparse.Action):
157161

158162
def main():
159163
parser = ArgumentParser()
160-
parser.add_argument('bags', nargs='*', help="The bag files to filter.")
164+
parser.add_argument('bags', nargs='*', help="The (multi)bag files to filter.")
161165
parser.add_argument('-c', '--config', nargs='+', help="YAML configs of filters")
162166
parser.add_argument('-o', '--out-format', default=argparse.SUPPRESS,
163167
help='Template for naming the output bag. Defaults to "{name}.proc{ext}"')
@@ -181,6 +185,8 @@ def main():
181185
help="Time to which the bag filtering should be stopped.")
182186
parser.add_argument("--time-ranges", dest="time_ranges", nargs='+', action=TimeRangesAction, default=None,
183187
help="Time ranges of bags that should be processed.", metavar='START_TIME END_TIME')
188+
parser.add_argument("-l", "--limit-to-first-bag", dest="limit_to_first_bag", action="store_true",
189+
help="Read duration only from the first bag of each multibag.")
184190

185191
loaded_filters = get_filters()
186192
unique_filters = set(loaded_filters.values())
@@ -272,7 +278,9 @@ def main():
272278
args.copy_params = True
273279

274280
filter_bags(args.bags, args.out_format, args.compression, args.copy_params, filter_chain, args.default_params_file,
275-
args.start_time, args.end_time, args.time_ranges)
281+
genpy.Time(args.start_time) if args.start_time is not None else None,
282+
genpy.Time(args.end_time) if args.end_time is not None else None, args.time_ranges,
283+
args.limit_to_first_bag)
276284

277285

278286
if __name__ == '__main__':

cras_bag_tools/src/cras_bag_tools/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@
55
from .topic_set import TopicSet
66
from .bag_utils import *
77
from .tqdm_bag import *
8-
from .message_filter import MessageFilter, DeserializedMessageFilter, RawMessageFilter, FilterChain, Passthrough, \
9-
filter_message, get_filters
8+
from .message_filter import *
109
from .bag_filter import filter_bag

cras_bag_tools/src/cras_bag_tools/bag_filter.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ def filter_bag(bags, out, bag_filter=Passthrough(), params=None, start_time=None
5656
if time_ranges is not None:
5757
time_ranges.set_base_time(genpy.Time(bags.get_start_time()))
5858

59+
extra_time_ranges = None
60+
# If time_ranges is None, we read the whole bags, so we don't need any extra time ranges
61+
if time_ranges is not None:
62+
extra_time_ranges = bag_filter.extra_time_ranges(bags)
63+
if extra_time_ranges is not None:
64+
extra_time_ranges.set_base_time(genpy.Time(bags.get_start_time()))
65+
5966
# get all topics
6067
topics = [c.topic for c in bags._get_connections()] # noqa
6168
# apply topic filters
@@ -66,13 +73,16 @@ def filter_bag(bags, out, bag_filter=Passthrough(), params=None, start_time=None
6673
queue = Queue()
6774
connection_filter = bag_filter.connection_filter
6875
for topic, msg, stamp, connection_header in bags.read_messages(
69-
topics=topics, start_time=time_ranges, return_connection_header=True, raw=bag_filter.is_raw,
70-
connection_filter=connection_filter):
76+
topics=topics, start_time=time_ranges, end_time=extra_time_ranges, return_connection_header=True,
77+
raw=bag_filter.is_raw, connection_filter=connection_filter):
7178
queue.put((topic, msg, stamp, connection_header))
7279
while not queue.empty():
7380
_topic, _msg, _stamp, _connection_header = queue.get()
7481
_connection_header = copy.copy(_connection_header) # Prevent modifying connection records from in bag
75-
ret = filter_message(_topic, _msg, _stamp, _connection_header, bag_filter, True)
82+
is_from_extra_time_ranges = False
83+
if extra_time_ranges is not None and time_ranges is not None:
84+
is_from_extra_time_ranges = _stamp in extra_time_ranges and _stamp not in time_ranges
85+
ret = filter_message(_topic, _msg, _stamp, _connection_header, bag_filter, True, is_from_extra_time_ranges)
7686
if not isinstance(ret, list):
7787
ret = [ret]
7888
for data in ret[1:]:

cras_bag_tools/src/cras_bag_tools/bag_utils.py

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,38 +48,54 @@ def __getattr__(self, item):
4848
return getattr(self.bag, item)
4949

5050
def _get_entries(self, connections=None, start_time=None, end_time=None):
51+
all_ranges = None
52+
if start_time is not None and isinstance(start_time, TimeRanges):
53+
all_ranges = copy.copy(start_time)
54+
if end_time is not None and isinstance(end_time, TimeRanges):
55+
if all_ranges is None:
56+
all_ranges = copy.copy(end_time)
57+
else:
58+
all_ranges.append(end_time.ranges)
59+
5160
for entry in heapq.merge(*self.bag._get_indexes(connections), key=lambda x: x.time.to_nsec()): # noqa
52-
if start_time is not None:
53-
if isinstance(start_time, TimeRanges):
54-
time_ranges = start_time
55-
if time_ranges > entry.time:
56-
continue
57-
elif time_ranges < entry.time:
58-
return
59-
elif entry.time not in time_ranges:
60-
continue
61-
elif entry.time < start_time:
61+
if all_ranges is not None:
62+
if all_ranges > entry.time:
6263
continue
63-
if end_time is not None and entry.time > end_time:
64-
return
64+
elif all_ranges < entry.time:
65+
return
66+
elif entry.time not in all_ranges:
67+
continue
68+
else:
69+
if start_time is not None and isinstance(start_time, genpy.Time) and entry.time < start_time:
70+
continue
71+
if end_time is not None and isinstance(end_time, genpy.Time) and entry.time > end_time:
72+
return
6573
yield entry
6674

6775
def _get_entries_reverse(self, connections=None, start_time=None, end_time=None):
76+
all_ranges = None
77+
if start_time is not None and isinstance(start_time, TimeRanges):
78+
all_ranges = copy.copy(start_time)
79+
if end_time is not None and isinstance(end_time, TimeRanges):
80+
if all_ranges is None:
81+
all_ranges = copy.copy(end_time)
82+
else:
83+
all_ranges.append(end_time.ranges)
84+
6885
for entry in heapq.merge(*(reversed(index) for index in self._get_indexes(connections)),
6986
key=lambda x: x.time.to_nsec(), reverse=True):
70-
if start_time is not None:
71-
if isinstance(start_time, TimeRanges):
72-
time_ranges = start_time
73-
if time_ranges > entry.time:
74-
return
75-
elif time_ranges < entry.time:
76-
continue
77-
elif entry.time not in time_ranges:
78-
continue
79-
elif entry.time < start_time:
87+
if all_ranges is not None:
88+
if all_ranges > entry.time:
8089
return
81-
if end_time is not None and entry.time > end_time:
82-
continue
90+
elif all_ranges < entry.time:
91+
continue
92+
elif entry.time not in all_ranges:
93+
continue
94+
else:
95+
if start_time is not None and isinstance(start_time, genpy.Time) and entry.time < start_time:
96+
return
97+
if end_time is not None and isinstance(end_time, genpy.Time) and entry.time > end_time:
98+
continue
8399
yield entry
84100

85101

@@ -91,12 +107,24 @@ def __init__(self,
91107
mode='r', # type: STRING_TYPE
92108
compression=rosbag.Compression.NONE, # type: rosbag.Compression
93109
options=None, # type: Dict[STRING_TYPE, Any]
94-
skip_index=False # type: bool
110+
skip_index=False, # type: bool
111+
limit_to_first_bag=False, # type: bool
95112
):
113+
"""
114+
:param bag_files: The paths to bags to open (either a sequence of colon-separated string of paths).
115+
:param mode: Open mode (r/w/a).
116+
:param compression: Compression (used for write mode).
117+
:param options: Bag options (compression, chunk threshold, ...).
118+
:param skip_index: Whether index should be read right away. Otherwise, call read_index() when you need it.
119+
:param limit_to_first_bag: If True, the multibag will report its start and end to be equal to the
120+
first open bag. If False, the start and end correspond to the earliest and latest
121+
stamp in all bags.
122+
"""
96123

97124
if isinstance(bag_files, STRING_TYPE):
98125
bag_files = bag_files.split(os.path.pathsep)
99126
self.bags = [self.open_bag(b, compression, mode, options, skip_index) for b in bag_files]
127+
self._limit_to_first_bag = limit_to_first_bag
100128

101129
def open_bag(self, b, compression, mode, options, skip_index):
102130
return rosbag.Bag(b, mode, compression, options=options, skip_index=skip_index)
@@ -115,23 +143,33 @@ def __del__(self):
115143

116144
@property
117145
def size(self):
146+
if self._limit_to_first_bag:
147+
return self.bags[0].size
118148
return sum(b.size for b in self.bags)
119149

120150
def close(self):
121151
if hasattr(self, 'bags'):
122152
for b in self.bags:
123153
b.close()
124154

125-
def get_message_count(self, topic_filters=None, start_time=None, end_time=None):
126-
# type: (Optional[Sequence[STRING_TYPE]], Optional[Union[genpy.Time, TimeRanges]], Optional[genpy.Time]) -> int
155+
def get_message_count(self,
156+
topic_filters=None, # type: Optional[Sequence[STRING_TYPE]]
157+
start_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
158+
end_time=None # type: Optional[Union[genpy.Time, TimeRanges]]
159+
):
160+
# type: (...) -> int
127161
connections = dict(self._get_connections(topic_filters, with_bag=True))
128162
entries = self._get_entries(connections, start_time, end_time)
129163
return sum(1 for _ in entries)
130164

131165
def get_start_time(self): # type: () -> float
166+
if self._limit_to_first_bag:
167+
return self.bags[0].get_start_time()
132168
return min(b.get_start_time() for b in self.bags)
133169

134170
def get_end_time(self): # type: () -> float
171+
if self._limit_to_first_bag:
172+
return self.bags[0].get_end_time()
135173
return max(b.get_end_time() for b in self.bags)
136174

137175
def read_index(self):
@@ -151,7 +189,7 @@ def _get_connections(self, topics=None, connection_filter=None, with_bag=False):
151189
def _get_entries(self,
152190
connections=None, # type: Optional[Dict[rosbag.Bag, Iterable[ConnectionInfo]]]
153191
start_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
154-
end_time=None, # type: Optional[genpy.Time]
192+
end_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
155193
):
156194
# type: (...) -> Iterator[Tuple[rosbag.Bag, ConnectionEntry, ConnectionInfo]]
157195
all_indexes = []
@@ -161,6 +199,11 @@ def _get_entries(self,
161199
for conn, index in zip(conns, indexes):
162200
all_indexes.append([(bag, entry, conn) for entry in index])
163201

202+
if start_time is None and self._limit_to_first_bag:
203+
start_time = genpy.Time(self.get_start_time())
204+
if end_time is None and self._limit_to_first_bag:
205+
end_time = genpy.Time(self.get_end_time())
206+
164207
time_ranges = None
165208
if start_time is not None and isinstance(start_time, TimeRanges):
166209
time_ranges = {}
@@ -170,7 +213,20 @@ def _get_entries(self,
170213
time_ranges[bag] = time_range
171214
start_time = None # Simplify the check in the next loop
172215

216+
extra_time_ranges = None
217+
if end_time is not None and isinstance(end_time, TimeRanges):
218+
extra_time_ranges = {}
219+
for bag in self.bags:
220+
time_range = copy.copy(end_time)
221+
time_range.set_base_time(genpy.Time(bag.get_start_time()))
222+
extra_time_ranges[bag] = time_range
223+
end_time = None # Simplify the check in the next loop
224+
173225
for bag, entry, conn in heapq.merge(*all_indexes, key=lambda x: x[1].time.to_nsec()):
226+
if extra_time_ranges is not None and entry.time in extra_time_ranges[bag]:
227+
yield bag, entry, conn
228+
continue
229+
174230
if start_time is not None and entry.time < start_time:
175231
continue
176232
if end_time is not None and entry.time > end_time:
@@ -189,7 +245,7 @@ def _get_entries(self,
189245
def _read_messages(self,
190246
topics=None, # type: Optional[Sequence[STRING_TYPE]]
191247
start_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
192-
end_time=None, # type: Optional[genpy.Time]
248+
end_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
193249
connection_filter=None, # type: Optional[ConnectionFilter]
194250
raw=False, # type: bool
195251
return_connection_header=False, # type: bool
@@ -204,7 +260,7 @@ def _read_messages(self,
204260
def read_messages(self,
205261
topics=None, # type: Optional[Sequence[STRING_TYPE]]
206262
start_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
207-
end_time=None, # type: Optional[genpy.Time]
263+
end_time=None, # type: Optional[Union[genpy.Time, TimeRanges]]
208264
connection_filter=None, # type: Optional[ConnectionFilter]
209265
raw=False, # type: bool
210266
return_connection_header=False, # type: bool

0 commit comments

Comments
 (0)