Skip to content

Commit c9d53f3

Browse files
authored
[#13] Use any up-rust upstreamed code
* Remove transport handle, collapse down to lib and transport * Rearrange transport engine * Implement LocalUriProvider * Make use of new up_rust::UUri::try_from_parts(), up_rust::UUri::any() * Show usage of uP-L2 APIs in hello_client and hello_service examples * Fix logic of Response handling to mean no commstatus is OK Implements [#6], [#8]
1 parent acbb0d0 commit c9d53f3

18 files changed

Lines changed: 1082 additions & 1276 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ regex = { version = "1.10" }
5050
serde = { version = "1.0", features = ["derive"] }
5151
serde_json = { version = "1.0" }
5252
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros", "sync", "time", "tracing"] }
53-
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "3a50104421a801d52e1d9c68979db54c013ce43d" }
53+
up-rust = { version = "0.1.5" }
5454
vsomeip-proc-macro = { path = "vsomeip-proc-macro" }
5555
vsomeip-sys = { path = "vsomeip-sys", default-features = false }
5656

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,25 @@
1-
use async_trait::async_trait;
1+
/********************************************************************************
2+
* Copyright (c) 2024 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Apache License Version 2.0 which is available at
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* SPDX-License-Identifier: Apache-2.0
12+
********************************************************************************/
13+
214
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
315
use log::trace;
416
use std::fs::canonicalize;
517
use std::path::PathBuf;
618
use std::sync::Arc;
719
use std::time::Duration;
20+
use up_rust::communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload};
821
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
9-
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
22+
use up_rust::{UStatus, UUri};
1023
use up_transport_vsomeip::UPTransportVsomeip;
1124

1225
const HELLO_SERVICE_ID: u16 = 0x6000;
@@ -24,38 +37,12 @@ const HELLO_SERVICE_UE_VERSION_MAJOR: u8 = HELLO_SERVICE_MAJOR;
2437
const HELLO_SERVICE_RESOURCE_ID: u16 = HELLO_METHOD_ID;
2538

2639
const CLIENT_AUTHORITY: &str = "me_authority";
27-
const CLIENT_UE_ID: u16 = 0x5678;
40+
const CLIENT_UE_ID: u32 = 0x5678;
2841
const CLIENT_UE_VERSION_MAJOR: u8 = 1;
2942
const CLIENT_RESOURCE_ID: u16 = 0;
3043

3144
const REQUEST_TTL: u32 = 1000;
3245

33-
struct ServiceResponseListener;
34-
35-
#[async_trait]
36-
impl UListener for ServiceResponseListener {
37-
async fn on_receive(&self, msg: UMessage) {
38-
println!("ServiceResponseListener: Received a message: {msg:?}");
39-
40-
let mut msg = msg.clone();
41-
42-
if let Some(ref mut attributes) = msg.attributes.as_mut() {
43-
attributes.payload_format =
44-
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
45-
}
46-
47-
let Ok(hello_response) = msg.extract_protobuf_payload::<HelloResponse>() else {
48-
panic!("Unable to parse into HelloResponse");
49-
};
50-
51-
println!("Here we received response: {hello_response:?}");
52-
}
53-
54-
async fn on_error(&self, err: UStatus) {
55-
println!("ServiceResponseListener: Encountered an error: {err:?}");
56-
}
57-
}
58-
5946
#[tokio::main]
6047
async fn main() -> Result<(), UStatus> {
6148
env_logger::init();
@@ -70,36 +57,34 @@ async fn main() -> Result<(), UStatus> {
7057

7158
// There will be a single vsomeip_transport, as there is a connection into device and a streamer
7259
// TODO: Add error handling if we fail to create a UPTransportVsomeip
73-
let client: Arc<dyn UTransport> = Arc::new(
60+
let client_uuri = UUri::try_from_parts(
61+
CLIENT_AUTHORITY,
62+
CLIENT_UE_ID,
63+
CLIENT_UE_VERSION_MAJOR,
64+
CLIENT_RESOURCE_ID,
65+
)
66+
.unwrap();
67+
let client = Arc::new(
7468
UPTransportVsomeip::new_with_config(
75-
&CLIENT_AUTHORITY.to_string(),
69+
client_uuri,
7670
&HELLO_SERVICE_AUTHORITY.to_string(),
77-
CLIENT_UE_ID,
7871
&vsomeip_config.unwrap(),
7972
None,
8073
)
8174
.unwrap(),
8275
);
8376

84-
let source = UUri {
85-
authority_name: CLIENT_AUTHORITY.to_string(),
86-
ue_id: CLIENT_UE_ID as u32,
87-
ue_version_major: CLIENT_UE_VERSION_MAJOR as u32,
88-
resource_id: CLIENT_RESOURCE_ID as u32,
89-
..Default::default()
90-
};
91-
let sink = UUri {
92-
authority_name: HELLO_SERVICE_AUTHORITY.to_string(),
93-
ue_id: HELLO_SERVICE_UE_ID,
94-
ue_version_major: HELLO_SERVICE_UE_VERSION_MAJOR as u32,
95-
resource_id: HELLO_SERVICE_RESOURCE_ID as u32,
96-
..Default::default()
97-
};
98-
99-
let service_response_listener: Arc<dyn UListener> = Arc::new(ServiceResponseListener);
100-
client
101-
.register_listener(&sink, Some(&source), service_response_listener)
102-
.await?;
77+
let l2_client = InMemoryRpcClient::new(client.clone(), client.clone())
78+
.await
79+
.unwrap();
80+
81+
let sink = UUri::try_from_parts(
82+
HELLO_SERVICE_AUTHORITY,
83+
HELLO_SERVICE_UE_ID,
84+
HELLO_SERVICE_UE_VERSION_MAJOR,
85+
HELLO_SERVICE_RESOURCE_ID,
86+
)
87+
.unwrap();
10388

10489
let mut i = 0;
10590
loop {
@@ -110,12 +95,36 @@ async fn main() -> Result<(), UStatus> {
11095
..Default::default()
11196
};
11297
i += 1;
98+
println!("Sending Request message with payload:\n{hello_request:?}");
99+
100+
let call_options = CallOptions::for_rpc_request(REQUEST_TTL, None, None, None);
101+
let invoke_res = l2_client
102+
.invoke_method(
103+
sink.clone(),
104+
call_options,
105+
Some(UPayload::try_from_protobuf(hello_request).unwrap()),
106+
)
107+
.await;
108+
109+
let Ok(response) = invoke_res else {
110+
panic!(
111+
"Hit an error attempting to invoke method: {:?}",
112+
invoke_res.err().unwrap()
113+
);
114+
};
113115

114-
let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL)
115-
.build_with_protobuf_payload(&hello_request)
116-
.unwrap();
117-
println!("Sending Request message:\n{request_msg:?}");
116+
let hello_response_vsomeip_unspecified_payload_format = response.unwrap();
117+
let hello_response_protobuf_payload_format = UPayload::new(
118+
hello_response_vsomeip_unspecified_payload_format.payload(),
119+
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
120+
);
118121

119-
client.send(request_msg).await?;
122+
let Ok(hello_response) =
123+
hello_response_protobuf_payload_format.extract_protobuf::<HelloResponse>()
124+
else {
125+
panic!("Unable to parse into HelloResponse");
126+
};
127+
128+
println!("Here we received response: {hello_response:?}");
120129
}
121130
}

up-transport-vsomeip/examples/hello_service.rs

Lines changed: 58 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
1+
/********************************************************************************
2+
* Copyright (c) 2024 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Apache License Version 2.0 which is available at
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* SPDX-License-Identifier: Apache-2.0
12+
********************************************************************************/
13+
114
use async_trait::async_trait;
215
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
316
use log::{error, trace};
417
use std::fs::canonicalize;
518
use std::path::PathBuf;
619
use std::sync::Arc;
720
use std::thread;
21+
use up_rust::communication::{
22+
InMemoryRpcServer, RequestHandler, RpcServer, ServiceInvocationError, UPayload,
23+
};
824
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
9-
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
25+
use up_rust::{UCode, UStatus, UUri};
1026
use up_transport_vsomeip::UPTransportVsomeip;
11-
use up_transport_vsomeip::UeId;
1227

1328
const HELLO_SERVICE_ID: u16 = 0x6000;
1429
const HELLO_INSTANCE_ID: u32 = 0x0001;
@@ -31,27 +46,28 @@ const _CLIENT_RESOURCE_ID: u16 = 0;
3146

3247
const _REQUEST_TTL: u32 = 1000;
3348

34-
struct ServiceRequestResponder {
35-
client: Arc<dyn UTransport>,
36-
}
37-
impl ServiceRequestResponder {
38-
pub fn new(client: Arc<dyn UTransport>) -> Self {
39-
Self { client }
49+
struct ServiceRequestHandler;
50+
impl ServiceRequestHandler {
51+
pub fn new() -> Self {
52+
Self
4053
}
4154
}
4255
#[async_trait]
43-
impl UListener for ServiceRequestResponder {
44-
async fn on_receive(&self, msg: UMessage) {
45-
println!("ServiceRequestResponder: Received a message: {msg:?}");
46-
47-
let mut msg = msg.clone();
48-
49-
if let Some(ref mut attributes) = msg.attributes.as_mut() {
50-
attributes.payload_format =
51-
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
52-
}
53-
54-
let hello_request = msg.extract_protobuf_payload::<HelloRequest>();
56+
impl RequestHandler for ServiceRequestHandler {
57+
async fn handle_request(
58+
&self,
59+
resource_id: u16,
60+
request_payload: Option<UPayload>,
61+
) -> Result<Option<UPayload>, ServiceInvocationError> {
62+
println!("ServiceRequestHandler: Received a resource_id: {resource_id} request_payload: {request_payload:?}");
63+
64+
let hello_request_vsomeip_unspecified_payload_format = request_payload.unwrap();
65+
let hello_request_protobuf_payload_format = UPayload::new(
66+
hello_request_vsomeip_unspecified_payload_format.payload(),
67+
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
68+
);
69+
let hello_request =
70+
hello_request_protobuf_payload_format.extract_protobuf::<HelloRequest>();
5571

5672
let hello_request = match hello_request {
5773
Ok(hello_request) => {
@@ -60,7 +76,10 @@ impl UListener for ServiceRequestResponder {
6076
}
6177
Err(err) => {
6278
error!("Unable to parse HelloRequest: {err:?}");
63-
return;
79+
return Err(ServiceInvocationError::RpcError(UStatus::fail_with_code(
80+
UCode::INTERNAL,
81+
"Unable to parse hello_request",
82+
)));
6483
}
6584
};
6685

@@ -69,14 +88,9 @@ impl UListener for ServiceRequestResponder {
6988
..Default::default()
7089
};
7190

72-
let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap())
73-
.build_with_wrapped_protobuf_payload(&hello_response)
74-
.unwrap();
75-
self.client.send(response_msg).await.unwrap();
76-
}
91+
println!("Making response to send back: {hello_response:?}");
7792

78-
async fn on_error(&self, err: UStatus) {
79-
println!("ServiceRequestResponder: Encountered an error: {err:?}");
93+
Ok(Some(UPayload::try_from_protobuf(hello_response).unwrap()))
8094
}
8195
}
8296

@@ -94,41 +108,30 @@ async fn main() -> Result<(), UStatus> {
94108

95109
// There will be a single vsomeip_transport, as there is a connection into device and a streamer
96110
// TODO: Add error handling if we fail to create a UPTransportVsomeip
97-
let service: Arc<dyn UTransport> = Arc::new(
111+
let service_uuri = UUri::try_from_parts(
112+
HELLO_SERVICE_AUTHORITY,
113+
HELLO_SERVICE_UE_ID,
114+
HELLO_SERVICE_MAJOR,
115+
// HELLO_SERVICE_RESOURCE_ID,
116+
0,
117+
)
118+
.unwrap();
119+
let service = Arc::new(
98120
UPTransportVsomeip::new_with_config(
99-
&HELLO_SERVICE_AUTHORITY.to_string(),
121+
service_uuri,
100122
&CLIENT_AUTHORITY.to_string(),
101-
HELLO_SERVICE_UE_ID as UeId,
102123
&vsomeip_config.unwrap(),
103124
None,
104125
)
105126
.unwrap(),
106127
);
128+
let l2_service = InMemoryRpcServer::new(service.clone(), service.clone());
107129

108-
let source_filter = UUri {
109-
authority_name: "*".to_string(),
110-
ue_id: 0x0000_FFFF,
111-
ue_version_major: 0xFF,
112-
resource_id: 0xFFFF,
113-
..Default::default()
114-
};
115-
let sink_filter = UUri {
116-
authority_name: HELLO_SERVICE_AUTHORITY.to_string(),
117-
ue_id: HELLO_SERVICE_UE_ID,
118-
ue_version_major: HELLO_SERVICE_MAJOR as u32,
119-
resource_id: HELLO_SERVICE_RESOURCE_ID as u32,
120-
..Default::default()
121-
};
122-
let service_request_responder: Arc<dyn UListener> =
123-
Arc::new(ServiceRequestResponder::new(service.clone()));
124-
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
125-
service
126-
.register_listener(
127-
&source_filter,
128-
Some(&sink_filter),
129-
service_request_responder.clone(),
130-
)
131-
.await?;
130+
let service_request_handler = Arc::new(ServiceRequestHandler::new());
131+
l2_service
132+
.register_endpoint(None, HELLO_SERVICE_RESOURCE_ID, service_request_handler)
133+
.await
134+
.expect("Unable to register endpoint");
132135

133136
thread::park();
134137
Ok(())

up-transport-vsomeip/src/determine_message_type.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ fn determine_type(
110110
}
111111
Some(ue_id) => ue_id,
112112
},
113-
DeterminationType::Send => source_filter.ue_id as ClientId,
113+
DeterminationType::Send => source_filter.ue_id,
114114
};
115-
Ok(RegistrationType::Publish(client_id))
115+
Ok(RegistrationType::Publish(client_id as ClientId))
116116
}
117117
}
118118

0 commit comments

Comments
 (0)