Skip to content

Commit a8492e5

Browse files
committed
fixed tokio runtime issue and added publisher example
added publisher example !!! now there is a segmentation fault when the subscriber receives a published message. yay, a new error
1 parent 94f0645 commit a8492e5

10 files changed

Lines changed: 160 additions & 50 deletions

File tree

.vscode/launch.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,24 @@
3737
},
3838
"args": [],
3939
"cwd": "${workspaceFolder}"
40+
},
41+
{
42+
"type": "lldb",
43+
"request": "launch",
44+
"name": "Debug example 'publisher'",
45+
"cargo": {
46+
"args": [
47+
"build",
48+
"--example=publisher",
49+
"--package=up-transport-iceoryx2-rust"
50+
],
51+
"filter": {
52+
"name": "publisher",
53+
"kind": "example"
54+
}
55+
},
56+
"args": [],
57+
"cwd": "${workspaceFolder}"
4058
}
4159
]
4260
}

examples/common/constants.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// ################################################################################
2+
// Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
//
4+
// See the NOTICE file(s) distributed with this work for additional
5+
// information regarding copyright ownership.
6+
//
7+
// This program and the accompanying materials are made available under the
8+
// terms of the Apache License Version 2.0 which is available at
9+
// https: //www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
// ################################################################################
13+
14+
use std::time::Duration;
15+
16+
/// The source filter publisher examples will use
17+
pub static SOURCE_FILTER_STR: &str = "up://device1/10AB/3/80CD";
18+
19+
/// A UMessage will be published at this frequency
20+
#[allow(dead_code)]
21+
pub static CYCLE_TIME: Duration = Duration::from_secs(1);

examples/common/helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use up_rust::UMessage;
1515

1616
/// Simply prints the [`UMessage`] instances source uri, sink uri, and payload to STDOUT
17+
#[allow(dead_code)]
1718
pub fn print_umessage(msg: &UMessage) {
1819
let payload_utf8 = msg.payload.as_ref().map(|p| String::from_utf8_lossy(p));
1920
let (source_uri, sink_uri) = get_source_and_sink_uri(msg);
@@ -23,7 +24,7 @@ pub fn print_umessage(msg: &UMessage) {
2324
println!("Payload: {payload_utf8:?}");
2425
}
2526

26-
pub fn get_source_and_sink_uri(msg: &UMessage) -> (Option<String>, Option<String>) {
27+
fn get_source_and_sink_uri(msg: &UMessage) -> (Option<String>, Option<String>) {
2728
let source_uri = msg
2829
.attributes
2930
.as_ref()

examples/common/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,20 @@
1+
// ################################################################################
2+
// Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
//
4+
// See the NOTICE file(s) distributed with this work for additional
5+
// information regarding copyright ownership.
6+
//
7+
// This program and the accompanying materials are made available under the
8+
// terms of the Apache License Version 2.0 which is available at
9+
// https: //www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
// ################################################################################
13+
14+
pub mod constants;
115
pub mod helpers;
16+
17+
#[allow(unused_imports)]
18+
pub use constants::*;
19+
#[allow(unused_imports)]
20+
pub use helpers::*;

examples/publisher.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// ################################################################################
2+
// Copyright (c) 2025 Contributors to the Eclipse Foundation
3+
//
4+
// See the NOTICE file(s) distributed with this work for additional
5+
// information regarding copyright ownership.
6+
//
7+
// This program and the accompanying materials are made available under the
8+
// terms of the Apache License Version 2.0 which is available at
9+
// https: //www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
// ################################################################################
13+
14+
use std::{error::Error, str::FromStr};
15+
use up_rust::{UMessage, UMessageBuilder, UPayloadFormat, UTransport, UUri};
16+
use up_transport_iceoryx2_rust::{MessagingPattern, transport::UTransportIceoryx2};
17+
18+
mod common;
19+
use crate::common::*;
20+
21+
fn create_umessage(source_filter: &UUri, counter: u64) -> Result<UMessage, Box<dyn Error>> {
22+
let payload = format!("Hello, Iceoryx2! Message {counter}");
23+
let umessage = UMessageBuilder::publish(source_filter.clone())
24+
.build_with_payload(payload.into_bytes(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
25+
Ok(umessage)
26+
}
27+
28+
#[tokio::main(flavor = "multi_thread")]
29+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
30+
println!("uProtocol UTransportIceoryx2 publisher example");
31+
let source_filter = UUri::from_str(SOURCE_FILTER_STR).expect("Failed to create source UUri");
32+
let transport = UTransportIceoryx2::build(MessagingPattern::PublishSubscribe)?;
33+
let mut counter: u64 = 0;
34+
let cycle_time_str = format!("{CYCLE_TIME:?}");
35+
println!(
36+
"Publishing a UMessage with an incrementing counter every '{cycle_time_str}' second with source filter '{SOURCE_FILTER_STR}'"
37+
);
38+
loop {
39+
counter += 1;
40+
let umessage = create_umessage(&source_filter, counter)?;
41+
let memory_address = &umessage;
42+
println!(
43+
"Publishing message with source filter '{SOURCE_FILTER_STR}' and memory address {memory_address:p}"
44+
);
45+
transport.send(umessage).await?;
46+
tokio::time::sleep(CYCLE_TIME).await;
47+
}
48+
}

examples/subscriber.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,36 +13,34 @@
1313

1414
use async_trait::async_trait;
1515
use std::{str::FromStr, sync::Arc};
16-
use tracing::info;
1716
use up_rust::{UListener, UMessage, UTransport, UUri};
1817
use up_transport_iceoryx2_rust::{MessagingPattern, transport::UTransportIceoryx2};
1918

20-
use crate::common::helpers::*;
21-
2219
mod common;
20+
use crate::common::*;
2321

24-
struct SubscriberListener(tokio::runtime::Runtime);
22+
struct SubscriberListener;
2523

2624
#[async_trait]
2725
impl UListener for SubscriberListener {
2826
/// Spawns a task to process the received message. In this example, we simply print the message contents.
2927
async fn on_receive(&self, msg: UMessage) {
30-
self.0.spawn(async move {
31-
print_umessage(&msg);
32-
});
28+
print_umessage(&msg);
3329
}
3430
}
3531

3632
#[tokio::main(flavor = "multi_thread")]
3733
async fn main() -> Result<(), Box<dyn std::error::Error>> {
38-
info!("uProtocols UTransportIceoryx2 subscriber example");
39-
let source_filter =
40-
UUri::from_str("up://device1/10AB/3/80CD").expect("Failed to create source UUri");
34+
println!("uProtocol UTransportIceoryx2 subscriber example");
35+
let source_filter = UUri::from_str(SOURCE_FILTER_STR).expect("Failed to create source UUri");
4136
let transport = UTransportIceoryx2::build(MessagingPattern::PublishSubscribe)?;
42-
let ulistener = Arc::new(SubscriberListener(tokio::runtime::Runtime::new()?));
37+
let ulistener = Arc::new(SubscriberListener);
4338
transport
4439
.register_listener(&source_filter, None, ulistener)
4540
.await?;
46-
info!("Listener registered. Waiting for messages...");
41+
println!(
42+
"Listening to message from source filter '{SOURCE_FILTER_STR}'. Press CTRL+C to kill this subscriber"
43+
);
44+
println!("Waiting for messages...");
4745
tokio::signal::ctrl_c().await.map_err(Box::from)
4846
}

src/service_name_mapping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn determine_message_type(
5555
}
5656

5757
fn is_a_publish(source: &UUri, messaging_pattern: MessagingPattern) -> bool {
58-
source.is_empty() == false && messaging_pattern == MessagingPattern::PublishSubscribe
58+
!source.is_empty() && messaging_pattern == MessagingPattern::PublishSubscribe
5959
}
6060

6161
pub fn compute_service_name(

src/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl UTransportIceoryx2 {
3131
}
3232
}
3333

34-
fn build_publish_subscribe() -> Arc<impl UTransport> {
34+
fn build_publish_subscribe() -> Arc<Iceoryx2PubSub> {
3535
Iceoryx2PubSub::new()
3636
}
3737
}

src/utransport_pubsub.rs

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::{
3232

3333
pub struct Iceoryx2PubSub {
3434
node: Node<ipc_threadsafe::Service>,
35-
listener_worker_runtime: tokio::runtime::Runtime,
3635
pub publishers: PublisherSet<ipc_threadsafe::Service>,
3736
pub subscribers: SubscriberSet<ipc_threadsafe::Service>,
3837
pub listeners: ListenerMap,
@@ -43,28 +42,22 @@ impl Iceoryx2PubSub {
4342
let node = NodeBuilder::new()
4443
.create::<ipc_threadsafe::Service>()
4544
.expect("Failed to create Iceoryx2 Node");
46-
let listener_worker_runtime =
47-
tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
4845
let transport = Arc::new(Self {
4946
node,
5047
publishers: RwLock::new(HashMap::new()),
5148
subscribers: RwLock::new(HashMap::new()),
5249
listeners: RwLock::new(HashMap::new()),
53-
listener_worker_runtime,
5450
});
55-
Iceoryx2WorkerDispatcher::start_listener_worker(
56-
&transport.listener_worker_runtime,
57-
transport.clone(),
58-
);
51+
Iceoryx2WorkerDispatcher::start_listener_worker(transport.clone());
5952
transport
6053
}
6154

6255
pub fn create_subscriber(
6356
&self,
57+
service_name: ServiceName,
6458
) -> Result<Subscriber<ipc_threadsafe::Service, UMessageZeroCopy, UProtocolHeader>, UStatus>
6559
{
6660
// Placeholder implementation
67-
let service_name: ServiceName = "example_service".try_into().unwrap();
6861
let service = self
6962
.node
7063
.service_builder(&service_name)
@@ -85,15 +78,21 @@ impl Iceoryx2PubSub {
8578
service_name: ServiceName,
8679
) -> Result<Arc<Publisher<ipc_threadsafe::Service, UMessageZeroCopy, UProtocolHeader>>, UStatus>
8780
{
88-
let publishers = self.publishers.read().await;
89-
if publishers.contains_key(&service_name) {
90-
let publisher = publishers.get(&service_name).unwrap();
91-
return Ok(publisher.clone());
81+
let publisher = self.get_publisher(service_name.clone()).await;
82+
if let Some(publisher) = publisher {
83+
return Ok(publisher);
9284
}
93-
let service_name_res: Result<ServiceName, _> = service_name.as_str().try_into();
85+
self.create_publisher(service_name).await
86+
}
87+
88+
async fn create_publisher(
89+
&self,
90+
service_name: ServiceName,
91+
) -> Result<Arc<Publisher<ipc_threadsafe::Service, UMessageZeroCopy, UProtocolHeader>>, UStatus>
92+
{
9493
let service = self
9594
.node
96-
.service_builder(&service_name_res.unwrap())
95+
.service_builder(&service_name)
9796
.publish_subscribe::<UMessageZeroCopy>()
9897
.user_header::<UProtocolHeader>()
9998
.open_or_create()
@@ -110,25 +109,30 @@ impl Iceoryx2PubSub {
110109
Ok(publisher.clone())
111110
}
112111

112+
async fn get_publisher(
113+
&self,
114+
service_name: ServiceName,
115+
) -> Option<Arc<Publisher<ipc_threadsafe::Service, UMessageZeroCopy, UProtocolHeader>>> {
116+
let publishers = self.publishers.read().await;
117+
if publishers.contains_key(&service_name) {
118+
let publisher = publishers.get(&service_name).unwrap();
119+
return Some(publisher.clone());
120+
}
121+
None
122+
}
123+
113124
pub async fn relay(&self) -> Result<(), UStatus> {
114-
let current_thread_runtime = tokio::runtime::Builder::new_current_thread()
115-
.enable_all()
116-
.build()
117-
.map_err(|e| {
118-
UStatus::fail_with_code(
119-
UCode::INTERNAL,
120-
format!("Failed to build current_thread runtime: {e}"),
121-
)
122-
})?;
123-
for (service_name, subscriber) in self.subscribers.read().await.iter() {
125+
let subscribers = self.subscribers.read().await;
126+
for (service_name, subscriber) in subscribers.iter() {
124127
match subscriber.receive() {
125128
Ok(Some(sample)) => {
126129
let payload = sample;
127130
if let Some(listeners_to_notify) = self.listeners.read().await.get(service_name)
128131
{
129132
for listener in listeners_to_notify.iter() {
130133
let listener: &ComparableListener = listener;
131-
current_thread_runtime.block_on(listener.on_receive(payload.0.clone()));
134+
let payload_clone = payload.0.clone();
135+
listener.on_receive(payload_clone).await;
132136
}
133137
}
134138
}
@@ -182,14 +186,17 @@ impl UTransport for Iceoryx2PubSub {
182186
) -> Result<(), UStatus> {
183187
up_rust::verify_filter_criteria(source_filter, sink_filter)?;
184188
let service_name = compute_service_name(
185-
&source_filter,
189+
source_filter,
186190
sink_filter,
187191
MessagingPattern::PublishSubscribe,
188192
)?;
189-
let subscribers = self.subscribers.read().await;
193+
let has_subscriber = {
194+
let subscribers = self.subscribers.read().await;
195+
subscribers.contains_key(&service_name)
196+
};
190197
// insert subscriber for service name if it does not already exist
191-
if !subscribers.contains_key(&service_name) {
192-
let subscriber = self.create_subscriber()?;
198+
if !has_subscriber {
199+
let subscriber = self.create_subscriber(service_name.clone())?;
193200
let mut subscribers = self.subscribers.write().await;
194201
subscribers.insert(service_name.clone(), Arc::new(subscriber));
195202
}
@@ -212,7 +219,7 @@ impl UTransport for Iceoryx2PubSub {
212219
) -> Result<(), UStatus> {
213220
up_rust::verify_filter_criteria(source_filter, sink_filter)?;
214221
let service_name = compute_service_name(
215-
&source_filter,
222+
source_filter,
216223
sink_filter,
217224
MessagingPattern::PublishSubscribe,
218225
)?;

src/workers/dispatcher.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,16 @@
1212
// ################################################################################
1313

1414
use std::sync::{Arc, atomic::Ordering};
15-
use tokio::runtime::Runtime;
1615
use up_rust::UStatus;
1716

1817
use crate::{utransport_pubsub::Iceoryx2PubSub, workers::worker::Iceoryx2Worker};
1918

2019
pub struct Iceoryx2WorkerDispatcher {}
2120

2221
impl Iceoryx2WorkerDispatcher {
23-
pub fn start_listener_worker(runtime: &Runtime, transport: Arc<Iceoryx2PubSub>) {
22+
pub fn start_listener_worker(transport: Arc<Iceoryx2PubSub>) {
2423
let worker = Iceoryx2Worker::new(transport.clone());
25-
let future = Iceoryx2WorkerDispatcher::run(worker);
26-
runtime.spawn(future);
24+
tokio::spawn(async { Iceoryx2WorkerDispatcher::run(worker).await });
2725
}
2826

2927
async fn run(worker: Iceoryx2Worker) -> Result<(), UStatus> {

0 commit comments

Comments
 (0)