-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathnotification_receiver.rs
More file actions
79 lines (68 loc) · 2.84 KB
/
notification_receiver.rs
File metadata and controls
79 lines (68 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
/*!
This example illustrates how uProtocol's Transport Layer API can be used to receive
notifications sent by another uEntity using the Zenoh transport.
This example works in conjunction with the `notifier`, which should be started in
another terminal after having started this receiver.
*/
mod common;
use async_trait::async_trait;
use std::{str::FromStr, sync::Arc};
use up_rust::{LocalUriProvider, StaticUriProvider, UListener, UMessage, UTransport, UUri};
use up_transport_zenoh::UPTransportZenoh;
struct NotificationListener(tokio::runtime::Runtime);
#[async_trait]
impl UListener for NotificationListener {
async fn on_receive(&self, msg: UMessage) {
// Offload processing of the message to a dedicated tokio runtime using
// threads not used by Zenoh.
self.0.spawn(async move {
let payload = msg.payload.unwrap();
let value = String::from_utf8(payload.to_vec()).unwrap();
let uri = msg.attributes.unwrap().source.unwrap().to_uri(false);
println!("Received notification [source: {uri}, payload: {value}]");
});
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initiate logging
UPTransportZenoh::try_init_log_from_env();
println!("uProtocol notification receiver example");
let uri_provider = StaticUriProvider::new("receiver", 0x10_ab10, 1);
let transport = UPTransportZenoh::builder(uri_provider.get_authority())
.expect("invalid authority name")
.with_config(common::get_zenoh_config())
.build()
.await?;
let source_filter = UUri::from_str("//*/FFFFA1B2/1/8001")?;
let sink_filter = uri_provider.get_source_uri();
println!(
"Registering notification listener [source filter: {}, sink filter: {}]",
source_filter.to_uri(false),
sink_filter.to_uri(false)
);
let message_processing_rt = tokio::runtime::Builder::new_multi_thread()
.thread_name("message-processing")
.worker_threads(1)
.build()?;
transport
.register_listener(
&source_filter,
Some(&sink_filter),
Arc::new(NotificationListener(message_processing_rt)),
)
.await?;
tokio::signal::ctrl_c().await.map_err(Box::from)
}