Skip to content

Commit a90cdf8

Browse files
crates: Add serialization
Signed-off-by: Patrick José Pereira <patrickelectric@gmail.com>
1 parent 315f7a8 commit a90cdf8

File tree

3 files changed

+221
-0
lines changed

3 files changed

+221
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"crates/generators/python",
1111
"crates/generators/rust",
1212
"crates/wasm",
13+
"crates/serialization",
1314
]
1415

1516
exclude = [

crates/serialization/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "blueberry-serialization"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
cdr = "0.2.4"
8+
serde = { version = "1.0.228", features = ["derive"] }
9+
zenoh = "1.6.2"

crates/serialization/src/main.rs

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
use std::error::Error;
2+
use std::fmt;
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::{Arc, mpsc};
5+
use std::thread;
6+
use std::time::Duration;
7+
8+
use cdr::{CdrLe, Infinite, deserialize, serialize};
9+
use serde::{Deserialize, Serialize};
10+
use zenoh::{Config, Wait};
11+
12+
pub type Result<T> = std::result::Result<T, SerializeError>;
13+
14+
fn main() -> Result<()> {
15+
println!("Starting serialization test");
16+
zenoh::init_log_from_env_or("error");
17+
18+
const PUBLISH_TOPIC: &str = "blueberry/person/default";
19+
const SUBSCRIBE_TOPIC: &str = "blueberry/person/expected";
20+
21+
let expected_person = Person::default();
22+
let expected_bytes = serialize_person(&expected_person)?;
23+
24+
let session = zenoh::open(Config::default())
25+
.wait()
26+
.map_err(SerializeError::Zenoh)?;
27+
28+
let publisher = session
29+
.declare_publisher(PUBLISH_TOPIC)
30+
.wait()
31+
.map_err(SerializeError::Zenoh)?;
32+
let subscriber = session
33+
.declare_subscriber(SUBSCRIBE_TOPIC)
34+
.wait()
35+
.map_err(SerializeError::Zenoh)?;
36+
37+
let stop = Arc::new(AtomicBool::new(false));
38+
let (tx, rx) = mpsc::channel();
39+
40+
{
41+
let stop = Arc::clone(&stop);
42+
let tx = tx.clone();
43+
let bytes = expected_bytes.clone();
44+
thread::spawn(move || {
45+
while !stop.load(Ordering::Relaxed) {
46+
if let Err(err) = publisher.put(bytes.clone()).wait() {
47+
let _ = tx.send(Err(SerializeError::Zenoh(err)));
48+
stop.store(true, Ordering::Relaxed);
49+
break;
50+
}
51+
thread::sleep(Duration::from_secs(1));
52+
}
53+
});
54+
}
55+
56+
{
57+
let stop = Arc::clone(&stop);
58+
let tx = tx.clone();
59+
let expected = expected_person.clone();
60+
thread::spawn(move || {
61+
loop {
62+
if stop.load(Ordering::Relaxed) {
63+
break;
64+
}
65+
match subscriber.recv() {
66+
Ok(sample) => {
67+
let bytes = sample.payload().to_bytes();
68+
let result = match deserialize::<Person>(&bytes) {
69+
Ok(person) => {
70+
if person == expected {
71+
Ok(())
72+
} else {
73+
Err(SerializeError::Mismatch(person))
74+
}
75+
}
76+
Err(err) => Err(SerializeError::Cdr(err)),
77+
};
78+
let _ = tx.send(result);
79+
stop.store(true, Ordering::Relaxed);
80+
break;
81+
}
82+
Err(err) => {
83+
let _ = tx.send(Err(SerializeError::Zenoh(err)));
84+
stop.store(true, Ordering::Relaxed);
85+
break;
86+
}
87+
}
88+
}
89+
});
90+
}
91+
92+
match rx.recv_timeout(Duration::from_secs(60)) {
93+
Ok(Ok(())) => Ok(()),
94+
Ok(Err(err)) => Err(err),
95+
Err(_) => Err(SerializeError::Timeout),
96+
}
97+
}
98+
99+
/// Representation of the `Person` message described in the example IDL.
100+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
101+
pub struct Person {
102+
pub name: String,
103+
pub age: i32,
104+
pub is_active: bool,
105+
pub codename: String,
106+
pub readings: [i32; 8],
107+
}
108+
109+
impl Default for Person {
110+
fn default() -> Self {
111+
Self {
112+
name: "Potato".into(),
113+
age: 0x1234,
114+
is_active: true,
115+
codename: "maisquenada".into(),
116+
readings: [0, 1, 2, 3, 4, 0xffff, 6, 7],
117+
}
118+
}
119+
}
120+
121+
/// Serializes a `Person` using CDR little-endian encoding with an encapsulation
122+
/// header, honoring field alignment and boolean-as-byte rules.
123+
pub fn serialize_person(person: &Person) -> Result<Vec<u8>> {
124+
const CODENAME_BOUND: usize = 16;
125+
let codename_len = person.codename.as_bytes().len();
126+
if codename_len > CODENAME_BOUND {
127+
return Err(SerializeError::StringTooLong {
128+
field: "codename",
129+
max: CODENAME_BOUND,
130+
len: codename_len,
131+
});
132+
}
133+
134+
serialize::<_, _, CdrLe>(person, Infinite).map_err(SerializeError::Cdr)
135+
}
136+
137+
/// Deserializes a `Person` using CDR little-endian encoding with an encapsulation header.
138+
pub fn deserialize_person(bytes: &[u8]) -> Result<Person> {
139+
deserialize(bytes).map_err(SerializeError::Cdr)
140+
}
141+
142+
#[derive(Debug)]
143+
pub enum SerializeError {
144+
StringTooLong {
145+
field: &'static str,
146+
max: usize,
147+
len: usize,
148+
},
149+
Cdr(cdr::Error),
150+
Zenoh(zenoh::Error),
151+
Mismatch(Person),
152+
Timeout,
153+
}
154+
155+
impl fmt::Display for SerializeError {
156+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157+
match self {
158+
SerializeError::StringTooLong { field, max, len } => {
159+
write!(f, "string field `{field}` too long: {len} > {max}")
160+
}
161+
SerializeError::Cdr(err) => write!(f, "cdr serialization failed: {err}"),
162+
SerializeError::Zenoh(err) => write!(f, "zenoh error: {err}"),
163+
SerializeError::Mismatch(person) => {
164+
write!(f, "received unexpected person payload: {person:?}")
165+
}
166+
SerializeError::Timeout => write!(f, "no message received within 60s"),
167+
}
168+
}
169+
}
170+
171+
impl Error for SerializeError {}
172+
173+
#[cfg(test)]
174+
mod tests {
175+
use super::*;
176+
177+
const PERSON_DEFAULT_BYTES: [u8; 72] = [
178+
0x00, 0x01, 0x00, 0x00, // CDR little-endian encapsulation header
179+
0x07, 0x00, 0x00, 0x00, // name length (includes null)
180+
0x50, 0x6f, 0x74, 0x61, 0x74, 0x6f, 0x00, // "Potato\0"
181+
0x00, // padding to 4-byte alignment
182+
0x34, 0x12, 0x00, 0x00, // age = 0x1234
183+
0x01, // is_active = true (byte-encoded bool)
184+
0x00, 0x00, 0x00, // padding before codename (align to 4)
185+
0x0c, 0x00, 0x00, 0x00, // codename length (includes null)
186+
0x6d, 0x61, 0x69, 0x73, 0x71, 0x75, 0x65, 0x6e, 0x61, 0x64, 0x61,
187+
0x00, // "maisquenada\0"
188+
0x00, 0x00, 0x00, 0x00, // readings[0] = 0
189+
0x01, 0x00, 0x00, 0x00, // readings[1] = 1
190+
0x02, 0x00, 0x00, 0x00, // readings[2] = 2
191+
0x03, 0x00, 0x00, 0x00, // readings[3] = 3
192+
0x04, 0x00, 0x00, 0x00, // readings[4] = 4
193+
0xff, 0xff, 0x00, 0x00, // readings[5] = 0xffff
194+
0x06, 0x00, 0x00, 0x00, // readings[6] = 6
195+
0x07, 0x00, 0x00, 0x00, // readings[7] = 7
196+
];
197+
198+
#[test]
199+
fn serializes_default_person() {
200+
let bytes = serialize_person(&Person::default()).expect("serialize default person");
201+
assert_eq!(bytes, PERSON_DEFAULT_BYTES);
202+
}
203+
204+
#[test]
205+
fn rejects_codename_overflow() {
206+
let mut person = Person::default();
207+
person.codename = "this-is-way-too-long".into();
208+
let err = serialize_person(&person).unwrap_err();
209+
assert!(matches!(err, SerializeError::StringTooLong { .. }));
210+
}
211+
}

0 commit comments

Comments
 (0)