Skip to content

Commit 3d5f6a0

Browse files
authored
Do not implement LocalUriProvider (#123)
* [#122] Do not implement LocalUriProvider UPZenohTransport has been changed to no longer implement the LocalUriProvider trait. This seems reasonable to allow for the same transport instance to be used by multiple service implementations having different entity types. * Incorporate feedback
1 parent 2883514 commit 3d5f6a0

19 files changed

Lines changed: 353 additions & 1122 deletions

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ chrono = "0.4.41"
4646
clap = { version = "4.5.40", features = ["derive"] }
4747
serde_json = "1.0.128"
4848
test-case = { version = "3.3" }
49+
tokio = { version = "1.45.1", default-features = false, features = ["signal"] }
50+
tracing-subscriber = "0.3.19"
4951
up-rust = { version = "0.6.0", features = ["communication", "test-util"] }
5052

5153
[features]

examples/l2_rpc_client.rs

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ mod common;
2424
use std::{str::FromStr, sync::Arc};
2525
use up_rust::{
2626
communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload},
27-
LocalUriProvider, UPayloadFormat, UPriority, UUri, UUID,
27+
LocalUriProvider, StaticUriProvider, UPayloadFormat, UPriority, UUri, UUID,
2828
};
2929
use up_transport_zenoh::UPTransportZenoh;
3030

@@ -34,46 +34,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3434
UPTransportZenoh::try_init_log_from_env();
3535

3636
println!("uProtocol RPC client example");
37-
let zenoh_transport = UPTransportZenoh::builder("//rpc_client/1/1/0")
38-
.expect("invalid URI")
37+
let uri_provider = Arc::new(StaticUriProvider::new("l2_rpc_client", 0x10_ab10, 1));
38+
let transport = UPTransportZenoh::builder(uri_provider.get_authority())
39+
.expect("invalid authority name")
3940
.with_config(common::get_zenoh_config())
4041
.build()
4142
.await
4243
.map(Arc::new)?;
43-
44-
let rpc_client = InMemoryRpcClient::new(zenoh_transport.clone(), zenoh_transport.clone())
44+
let rpc_client = InMemoryRpcClient::new(transport, uri_provider.clone())
4545
.await
4646
.map(Arc::new)?;
4747

48-
let sink_uuri = UUri::from_str("//rpc_server/1/1/1")?;
48+
let operation_uuri = UUri::from_str("//rpc_server/AAA/1/6A10")?;
4949

50-
// create uPayload and send request
51-
let data = String::from("GetCurrentTime");
52-
let payload = UPayload::new(data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
50+
// create and send request
51+
let payload = UPayload::new("GetCurrentTime", UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
5352
let call_options = CallOptions::for_rpc_request(
5453
5_000,
5554
Some(UUID::build()),
5655
Some("my_token".to_string()),
5756
Some(UPriority::UPRIORITY_CS6),
5857
);
5958
println!(
60-
"Sending request from {} to {}",
61-
zenoh_transport.get_source_uri(),
62-
sink_uuri
59+
"Sending request [source: {}, sink: {}]",
60+
uri_provider.get_source_uri().to_uri(false),
61+
operation_uuri.to_uri(false)
6362
);
63+
6464
match rpc_client
65-
.invoke_method(sink_uuri, call_options, Some(payload))
65+
.invoke_method(operation_uuri, call_options, Some(payload))
6666
.await
6767
{
68-
Ok(result) => {
69-
let payload = result.unwrap().payload();
70-
let value = payload.into_iter().map(|c| c as char).collect::<String>();
71-
println!("Receive {value}");
72-
Ok(())
68+
Err(_) => {
69+
println!("Failed to receive reply from service");
70+
}
71+
Ok(Some(payload)) => {
72+
let value = String::from_utf8(payload.payload().to_vec())?;
73+
println!("Received reply [payload: {value}]");
7374
}
74-
Err(e) => {
75-
println!("Failed to receive the reply");
76-
Err(Box::from(e))
75+
_ => {
76+
println!("Reply did not contain payload");
7777
}
7878
}
79+
Ok(())
7980
}

examples/notification_receiver.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,49 +10,61 @@
1010
*
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
13+
14+
/*!
15+
This example illustrates how uProtocol's Transport Layer API can be used to receive
16+
notifications sent by another uEntity using the Zenoh transport.
17+
18+
This example works in conjunction with the `notifier`, which should be started in
19+
another terminal after having started this receiver.
20+
*/
21+
1322
mod common;
1423

1524
use async_trait::async_trait;
1625
use std::{str::FromStr, sync::Arc};
17-
use tokio::time::{sleep, Duration};
18-
use up_rust::{LocalUriProvider, UListener, UMessage, UTransport, UUri};
26+
use up_rust::{LocalUriProvider, StaticUriProvider, UListener, UMessage, UTransport, UUri};
1927
use up_transport_zenoh::UPTransportZenoh;
2028

2129
struct SubscriberListener;
2230
#[async_trait]
2331
impl UListener for SubscriberListener {
2432
async fn on_receive(&self, msg: UMessage) {
2533
let payload = msg.payload.unwrap();
26-
let value = payload.into_iter().map(|c| c as char).collect::<String>();
27-
let uri = msg.attributes.unwrap().source.unwrap().to_string();
28-
println!("Receiving notification '{value}' from {uri}");
34+
let value = String::from_utf8(payload.to_vec()).unwrap();
35+
let uri = msg.attributes.unwrap().source.unwrap().to_uri(false);
36+
println!("Received notification [source: {uri}, payload: {value}]");
2937
}
3038
}
3139

3240
#[tokio::main]
3341
async fn main() -> Result<(), Box<dyn std::error::Error>> {
3442
// initiate logging
3543
UPTransportZenoh::try_init_log_from_env();
44+
3645
println!("uProtocol notification receiver example");
37-
let receiver = UPTransportZenoh::builder("//receiver/2/1/0")
38-
.expect("invalid URI")
46+
let uri_provider = StaticUriProvider::new("receiver", 0x10_ab10, 1);
47+
let transport = UPTransportZenoh::builder(uri_provider.get_authority())
48+
.expect("invalid authority name")
3949
.with_config(common::get_zenoh_config())
4050
.build()
4151
.await?;
4252

43-
// create uuri
44-
let source_uuri = UUri::from_str("//notification/1/1/8001")?;
53+
let source_filter = UUri::from_str("//*/FFFFA1B2/1/8001")?;
54+
let sink_filter = uri_provider.get_source_uri();
4555

46-
println!("Register the listener...");
47-
receiver
56+
println!(
57+
"Registering notification listener [source filter: {}, sink filter: {}]",
58+
source_filter.to_uri(false),
59+
sink_filter.to_uri(false)
60+
);
61+
transport
4862
.register_listener(
49-
&source_uuri,
50-
Some(&receiver.get_source_uri()),
63+
&source_filter,
64+
Some(&sink_filter),
5165
Arc::new(SubscriberListener {}),
5266
)
5367
.await?;
5468

55-
loop {
56-
sleep(Duration::from_millis(1000)).await;
57-
}
69+
tokio::signal::ctrl_c().await.map_err(Box::from)
5870
}

examples/notifier.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,22 @@
1010
*
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
13+
14+
/*!
15+
This example illustrates how uProtocol's Transport Layer API can be used to send
16+
notifications to another uEntity using the Zenoh transport.
17+
18+
This example works in conjunction with the `notification_receiver`, which should
19+
be started in another terminal first.
20+
*/
21+
1322
mod common;
1423

1524
use std::str::FromStr;
1625

17-
use up_rust::{LocalUriProvider, UMessageBuilder, UPayloadFormat, UTransport, UUri};
26+
use up_rust::{
27+
LocalUriProvider, StaticUriProvider, UMessageBuilder, UPayloadFormat, UTransport, UUri,
28+
};
1829
use up_transport_zenoh::UPTransportZenoh;
1930

2031
#[tokio::main]
@@ -23,19 +34,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2334
UPTransportZenoh::try_init_log_from_env();
2435

2536
println!("uProtocol notifier example");
26-
let notifier = UPTransportZenoh::builder("//notification/1/1/0")
27-
.expect("invalid URI")
37+
let uri_provider = StaticUriProvider::new("notification", 0xa1b2, 1);
38+
let transport = UPTransportZenoh::builder(uri_provider.get_authority())
39+
.expect("invalid authority name")
2840
.with_config(common::get_zenoh_config())
2941
.build()
3042
.await?;
3143

3244
// create uuri
33-
let sink_uuri = UUri::from_str("//receiver/2/1/0")?;
45+
let source_uuri = uri_provider.get_resource_uri(0x8001);
46+
let sink_uuri = UUri::from_str("//receiver/10AB10/1/0")?;
3447

35-
let data = "The notification data";
36-
let umessage =
37-
UMessageBuilder::notification(notifier.get_resource_uri(0x8001), sink_uuri.clone())
48+
for cnt in 1..=100 {
49+
let data = format!("notification {cnt}");
50+
println!(
51+
"Sending notification [from: {}, to: {}, payload: {data}]",
52+
&source_uuri.to_uri(false),
53+
&sink_uuri.to_uri(false)
54+
);
55+
let umessage = UMessageBuilder::notification(source_uuri.clone(), sink_uuri.clone())
3856
.build_with_payload(data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
39-
println!("Sending notification '{data}' to {sink_uuri}...");
40-
notifier.send(umessage).await.map_err(Box::from)
57+
transport.send(umessage).await?;
58+
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
59+
}
60+
Ok(())
4161
}

examples/publisher.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@
1010
*
1111
* SPDX-License-Identifier: Apache-2.0
1212
********************************************************************************/
13+
14+
/*!
15+
This example illustrates how uProtocol's Transport Layer API can be used to publish
16+
messages to a topic using the Zenoh transport.
17+
18+
This example works in conjunction with the `subscriber`, which should be started in
19+
another terminal first.
20+
*/
21+
1322
mod common;
1423

15-
use tokio::time::{sleep, Duration};
16-
use up_rust::{LocalUriProvider, UMessageBuilder, UPayloadFormat, UTransport};
24+
use up_rust::{LocalUriProvider, StaticUriProvider, UMessageBuilder, UPayloadFormat, UTransport};
1725
use up_transport_zenoh::UPTransportZenoh;
1826

1927
#[tokio::main]
@@ -22,24 +30,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2230
UPTransportZenoh::try_init_log_from_env();
2331

2432
println!("uProtocol publisher example");
25-
let publisher = UPTransportZenoh::builder("//publisher/1/1/0")
26-
.expect("invalid URI")
33+
let uri_provider = StaticUriProvider::new("publisher", 0x3_b1da, 1);
34+
let transport = UPTransportZenoh::builder(uri_provider.get_authority())
35+
.expect("invalid authority name")
2736
.with_config(common::get_zenoh_config())
2837
.build()
2938
.await?;
3039

31-
// create uuri
32-
let uuri = publisher.get_resource_uri(0x8001);
33-
34-
let mut cnt: u64 = 0;
35-
loop {
36-
let data = format!("{cnt}");
37-
let umessage = UMessageBuilder::publish(uuri.clone())
38-
.build_with_payload(data.clone(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
39-
.unwrap();
40-
println!("Publishing {data} from {uuri}...");
41-
publisher.send(umessage).await.unwrap();
42-
sleep(Duration::from_millis(1000)).await;
43-
cnt += 1;
40+
// create topic uuri
41+
let topic = uri_provider.get_resource_uri(0x8001);
42+
43+
for cnt in 1..=100 {
44+
let data = format!("event {cnt}");
45+
println!(
46+
"Publishing message [topic: {}, payload: {data}]",
47+
topic.to_uri(false)
48+
);
49+
let umessage = UMessageBuilder::publish(topic.clone())
50+
.build_with_payload(data, UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
51+
transport.send(umessage).await?;
52+
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
4453
}
54+
Ok(())
4555
}

0 commit comments

Comments
 (0)