diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index cc91e27..3ab9ccb 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -1,20 +1,20 @@ -name: Rust Project CI - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - build_and_test: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - - name: Build - run: cargo build --verbose - - - name: Run tests +name: Rust Project CI + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + build_and_test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Build + run: cargo build --verbose + + - name: Run tests run: cargo test --verbose \ No newline at end of file diff --git a/.gitignore b/.gitignore index ea8c4bf..660deb2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/target +/target +.DS_Store \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2d7d142..a4fdb3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,56 @@ dependencies = [ "memchr", ] +[[package]] +name = "anstream" +version = "0.6.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.59.0", +] + [[package]] name = "anyhow" version = "1.0.98" @@ -43,6 +93,12 @@ dependencies = [ "syn", ] +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.75" @@ -87,6 +143,12 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + [[package]] name = "byteorder" version = "1.5.0" @@ -144,6 +206,18 @@ dependencies = [ "libloading", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "either" version = "1.15.0" @@ -170,6 +244,29 @@ dependencies = [ "syn", ] +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -198,6 +295,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fragile" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" + [[package]] name = "getrandom" version = "0.2.16" @@ -442,6 +545,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.12.1" @@ -451,6 +560,40 @@ dependencies = [ "either", ] +[[package]] +name = "jiff" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", +] + +[[package]] +name = "jiff-static" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -491,6 +634,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -524,6 +677,43 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "nom" version = "7.1.3" @@ -549,18 +739,62 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + [[package]] name = "outref" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -570,6 +804,32 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "prettyplease" version = "0.2.33" @@ -743,6 +1003,15 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "redox_syscall" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d04b7d0ee6b4a0207a0a7adb104d23ecb0b47d6beae7152d0fa34b692b29fd6" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -810,6 +1079,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustversion" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.219" @@ -851,6 +1132,31 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "syn" version = "2.0.101" @@ -885,6 +1191,45 @@ dependencies = [ "winapi", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + +[[package]] +name = "test-case" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8" +dependencies = [ + "test-case-macros", +] + +[[package]] +name = "test-case-core" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "test-case-macros" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "test-case-core", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -918,7 +1263,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" dependencies = [ "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -997,6 +1361,7 @@ dependencies = [ "async-trait", "bytes", "mediatype", + "mockall", "protobuf", "protobuf-codegen", "protoc-bin-vendored", @@ -1013,8 +1378,17 @@ name = "up-transport-iceoryx2-rust" version = "0.1.0" dependencies = [ "async-trait", + "bytes", + "env_logger", "iceoryx2", + "iceoryx2-bb-container", + "log", + "once_cell", + "protobuf", + "test-case", + "tokio", "up-rust", + "uuid", ] [[package]] @@ -1027,6 +1401,23 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "uuid-simd" version = "0.8.0" @@ -1058,6 +1449,64 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + [[package]] name = "which" version = "4.4.2" @@ -1101,6 +1550,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/Cargo.toml index 942eae5..ce2eb3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,19 @@ version = "0.1.0" edition = "2024" [dependencies] -up-rust = "0.5" +up-rust = { version = "0.5.0", features = ["test-util"] } iceoryx2 = "0.6.1" async-trait = "0.1" +protobuf = "3.7.2" +bytes = "1.10.1" +tokio = { version = "1", features = ["full"] } +test-case = "3" +log = "0.4.27" +env_logger = "0.11.8" +once_cell = "1.18.0" +uuid = { version = "1", features = ["v4"] } +iceoryx2-bb-container = "0.6.1" + + + + diff --git a/README.md b/README.md index 3812ca6..c65d78f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# up-transport-iceoryx2-rust -Rust uTransport implementation for iceoryx2 \ No newline at end of file +# up-transport-iceoryx2-rust +Rust uTransport implementation for iceoryx2 diff --git a/src/bin/listener.rs b/src/bin/listener.rs new file mode 100644 index 0000000..1cef601 --- /dev/null +++ b/src/bin/listener.rs @@ -0,0 +1,58 @@ +use async_trait::async_trait; +use env_logger; +use std::str::FromStr; +use std::sync::Arc; +use std::sync::Once; +use tokio::sync::Notify; +use up_rust::{UListener, UMessage, UTransport, UUri}; +use up_transport_iceoryx2_rust::Iceoryx2Transport; + +static INIT_LOGGER: Once = Once::new(); + +fn init_logger() { + INIT_LOGGER.call_once(|| { + env_logger::init(); + }); +} + +pub struct Receiver { + notify: Arc, +} + +#[async_trait] +impl UListener for Receiver { + async fn on_receive(&self, message: UMessage) { + if let Some(payload) = &message.payload { + print!("Received payload bytes (decimal): "); + for byte in payload { + print!("{} ", byte); + } + println!(); + } else { + println!("Received message with no payload."); + } + self.notify.notify_one(); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + init_logger(); + + let notify = Arc::new(Notify::new()); + let receiver = Arc::new(Receiver { + notify: notify.clone(), + }); + + let topic = UUri::from_str("//vehicle_shared/10A10B/1/CA5D")?; + let transport = Iceoryx2Transport::new().unwrap(); + + transport.register_listener(&topic, None, receiver).await?; + + println!("Listener started, waiting for messages..."); + + loop { + notify.notified().await; + println!("Message received."); + } +} diff --git a/src/bin/sender.rs b/src/bin/sender.rs new file mode 100644 index 0000000..8bdbe3a --- /dev/null +++ b/src/bin/sender.rs @@ -0,0 +1,40 @@ +use env_logger; +use std::str::FromStr; +use std::sync::Once; + +use up_rust::{UMessageBuilder, UPayloadFormat, UTransport, UUri}; +use up_transport_iceoryx2_rust::Iceoryx2Transport; + +static INIT_LOGGER: Once = Once::new(); + +fn init_logger() { + INIT_LOGGER.call_once(|| { + env_logger::init(); + }); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + init_logger(); + + let topic = UUri::from_str("//vehicle_shared/10A10B/1/CA5D")?; + let transport = Iceoryx2Transport::new().unwrap(); + + let mut payload: Vec = vec![1, 2, 3, 4, 5, 6]; + + loop { + // increment each byte + for b in payload.iter_mut() { + *b = b.wrapping_add(1); + } + + let umessage = UMessageBuilder::publish(topic.clone()) + .build_with_payload(payload.clone(), UPayloadFormat::UPAYLOAD_FORMAT_RAW)?; + + transport.send(umessage).await?; + + println!("Message sent. Payload: {:?}", payload); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} diff --git a/src/custom_header.rs b/src/custom_header.rs new file mode 100644 index 0000000..516b0f0 --- /dev/null +++ b/src/custom_header.rs @@ -0,0 +1,70 @@ +use iceoryx2::prelude::*; +use up_rust::{UAttributes, UMessage, UStatus}; + +#[derive(Default, Debug, ZeroCopySend)] +#[type_name("CustomHeader")] +#[repr(C)] +pub struct CustomHeader { + pub version: i32, + pub timestamp: u64, +} + +impl CustomHeader { + pub fn from_user_header(header: &Self) -> Result { + Ok(Self { + version: header.version, + timestamp: header.timestamp, + }) + } + + pub fn from_message(message: &UMessage) -> Result { + // Extract basic information from UMessage attributes + let version = if let Some(_attributes) = &message.attributes.0 { + 1 // Use a default version for now + } else { + 1 // Default version + }; + + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + Ok(Self { version, timestamp }) + } +} + +// Assuming UAttributes has a field called `fields` which is Vec<(String, String)> +// Adjust if your actual UAttributes is different +impl From<&CustomHeader> for UAttributes { + fn from(header: &CustomHeader) -> UAttributes { + let mut attrs = UAttributes::default(); + + // Map CustomHeader fields back to UAttributes using available fields + // Based on the error message, available fields are: id, type_, source, sink, priority, etc. + + // Set default values for required fields + attrs.type_ = up_rust::UMessageType::UMESSAGE_TYPE_PUBLISH.into(); + attrs.priority = up_rust::UPriority::UPRIORITY_CS4.into(); + + // Note: version and timestamp information is preserved in the CustomHeader + // but UAttributes doesn't have direct fields for them, so we use defaults + + attrs + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_custom_header_from_user_header() { + let header = CustomHeader { + version: 2, + timestamp: 1000, + }; + let new_header = CustomHeader::from_user_header(&header).unwrap(); + assert_eq!(new_header.version, 2); + assert_eq!(new_header.timestamp, 1000); + } +} diff --git a/src/lib.rs b/src/lib.rs index b03e38a..f976718 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,46 +1,57 @@ use async_trait::async_trait; +use iceoryx2::prelude::*; +use protobuf::MessageField; use std::sync::Arc; -use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUri}; +use up_rust::{UAttributes, UCode, UListener, UMessage, UStatus, UTransport, UUri}; -/// This will be the main struct for our uProtocol transport. -/// It will hold the state necessary to communicate with iceoryx2, -/// such as the service connection and active listeners. -pub struct Iceoryx2Transport {} +mod custom_header; +pub use custom_header::CustomHeader; +mod raw_bytes; +use raw_bytes::RawBytes; + +use iceoryx2_bb_container::vec::FixedSizeVec; +use std::collections::HashMap; +use std::thread; + +enum TransportCommand { + Send { + message: UMessage, + response: std::sync::mpsc::Sender>, + }, + RegisterListener { + source_filter: UUri, + sink_filter: Option, + listener: Arc, + response: std::sync::mpsc::Sender>, + }, + UnregisterListener { + source_filter: UUri, + sink_filter: Option, + listener: Arc, + response: std::sync::mpsc::Sender>, + }, +} + +pub struct Iceoryx2Transport { + command_sender: std::sync::mpsc::Sender, +} enum MessageType { RpcRequest, RpcResponseOrNotification, Publish, } -// The #[async_trait] attribute enables async functions in our trait impl. -#[async_trait] -impl UTransport for Iceoryx2Transport { - async fn send(&self, _message: UMessage) -> Result<(), UStatus> { - todo!(); - } +impl Iceoryx2Transport { + pub fn new() -> Result { + let (tx, rx) = std::sync::mpsc::channel(); - async fn register_listener( - &self, - _source_filter: &UUri, - _sink_filter: Option<&UUri>, - _listener: Arc, - ) -> Result<(), UStatus> { - todo!() - } + thread::spawn(move || { + Self::background_task(rx); + }); - async fn unregister_listener( - &self, - _source_filter: &UUri, - _sink_filter: Option<&UUri>, - _listener: Arc, - ) -> Result<(), UStatus> { - todo!() + Ok(Self { command_sender: tx }) } -} - -#[allow(dead_code)] -impl Iceoryx2Transport { fn encode_uuri_segments(uuri: &UUri) -> Vec { vec![ uuri.authority_name.clone(), @@ -55,34 +66,48 @@ impl Iceoryx2Transport { format!("{:X}", value) } - /// Assumption: valid source and sink URIs provided: - /// send() makes use of UAttributesValidator - /// register_listener() and unregister_listener() use verify_filter_criteria() - /// Criteria for identification of message types can be found here: https://github.com/eclipse-uprotocol/up-spec/blob/main/basics/uattributes.adoc - fn determine_message_type(source: &UUri, sink: Option<&UUri>) -> Result { - let src_id = source.resource_id; - let sink_id = sink.map(|s| s.resource_id); + fn compute_service_name_from_message(message: &UMessage) -> Result { + let join_segments = |segments: Vec| segments.join("/"); - if src_id == 0 { - if let Some(id) = sink_id { - if id >= 1 && id <= 0x7FFF { - return Ok(MessageType::RpcRequest); - } - } - } else if sink_id == Some(0) && src_id >= 1 && src_id <= 0xFFFE { - return Ok(MessageType::RpcResponseOrNotification); - } else if src_id >= 1 && src_id <= 0x7FFF { - return Ok(MessageType::Publish); - } + if message.is_publish() { + let source = message.source().ok_or_else(|| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Missing source URI") + })?; + let segments = Self::encode_uuri_segments(source); + Ok(format!("up/{}", join_segments(segments))) + } else if message.is_request() { + let sink = message.sink().ok_or_else(|| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Missing sink URI") + })?; + let segments = Self::encode_uuri_segments(sink); + Ok(format!("up/{}", join_segments(segments))) + } else if message.is_response() || message.is_notification() { + let source = message.source().ok_or_else(|| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Missing source URI") + })?; + let sink = message.sink().ok_or_else(|| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Missing sink URI") + })?; - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Unsupported UMessageType", - )) + let source_segments = Self::encode_uuri_segments(source); + let sink_segments = Self::encode_uuri_segments(sink); + Ok(format!( + "up/{}/{}", + join_segments(source_segments), + join_segments(sink_segments) + )) + } else { + Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Unsupported UMessageType", + )) + } } - /// Called in send(), register_listener() and unregister_listener() - fn compute_service_name(source: &UUri, sink: Option<&UUri>) -> Result { + fn compute_service_name_from_uris( + source: &UUri, + sink: Option<&UUri>, + ) -> Result { let join_segments = |segments: Vec| segments.join("/"); match Self::determine_message_type(source, sink)? { @@ -117,103 +142,579 @@ impl Iceoryx2Transport { } } } + + fn determine_message_type(source: &UUri, sink: Option<&UUri>) -> Result { + let src_id = source.resource_id; + let sink_id = sink.map(|s| s.resource_id); + + if src_id == 0 { + if let Some(id) = sink_id { + if id >= 1 && id <= 0x7FFF { + return Ok(MessageType::RpcRequest); + } + } + } else if sink_id == Some(0) && src_id >= 1 && src_id <= 0xFFFE { + return Ok(MessageType::RpcResponseOrNotification); + } else if src_id >= 1 && src_id <= 0x7FFF { + return Ok(MessageType::Publish); + } + + Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Unsupported UMessageType", + )) + } + + fn compute_listener_service_name( + source_filter: &UUri, + sink_filter: Option<&UUri>, + ) -> Result { + let join_segments = |segments: Vec| segments.join("/"); + + match sink_filter { + None => { + let segments = Self::encode_uuri_segments(source_filter); + Ok(format!("up/{}", join_segments(segments))) + } + Some(sink) => { + let source_segments = Self::encode_uuri_segments(source_filter); + let sink_segments = Self::encode_uuri_segments(sink); + Ok(format!( + "up/{}/{}", + join_segments(source_segments), + join_segments(sink_segments) + )) + } + } + } + fn background_task(rx: std::sync::mpsc::Receiver) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + + rt.block_on(async { + let node = match NodeBuilder::new().create::() { + Ok(node) => node, + Err(e) => { + eprintln!("Failed to create iceoryx2 node: {}", e); + return; + } + }; + + let mut publishers: HashMap< + String, + iceoryx2::port::publisher::Publisher< + ipc::Service, + FixedSizeVec, + CustomHeader, + >, + > = HashMap::new(); + + let mut subscribers: HashMap< + String, + iceoryx2::port::subscriber::Subscriber< + ipc::Service, + FixedSizeVec, + CustomHeader, + >, + > = HashMap::new(); + + let mut listeners: HashMap>> = HashMap::new(); + + loop { + while let Ok(command) = rx.try_recv() { + match command { + TransportCommand::Send { message, response } => { + let service_name = + match Self::compute_service_name_from_message(&message) { + Ok(name) => name, + Err(e) => { + let _ = response.send(Err(e)); + continue; + } + }; + + let publisher = + publishers.entry(service_name.clone()).or_insert_with(|| { + let service_name_res: Result = + service_name.as_str().try_into(); + let service = node + .service_builder(&service_name_res.unwrap()) + .publish_subscribe::>() + .user_header::() + .open_or_create() + .expect("Failed to create service"); + + service + .publisher_builder() + .create() + .expect("Failed to create publisher") + }); + + let result = Self::handle_send(publisher, message); + let _ = response.send(result); + } + TransportCommand::RegisterListener { + source_filter, + sink_filter, + listener, + response, + } => { + let res = Self::handle_register_listener( + &node, + &mut subscribers, + &mut listeners, + source_filter, + sink_filter.as_ref(), + listener, + ); + let _ = response.send(res); + } + TransportCommand::UnregisterListener { + source_filter, + sink_filter, + listener, + response, + } => { + let res = Self::handle_unregister_listener( + &mut subscribers, + &mut listeners, + source_filter, + sink_filter.as_ref(), + &listener, + ); + let _ = response.send(res); + } + } + } + + let active_services: Vec<(String, Vec>)> = listeners + .iter() + .filter(|(service_name, listeners_vec)| { + !listeners_vec.is_empty() && subscribers.contains_key(*service_name) + }) + .map(|(service_name, listeners_vec)| { + (service_name.clone(), listeners_vec.clone()) + }) + .collect(); + + for (service_name, listeners_to_notify) in active_services { + if let Some(subscriber) = subscribers.get(&service_name) { + while let Some(sample) = subscriber.receive().ok().flatten() { + for listener in &listeners_to_notify { + let payload_bytes = sample.payload().as_slice(); + let mut new_umessage = UMessage::new(); + new_umessage.attributes = + MessageField::some(UAttributes::from(sample.user_header())); + new_umessage.payload = Some(payload_bytes.to_vec().into()); + + let listener_clone = listener.clone(); + tokio::spawn(async move { + listener_clone.on_receive(new_umessage).await; + }); + } + } + } + } + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }); + } + + fn handle_send( + publisher: &iceoryx2::port::publisher::Publisher< + ipc::Service, + FixedSizeVec, + CustomHeader, + >, + message: UMessage, + ) -> Result<(), UStatus> { + let payload_bytes = message.payload.clone().unwrap_or_default().to_vec(); + let mut payload_vec = FixedSizeVec::::new(); + assert!(payload_vec.extend_from_slice(&payload_bytes)); + let header = CustomHeader::from_message(&message)?; + + let sample = publisher.loan_uninit().map_err(|e| { + UStatus::fail_with_code(UCode::INTERNAL, &format!("Failed to loan sample: {e}")) + })?; + + let mut sample_final = sample.write_payload(payload_vec); + *sample_final.user_header_mut() = header; + + sample_final.send().map_err(|e| { + UStatus::fail_with_code(UCode::INTERNAL, &format!("Failed to send: {e}")) + })?; + + Ok(()) + } + + fn handle_register_listener( + node: &Node, + subscribers: &mut HashMap< + String, + iceoryx2::port::subscriber::Subscriber< + ipc::Service, + FixedSizeVec, + CustomHeader, + >, + >, + listeners: &mut HashMap>>, + source_filter: UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let service_name = Self::compute_listener_service_name(&source_filter, sink_filter)?; + + if !subscribers.contains_key(&service_name) { + let service_name_res: Result = service_name.as_str().try_into(); + let service = node + .service_builder(&service_name_res.map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + &format!("Invalid service name: {}", e), + ) + })?) + .publish_subscribe::>() + .user_header::() + .open_or_create() + .map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + &format!("Failed to create service: {}", e), + ) + })?; + + let subscriber = service.subscriber_builder().create().map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + &format!("Failed to create subscriber: {}", e), + ) + })?; + subscribers.insert(service_name.clone(), subscriber); + } + + listeners + .entry(service_name) + .or_insert_with(Vec::new) + .push(listener); + Ok(()) + } + + fn handle_unregister_listener( + subscribers: &mut HashMap< + String, + iceoryx2::port::subscriber::Subscriber< + ipc::Service, + FixedSizeVec, + CustomHeader, + >, + >, + listeners: &mut HashMap>>, + source_filter: UUri, + sink_filter: Option<&UUri>, + listener: &Arc, + ) -> Result<(), UStatus> { + let service_name = match Self::compute_listener_service_name(&source_filter, sink_filter) { + Ok(name) => name, + Err(e) => return Err(e), + }; + + if let Some(listener_vec) = listeners.get_mut(&service_name) { + listener_vec.retain(|l| !Arc::ptr_eq(l, listener)); + + if listener_vec.is_empty() { + listeners.remove(&service_name); + subscribers.remove(&service_name); + } + } + + Ok(()) + } +} + +#[async_trait] +impl UTransport for Iceoryx2Transport { + async fn send(&self, message: UMessage) -> Result<(), UStatus> { + let (tx, rx) = std::sync::mpsc::channel(); + + let command = TransportCommand::Send { + message, + response: tx, + }; + + self.command_sender + .send(command) + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Background task has died"))?; + + rx.recv().map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Background task response failed") + })? + } + + async fn register_listener( + &self, + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let (tx, rx) = std::sync::mpsc::channel(); + + let command = TransportCommand::RegisterListener { + source_filter: source_filter.clone(), + sink_filter: sink_filter.cloned(), + listener, + response: tx, + }; + + self.command_sender + .send(command) + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Background task has died"))?; + + rx.recv().map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Background task response failed") + })? + } + + async fn unregister_listener( + &self, + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let (tx, rx) = std::sync::mpsc::channel(); + + let command = TransportCommand::UnregisterListener { + source_filter: source_filter.clone(), + sink_filter: sink_filter.cloned(), + listener, + response: tx, + }; + + self.command_sender + .send(command) + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Background task has died"))?; + + rx.recv().map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Background task response failed") + })? + } } #[cfg(test)] mod tests { use super::*; + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + use std::time::Duration; + use up_rust::{MockUListener, UMessageBuilder, UPayloadFormat}; fn test_uri(authority: &str, instance: u16, typ: u16, version: u8, resource: u16) -> UUri { let entity_id = ((instance as u32) << 16) | (typ as u32); UUri::try_from_parts(authority, entity_id, version, resource).unwrap() } - // performing successful tests for service name computation + fn dummy_uuid() -> up_rust::UUID { + up_rust::UUID::build() + } #[test] - // [specitem,oft-sid="dsn~up-transport-iceoryx2-service-name~1",oft-needs="utest"] fn test_publish_service_name() { let source = test_uri("device1", 0x0000, 0x10AB, 0x03, 0x7FFF); - - let name = Iceoryx2Transport::compute_service_name(&source, None).unwrap(); + let name = Iceoryx2Transport::compute_service_name_from_uris(&source, None).unwrap(); assert_eq!(name, "up/device1/10AB/0/3/7FFF"); } #[test] - // [specitem,oft-sid="dsn~up-transport-iceoryx2-service-name~1",oft-needs="utest"] fn test_notification_service_name() { let source = test_uri("device1", 0x0000, 0x10AB, 0x03, 0x80CD); let sink = test_uri("device1", 0x0000, 0x30EF, 0x04, 0x0000); - let name = Iceoryx2Transport::compute_service_name(&source, Some(&sink)).unwrap(); + let name = Iceoryx2Transport::compute_service_name_from_uris(&source, Some(&sink)).unwrap(); assert_eq!(name, "up/device1/10AB/0/3/80CD/device1/30EF/0/4/0"); } #[test] - // [specitem,oft-sid="dsn~up-transport-iceoryx2-service-name~1",oft-needs="utest"] fn test_rpc_request_service_name() { let sink = test_uri("device1", 0x0004, 0x03AB, 0x03, 0x0000); - let reply_to = test_uri("device1", 0x0000, 0x00CD, 0x04, 0xB); - - let name = Iceoryx2Transport::compute_service_name(&sink, Some(&reply_to)).unwrap(); + let reply_to = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x000B); + let name = + Iceoryx2Transport::compute_service_name_from_uris(&sink, Some(&reply_to)).unwrap(); assert_eq!(name, "up/device1/CD/0/4/B"); } #[test] - // [specitem,oft-sid="dsn~up-transport-iceoryx2-service-name~1",oft-needs="utest"] fn test_rpc_response_service_name() { - let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0xB); - let sink = test_uri("device1", 0x0004, 0x3AB, 0x3, 0x0000); - - let name = Iceoryx2Transport::compute_service_name(&source, Some(&sink)).unwrap(); + let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x000B); + let sink = test_uri("device1", 0x0004, 0x03AB, 0x03, 0x0000); + let name = Iceoryx2Transport::compute_service_name_from_uris(&source, Some(&sink)).unwrap(); assert_eq!(name, "up/device1/CD/0/4/B/device1/3AB/4/3/0"); } - // performing failing tests for service name computation - - #[test] - // .specitem[dsn~up-attributes-request-source~1] - // .specitem[dsn~up-attributes-response-source~1] - // .specitem[dsn~up-attributes-notification-source~1] - fn test_missing_uri_error() { - let uuri = UUri::new(); - let result = Iceoryx2Transport::compute_service_name(&uuri, None); - - assert!(result.is_err()); - assert_eq!(result.unwrap_err().get_code(), UCode::INVALID_ARGUMENT); - } - #[test] - //both source and sink have resource ID equal to 0 - // .specitem[dsn~up-attributes-request-source~1] - // .specitem[dsn~up-attributes-request-sink~1] - // .specitem[dsn~up-attributes-response-source~1] - // .specitem[dsn~up-attributes-response-sink~1] fn test_fail_resource_id_error() { - let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x000); - let sink = test_uri("device1", 0x0004, 0x3AB, 0x3, 0x0000); - let result = Iceoryx2Transport::compute_service_name(&source, Some(&sink)); + let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x0000); + let sink = test_uri("device1", 0x0004, 0x03AB, 0x03, 0x0000); + let result = Iceoryx2Transport::compute_service_name_from_uris(&source, Some(&sink)); assert!(result.is_err_and(|err| err.get_code() == UCode::INVALID_ARGUMENT)); } #[test] - //source has resource id=0 but missing sink - // .specitem[dsn~up-attributes-request-sink~1] - // .specitem[dsn~up-attributes-request-source~1] fn test_fail_missing_sink_error() { - let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x000); - let result = Iceoryx2Transport::compute_service_name(&source, None); + let source = test_uri("device1", 0x0000, 0x00CD, 0x04, 0x0000); + let result = Iceoryx2Transport::compute_service_name_from_uris(&source, None); assert!(result.is_err_and(|err| err.get_code() == UCode::INVALID_ARGUMENT)); } #[test] - //missing source URI - // .specitem[dsn~up-attributes-request-source~1] - // .specitem[dsn~up-attributes-response-source~1] - // .specitem[dsn~up-attributes-notification-source~1] fn test_fail_missing_source_error() { let uuri = UUri::new(); - let sink = test_uri("device1", 0x0004, 0x3AB, 0x3, 0x000); - let result = Iceoryx2Transport::compute_service_name(&uuri, Some(&sink)); + let sink = test_uri("device1", 0x0004, 0x03AB, 0x03, 0x0000); + let result = Iceoryx2Transport::compute_service_name_from_uris(&uuri, Some(&sink)); assert!(result.is_err_and(|err| err.get_code() == UCode::INVALID_ARGUMENT)); } + + #[tokio::test] + async fn test_register_listener_creates_subscriber() { + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts("vehicle", 0x123, 1, 0x456).unwrap(); + let listener = Arc::new(MockUListener::new()); + + let result = transport + .register_listener(&uri, None, listener.clone()) + .await; + assert!(result.is_ok(), "Listener registration should succeed"); + } + + #[tokio::test] + async fn test_register_duplicate_listeners() { + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts("vehicle", 0x123, 1, 0x456).unwrap(); + let listener1 = Arc::new(MockUListener::new()); + let listener2 = Arc::new(MockUListener::new()); + + let result1 = transport + .register_listener(&uri, None, listener1.clone()) + .await; + assert!( + result1.is_ok(), + "First listener registration should succeed" + ); + + let result2 = transport + .register_listener(&uri, None, listener2.clone()) + .await; + assert!( + result2.is_ok(), + "Second listener registration should succeed" + ); + } + + #[tokio::test] + async fn test_unregister_listener_cleanup() { + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts("vehicle", 0x123, 1, 0x456).unwrap(); + let listener = Arc::new(MockUListener::new()); + + transport + .register_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + let result = transport + .unregister_listener(&uri, None, listener.clone()) + .await; + assert!(result.is_ok(), "Listener unregistration should succeed"); + } + + #[tokio::test] + async fn test_unregister_nonexistent_listener() { + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts("vehicle", 0x123, 1, 0x456).unwrap(); + let listener = Arc::new(MockUListener::new()); + + let result = transport + .unregister_listener(&uri, None, listener.clone()) + .await; + assert!( + result.is_ok(), + "Unregistering non-existent listener should succeed as no-op" + ); + } + + #[tokio::test] + async fn test_multiple_unregisters() { + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts("vehicle", 0x123, 1, 0x456).unwrap(); + let listener = Arc::new(MockUListener::new()); + + transport + .register_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + let result1 = transport + .unregister_listener(&uri, None, listener.clone()) + .await; + assert!(result1.is_ok(), "First unregister should succeed"); + + let result2 = transport + .unregister_listener(&uri, None, listener.clone()) + .await; + assert!(result2.is_ok(), "Second unregister should succeed as no-op"); + } + + #[tokio::test] + async fn test_unregister_cycle() { + struct CountingListener { + count: AtomicUsize, + } + + #[async_trait::async_trait] + impl UListener for CountingListener { + async fn on_receive(&self, _msg: UMessage) { + self.count.fetch_add(1, Ordering::SeqCst); + } + } + + let transport = Iceoryx2Transport::new().unwrap(); + let uri = UUri::try_from_parts(&format!("vehicle{}", std::process::id()), 0x123, 1, 0x9000) + .unwrap(); + + let listener = Arc::new(CountingListener { + count: AtomicUsize::new(0), + }); + + transport + .register_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let message = UMessageBuilder::publish(uri.clone()).build().unwrap(); + transport.send(message.clone()).await.unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let count_before = listener.count.load(Ordering::SeqCst); + assert!(count_before >= 1); + + transport + .unregister_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + for _ in 0..3 { + transport.send(message.clone()).await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + tokio::time::sleep(Duration::from_millis(200)).await; + + let count_after = listener.count.load(Ordering::SeqCst); + assert_eq!( + count_before, count_after, + "Should not receive messages after unregister" + ); + } } diff --git a/src/raw_bytes.rs b/src/raw_bytes.rs new file mode 100644 index 0000000..7960495 --- /dev/null +++ b/src/raw_bytes.rs @@ -0,0 +1,30 @@ +use iceoryx2::prelude::*; +use iceoryx2_bb_container::{byte_string::FixedSizeByteString, vec::FixedSizeVec}; +/// A minimal wrapper to hold raw byte payloads for Iceoryx2. +/// +/// Iceoryx2 requires payload types to implement ZeroCopySend. +/// This struct makes raw Vec compatible. +const MAX_RAW_BYTES_SIZE: usize = 4096; +#[derive(Debug, Clone, PartialEq, Default)] +#[repr(C)] +pub struct RawBytes { + pub data: FixedSizeVec, +} + +unsafe impl ZeroCopySend for RawBytes {} + +impl RawBytes { + /// Converts this RawBytes into a Vec. + pub fn to_bytes(&self) -> Vec { + self.data.as_slice().to_vec() + } + + /// Creates a RawBytes struct from a Vec. + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut data = FixedSizeVec::new(); + for &byte in bytes { + let _ok = data.push(byte); + } + RawBytes { data } + } +} diff --git a/src/receiver.rs b/src/receiver.rs new file mode 100644 index 0000000..b5892a1 --- /dev/null +++ b/src/receiver.rs @@ -0,0 +1,229 @@ +use super::*; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use log::info; +use tokio::sync::Notify; + +use env_logger; +use iceoryx2::prelude::*; +use std::str::FromStr; +use up_rust::{ + MockUListener, UAttributes, UCode, UListener, UMessage, UMessageBuilder, UPayloadFormat, + UStatus, UTransport, UUri, +}; + +use std::sync::Once; + +static INIT_LOGGER: Once = Once::new(); + +fn init_logger() { + INIT_LOGGER.call_once(|| { + env_logger::init(); + }); +} + +const MESSAGE_DATA: &str = "Hello World!"; + +pub struct Receiver { + expected: UMessage, + notify: Arc, +} + +impl Receiver { + pub fn new(expected: UMessage, notify: Arc) -> Self { + Self { expected, notify } + } +} + +#[async_trait] +impl UListener for Receiver { + async fn on_receive(&self, message: UMessage) -> () { + if let Some(payload) = &message.payload { + println!("Received Message ID: {:#?}", message.id()); + if let (Some(expected_payload), Some(actual_payload)) = + (&self.expected.payload, &message.payload) + { + assert_eq!(expected_payload, actual_payload); + } else { + panic!("Missing payloads in either expected or actual message"); + } + self.notify.notify_one(); + } + } +} + +async fn register_listener_and_send( + authority: &str, + umessage: UMessage, + source_filter: &UUri, + sink_filter: Option<&UUri>, +) -> Result<(), Box> { + init_logger(); + let source_uri = UUri::try_from_parts(authority, 0xABC, 1, 0)?; + let transport = Iceoryx2Transport::new().unwrap(); + let notify = Arc::new(Notify::new()); + let receiver = Arc::new(Receiver::new(umessage.clone(), notify.clone())); + + transport + .register_listener(source_filter, sink_filter, receiver) + .await + .unwrap(); + + // Send UMessage + info!( + "sending message: [id: {}, type: {}]", + umessage.id_unchecked().to_hyphenated_string(), + umessage.type_unchecked().to_cloudevent_type() + ); + transport.send(umessage).await?; + Ok( + tokio::time::timeout(Duration::from_secs(3), notify.notified()) + .await + .map_err(|_| { + UStatus::fail_with_code(UCode::DEADLINE_EXCEEDED, "did not receive message in time") + })?, + ) +} + +#[test_case::test_case("vehicle1", 12_000, "//vehicle1/10A10B/1/CA5D", "//vehicle1/10A10B/1/CA5D"; "specific source filter")] +#[tokio::test(flavor = "multi_thread")] +async fn test_publish_gets_to_listener( + authority: &str, + ttl: u32, + topic_uri: &str, + source_filter_uri: &str, +) -> Result<(), Box> { + let topic = UUri::from_str(topic_uri)?; + let source_filter = UUri::from_str(source_filter_uri)?; + + let payload = vec![1, 2, 3, 4, 5]; + + let umessage = UMessageBuilder::publish(topic.clone()) + .with_priority(up_rust::UPriority::UPRIORITY_CS5) + .with_traceparent("traceparent") + .with_ttl(ttl) + .build_with_payload(payload, UPayloadFormat::UPAYLOAD_FORMAT_RAW)?; + + register_listener_and_send(authority, umessage, &source_filter, None).await +} + +#[tokio::test] +async fn test_exact_listener_dispatch() -> Result<(), Box> { + let transport = Iceoryx2Transport::new().unwrap(); + let listener_uri = UUri::from_str("//vehicle1/10A10B/1/CA5D")?; + let matching_message = UMessageBuilder::publish(listener_uri.clone()) + .build_with_payload(vec![1, 2, 3], UPayloadFormat::UPAYLOAD_FORMAT_RAW)?; + let non_matching_message = + UMessageBuilder::publish(UUri::from_str("//vehicle2/10A10B/1/CA5D")?) + .build_with_payload(vec![4, 5, 6], UPayloadFormat::UPAYLOAD_FORMAT_RAW)?; + + let received_notify = Arc::new(Notify::new()); + let listener = Arc::new(Receiver::new( + matching_message.clone(), + received_notify.clone(), + )); + + transport + .register_listener(&listener_uri, None, listener) + .await?; + transport.send(non_matching_message).await?; + transport.send(matching_message).await?; + + tokio::time::timeout(Duration::from_secs(1), received_notify.notified()).await?; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_unregister_listener_stops_processing_of_messages() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + let transport = Iceoryx2Transport::new().unwrap(); + + struct TestListener { + hit_count: AtomicUsize, + } + + #[async_trait::async_trait] + impl UListener for TestListener { + async fn on_receive(&self, _msg: UMessage) { + let count = self.hit_count.fetch_add(1, Ordering::SeqCst); + println!("Received a message, count = {}", count); + } + } + + let listener = Arc::new(TestListener { + hit_count: AtomicUsize::new(0), + }); + + let uri = UUri::from_str(&format!("//vehicle{}/123/1/9000", std::process::id())).unwrap(); + let msg = UMessageBuilder::publish(uri.clone()) + .build() + .expect("failed to build"); + + // Registers listener + transport + .register_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + // Let subscriber start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Send first message + transport.send(msg.clone()).await.unwrap(); + + // Wait for message processing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify we received the first message + assert!( + listener.hit_count.load(Ordering::SeqCst) >= 1, + "Listener should have received at least one message" + ); + + // Record count before unregister + let count_before_unregister = listener.hit_count.load(Ordering::SeqCst); + + // Unregister + transport + .unregister_listener(&uri, None, listener.clone()) + .await + .unwrap(); + + // Wait for unregister to take effect + tokio::time::sleep(Duration::from_millis(200)).await; + + // Send more messages after unregister + for _ in 0..3 { + transport.send(msg.clone()).await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Wait a bit more to ensure no messages are processed + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify no additional messages were received after unregister + let count_after_unregister = listener.hit_count.load(Ordering::SeqCst); + assert_eq!( + count_before_unregister, count_after_unregister, + "Listener should not receive messages after unregister. Before: {}, After: {}", + count_before_unregister, count_after_unregister + ); +} + +#[tokio::test] +async fn test_mock_listener() { + use up_rust::MockUListener; + let mut listener = MockUListener::new(); + + listener.expect_on_receive().returning(|_message| { + println!("Mock listener called!"); + }); + + let message = UMessage::new(); + + listener.on_receive(message).await; +} diff --git a/src/transmission_data.rs b/src/transmission_data.rs new file mode 100644 index 0000000..d2e2c5e --- /dev/null +++ b/src/transmission_data.rs @@ -0,0 +1,64 @@ +// Copyright (c) 2023 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + + +/* +use iceoryx2::prelude::*; +use up_rust::{UCode, UMessage, UStatus}; + +#[derive(Debug, Clone, Copy, ZeroCopySend, Default)] +// optional type name; if not set, `core::any::type_name::()` is used +#[type_name("TransmissionData")] +#[repr(C)] +pub struct TransmissionData { + pub x: i32, + pub y: i32, + pub funky: f64, +} + +impl TransmissionData { + pub fn from_bytes(bytes: Vec) -> Result { + if bytes.len() != std::mem::size_of::() { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid byte length for TransmissionData", + )); + } + + let mut data = Self { + x: 0, + y: 0, + funky: 0.0, + }; + let ptr = &mut data as *mut Self as *mut u8; + unsafe { + std::ptr::copy_nonoverlapping(bytes.as_ptr(), ptr, bytes.len()); + } + Ok(data) + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = vec![0u8; std::mem::size_of::()]; + let ptr = self as *const Self as *const u8; + unsafe { + std::ptr::copy_nonoverlapping(ptr, bytes.as_mut_ptr(), bytes.len()); + } + bytes + } + + pub fn from_message(message: &UMessage) -> Result { + let payload = message.payload.clone().unwrap_or_default(); + Self::from_bytes(payload.to_vec()) + } +} + +*/ \ No newline at end of file