Skip to content

Commit 43a016a

Browse files
committed
Add rosbag2 bag reader module
1 parent 02f6f02 commit 43a016a

7 files changed

Lines changed: 708 additions & 0 deletions

File tree

fastproto-ros2-bag/pom.xml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>org.indunet</groupId>
9+
<artifactId>fastproto-parent</artifactId>
10+
<version>4.1.0</version>
11+
</parent>
12+
13+
<artifactId>fastproto-ros2-bag</artifactId>
14+
<packaging>jar</packaging>
15+
<name>FastProto ROS2 Bag</name>
16+
<description>ROS2 rosbag2 readers powered by FastProto ROS2 CDR codecs</description>
17+
18+
<dependencies>
19+
<dependency>
20+
<groupId>org.indunet</groupId>
21+
<artifactId>fastproto-ros2-msg</artifactId>
22+
</dependency>
23+
24+
<dependency>
25+
<groupId>org.xerial</groupId>
26+
<artifactId>sqlite-jdbc</artifactId>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>org.yaml</groupId>
31+
<artifactId>snakeyaml</artifactId>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>org.junit.jupiter</groupId>
36+
<artifactId>junit-jupiter</artifactId>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.slf4j</groupId>
41+
<artifactId>slf4j-simple</artifactId>
42+
</dependency>
43+
</dependencies>
44+
</project>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.indunet.fastproto.ros2.bag;
2+
3+
import java.util.Arrays;
4+
5+
/**
6+
* One message row read from a rosbag2 storage file.
7+
*/
8+
public final class Ros2BagMessage {
9+
private final String topic;
10+
private final String type;
11+
private final long timestamp;
12+
private final byte[] payload;
13+
private final Object decodedMessage;
14+
15+
public Ros2BagMessage(String topic, String type, long timestamp, byte[] payload, Object decodedMessage) {
16+
this.topic = topic;
17+
this.type = type;
18+
this.timestamp = timestamp;
19+
this.payload = payload == null ? new byte[0] : Arrays.copyOf(payload, payload.length);
20+
this.decodedMessage = decodedMessage;
21+
}
22+
23+
public String getTopic() {
24+
return topic;
25+
}
26+
27+
public String getType() {
28+
return type;
29+
}
30+
31+
public long getTimestamp() {
32+
return timestamp;
33+
}
34+
35+
public byte[] getPayload() {
36+
return Arrays.copyOf(payload, payload.length);
37+
}
38+
39+
public Object getDecodedMessage() {
40+
return decodedMessage;
41+
}
42+
43+
public boolean isDecoded() {
44+
return decodedMessage != null;
45+
}
46+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package org.indunet.fastproto.ros2.bag;
2+
3+
import org.yaml.snakeyaml.Yaml;
4+
5+
import java.io.Closeable;
6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
import java.nio.file.DirectoryStream;
9+
import java.nio.file.Files;
10+
import java.nio.file.Path;
11+
import java.sql.Connection;
12+
import java.sql.DriverManager;
13+
import java.sql.PreparedStatement;
14+
import java.sql.ResultSet;
15+
import java.sql.SQLException;
16+
import java.sql.Statement;
17+
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.LinkedHashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.Consumer;
23+
24+
/**
25+
* Reader for rosbag2 SQLite3 bags.
26+
*/
27+
public final class Ros2BagReader implements Closeable {
28+
private final Connection connection;
29+
private final Map<Integer, Ros2BagTopic> topicsById;
30+
31+
private Ros2BagReader(Connection connection) throws SQLException {
32+
this.connection = connection;
33+
this.topicsById = loadTopics(connection);
34+
}
35+
36+
public static Ros2BagReader open(Path path) throws IOException {
37+
Path database = resolveSqliteDatabase(path);
38+
39+
try {
40+
return new Ros2BagReader(DriverManager.getConnection("jdbc:sqlite:" + database.toAbsolutePath()));
41+
} catch (SQLException e) {
42+
throw new IOException("Failed to open rosbag2 SQLite database: " + database, e);
43+
}
44+
}
45+
46+
public List<Ros2BagTopic> topics() {
47+
return Collections.unmodifiableList(new ArrayList<Ros2BagTopic>(topicsById.values()));
48+
}
49+
50+
public List<Ros2BagMessage> readMessages() throws IOException {
51+
return readMessages(null);
52+
}
53+
54+
public List<Ros2BagMessage> readMessages(String topic) throws IOException {
55+
final List<Ros2BagMessage> messages = new ArrayList<Ros2BagMessage>();
56+
forEachMessage(topic, new Consumer<Ros2BagMessage>() {
57+
@Override
58+
public void accept(Ros2BagMessage message) {
59+
messages.add(message);
60+
}
61+
});
62+
return messages;
63+
}
64+
65+
public Map<String, List<Ros2BagMessage>> readMessagesByTopic() throws IOException {
66+
final Map<String, List<Ros2BagMessage>> messagesByTopic = new LinkedHashMap<String, List<Ros2BagMessage>>();
67+
for (Ros2BagTopic topic : topicsById.values()) {
68+
messagesByTopic.put(topic.getName(), new ArrayList<Ros2BagMessage>());
69+
}
70+
71+
forEachMessage(new Consumer<Ros2BagMessage>() {
72+
@Override
73+
public void accept(Ros2BagMessage message) {
74+
List<Ros2BagMessage> messages = messagesByTopic.get(message.getTopic());
75+
if (messages == null) {
76+
messages = new ArrayList<Ros2BagMessage>();
77+
messagesByTopic.put(message.getTopic(), messages);
78+
}
79+
messages.add(message);
80+
}
81+
});
82+
83+
return messagesByTopic;
84+
}
85+
86+
public void forEachMessage(Consumer<Ros2BagMessage> consumer) throws IOException {
87+
forEachMessage(null, consumer);
88+
}
89+
90+
public void forEachMessage(String topic, Consumer<Ros2BagMessage> consumer) throws IOException {
91+
String sql = "select topic_id, timestamp, data from messages";
92+
if (topic != null) {
93+
sql += " where topic_id = ?";
94+
}
95+
sql += " order by timestamp, id";
96+
97+
try (PreparedStatement statement = connection.prepareStatement(sql)) {
98+
if (topic != null) {
99+
statement.setInt(1, findTopicId(topic));
100+
}
101+
102+
try (ResultSet resultSet = statement.executeQuery()) {
103+
while (resultSet.next()) {
104+
int topicId = resultSet.getInt("topic_id");
105+
Ros2BagTopic bagTopic = topicsById.get(topicId);
106+
if (bagTopic == null) {
107+
continue;
108+
}
109+
110+
byte[] payload = resultSet.getBytes("data");
111+
Object decoded = Ros2MessageDecoder.decode(bagTopic.getType(), payload);
112+
consumer.accept(new Ros2BagMessage(
113+
bagTopic.getName(),
114+
bagTopic.getType(),
115+
resultSet.getLong("timestamp"),
116+
payload,
117+
decoded
118+
));
119+
}
120+
}
121+
} catch (SQLException e) {
122+
throw new IOException("Failed to read rosbag2 messages.", e);
123+
}
124+
}
125+
126+
@Override
127+
public void close() throws IOException {
128+
try {
129+
connection.close();
130+
} catch (SQLException e) {
131+
throw new IOException("Failed to close rosbag2 SQLite database.", e);
132+
}
133+
}
134+
135+
private int findTopicId(String topic) throws IOException {
136+
for (Ros2BagTopic bagTopic : topicsById.values()) {
137+
if (bagTopic.getName().equals(topic)) {
138+
return bagTopic.getId();
139+
}
140+
}
141+
142+
throw new IOException("Unknown rosbag2 topic: " + topic);
143+
}
144+
145+
private static Map<Integer, Ros2BagTopic> loadTopics(Connection connection) throws SQLException {
146+
Map<Integer, Ros2BagTopic> topics = new LinkedHashMap<Integer, Ros2BagTopic>();
147+
try (Statement statement = connection.createStatement();
148+
ResultSet resultSet = statement.executeQuery(
149+
"select id, name, type, serialization_format, offered_qos_profiles from topics order by id")) {
150+
while (resultSet.next()) {
151+
Ros2BagTopic topic = new Ros2BagTopic(
152+
resultSet.getInt("id"),
153+
resultSet.getString("name"),
154+
resultSet.getString("type"),
155+
resultSet.getString("serialization_format"),
156+
resultSet.getString("offered_qos_profiles")
157+
);
158+
topics.put(topic.getId(), topic);
159+
}
160+
}
161+
162+
return topics;
163+
}
164+
165+
private static Path resolveSqliteDatabase(Path path) throws IOException {
166+
if (Files.isRegularFile(path)) {
167+
return path;
168+
}
169+
170+
if (!Files.isDirectory(path)) {
171+
throw new IOException("rosbag2 path does not exist: " + path);
172+
}
173+
174+
Path metadata = path.resolve("metadata.yaml");
175+
if (Files.exists(metadata)) {
176+
Path fromMetadata = sqliteDatabaseFromMetadata(path, metadata);
177+
if (fromMetadata != null) {
178+
if (!Files.isRegularFile(fromMetadata)) {
179+
throw new IOException("rosbag2 metadata references missing SQLite file: " + fromMetadata);
180+
}
181+
return fromMetadata;
182+
}
183+
}
184+
185+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.db3")) {
186+
for (Path candidate : stream) {
187+
return candidate;
188+
}
189+
}
190+
191+
throw new IOException("No rosbag2 SQLite .db3 file found under: " + path);
192+
}
193+
194+
@SuppressWarnings("unchecked")
195+
private static Path sqliteDatabaseFromMetadata(Path bagDirectory, Path metadata) throws IOException {
196+
Yaml yaml = new Yaml();
197+
Object loaded;
198+
try (InputStream inputStream = Files.newInputStream(metadata)) {
199+
loaded = yaml.load(inputStream);
200+
}
201+
202+
if (!(loaded instanceof Map)) {
203+
return null;
204+
}
205+
206+
Object rosbag2BagfileInformation = ((Map<String, Object>) loaded).get("rosbag2_bagfile_information");
207+
if (!(rosbag2BagfileInformation instanceof Map)) {
208+
return null;
209+
}
210+
211+
Map<String, Object> information = (Map<String, Object>) rosbag2BagfileInformation;
212+
Object storageIdentifier = information.get("storage_identifier");
213+
if (storageIdentifier != null && !"sqlite3".equals(storageIdentifier.toString())) {
214+
throw new IOException("Unsupported rosbag2 storage identifier: " + storageIdentifier);
215+
}
216+
217+
Object relativeFilePaths = information.get("relative_file_paths");
218+
if (!(relativeFilePaths instanceof List)) {
219+
return null;
220+
}
221+
222+
for (Object relativeFilePath : (List<Object>) relativeFilePaths) {
223+
if (relativeFilePath != null && relativeFilePath.toString().endsWith(".db3")) {
224+
return bagDirectory.resolve(relativeFilePath.toString());
225+
}
226+
}
227+
228+
return null;
229+
}
230+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.indunet.fastproto.ros2.bag;
2+
3+
/**
4+
* Topic metadata from a rosbag2 SQLite storage file.
5+
*/
6+
public final class Ros2BagTopic {
7+
private final int id;
8+
private final String name;
9+
private final String type;
10+
private final String serializationFormat;
11+
private final String offeredQosProfiles;
12+
13+
public Ros2BagTopic(int id, String name, String type, String serializationFormat, String offeredQosProfiles) {
14+
this.id = id;
15+
this.name = name;
16+
this.type = type;
17+
this.serializationFormat = serializationFormat;
18+
this.offeredQosProfiles = offeredQosProfiles;
19+
}
20+
21+
public int getId() {
22+
return id;
23+
}
24+
25+
public String getName() {
26+
return name;
27+
}
28+
29+
public String getType() {
30+
return type;
31+
}
32+
33+
public String getSerializationFormat() {
34+
return serializationFormat;
35+
}
36+
37+
public String getOfferedQosProfiles() {
38+
return offeredQosProfiles;
39+
}
40+
}

0 commit comments

Comments
 (0)