Skip to content

Commit 6bbdba4

Browse files
authored
RSDK-858 - Python assert error hot fix (#24)
1 parent 6e6ac70 commit 6bbdba4

File tree

5 files changed

+64
-23
lines changed

5 files changed

+64
-23
lines changed

examples/src/echo/main.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ use viam::rpc::dial;
99
/// Tests unary, server, and bidi streaming with simple echo requests. To run, simply
1010
/// update the credentials and uri as necessary.
1111
async fn main() -> Result<()> {
12-
let creds = dial::CredentialsExt::new(
12+
let creds = dial::RPCCredentials::new(
13+
None,
1314
"robot-location-secret".to_string(),
14-
"ytexnwei4fu1xv9csoqxfv4ckl3htsb49mzzey5t15xo9swy".to_string(),
15+
"<your secret here>".to_string(),
1516
);
1617

1718
let c = dial::DialOptions::builder()
18-
.uri("webrtc-test-main.jkek76kqnh.viam.cloud")
19+
.uri("<your robot address here>")
1920
.with_credentials(creds)
2021
.allow_downgrade()
2122
.connect()

src/rpc/client_channel.rs

+20-11
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
fmt::Debug,
1212
sync::{
1313
atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering},
14-
Arc,
14+
Arc, RwLock,
1515
},
1616
};
1717
use webrtc::{
@@ -28,6 +28,8 @@ pub struct WebRTCClientChannel {
2828
stream_id_counter: AtomicU64,
2929
pub(crate) streams: CHashMap<u64, WebRTCClientStream>,
3030
pub(crate) receiver_bodies: CHashMap<u64, hyper::Body>,
31+
// String type rather than error type because anyhow::Error does not derive clone
32+
pub(crate) error: RwLock<Option<String>>,
3133
}
3234

3335
impl Debug for WebRTCClientChannel {
@@ -64,7 +66,9 @@ impl WebRTCClientChannel {
6466
data_channel: Arc<RTCDataChannel>,
6567
) -> Arc<Self> {
6668
let base_channel = WebRTCBaseChannel::new(peer_connection, data_channel.clone()).await;
69+
let error = RwLock::new(None);
6770
let channel = Self {
71+
error,
6872
base_channel,
6973
streams: CHashMap::new(),
7074
stream_id_counter: AtomicU64::new(0),
@@ -84,8 +88,14 @@ impl WebRTCClientChannel {
8488
return;
8589
}
8690
};
87-
if let Err(e) = channel.on_channel_message(msg).await {
88-
log::error!("error deserializing message: {e}");
91+
let maybe_err = channel.on_channel_message(msg).await;
92+
let mut err = channel.error.write().unwrap();
93+
match maybe_err {
94+
Err(e) => {
95+
log::error!("error deserializing message: {e}");
96+
*err = Some(e.to_string());
97+
}
98+
Ok(()) => *err = None,
8999
}
90100
})
91101
}));
@@ -119,7 +129,6 @@ impl WebRTCClientChannel {
119129

120130
async fn on_channel_message(&self, msg: DataChannelMessage) -> Result<()> {
121131
let response = Response::decode(&*msg.data.to_vec())?;
122-
let should_drop_stream = matches!(response.r#type, Some(RespType::Trailers(_)));
123132
let (active_stream, stream_id) = match response.stream.as_ref() {
124133
None => {
125134
log::error!(
@@ -141,17 +150,17 @@ impl WebRTCClientChannel {
141150
}
142151
};
143152

144-
match active_stream {
145-
Ok(mut active_stream) => active_stream.on_response(response).await?,
146-
Err(e) => {
147-
log::error!("Error acquiring active stream: {e}");
148-
return Ok(());
149-
}
153+
let should_drop_stream = matches!(response.r#type, Some(RespType::Trailers(_)));
154+
155+
let maybe_err = match active_stream {
156+
Ok(mut active_stream) => active_stream.on_response(response).await,
157+
Err(e) => Err(anyhow::anyhow!("Error acquiring active stream: {e}")),
150158
};
159+
151160
if should_drop_stream {
152161
self.streams.remove(&stream_id);
153162
}
154-
Ok(())
163+
maybe_err
155164
}
156165

157166
pub(crate) fn resp_body_from_stream(&self, stream_id: u64) -> Result<Body> {

src/rpc/client_stream.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl WebRTCClientStream {
4444
Ok(())
4545
}
4646

47-
async fn process_trailers(&mut self, trailers: ResponseTrailers) {
47+
async fn process_trailers(&mut self, trailers: ResponseTrailers) -> Result<()> {
4848
let trailers_to_send = trailers_from_proto(trailers.clone());
4949
if let Err(e) = self
5050
.base_stream
@@ -69,7 +69,14 @@ impl WebRTCClientStream {
6969
}
7070
};
7171

72-
self.base_stream.close_with_recv_error(&mut err.as_ref())
72+
self.base_stream.close_with_recv_error(&mut err.as_ref());
73+
match err {
74+
None => Ok(()),
75+
Some(err) => {
76+
log::error!("Error processing trailers: {err}");
77+
Err(err)
78+
}
79+
}
7380
}
7481

7582
// processes response.
@@ -107,10 +114,7 @@ impl WebRTCClientStream {
107114
self.process_message(message.to_owned()).await
108115
}
109116

110-
Some(Type::Trailers(trailers)) => {
111-
self.process_trailers(trailers.to_owned()).await;
112-
Ok(())
113-
}
117+
Some(Type::Trailers(trailers)) => self.process_trailers(trailers.to_owned()).await,
114118
None => Ok(()),
115119
}
116120
}

src/rpc/dial.rs

+21-3
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,20 @@ impl Service<http::Request<BoxBody>> for ViamChannel {
129129
}
130130
};
131131

132+
// TODO(RSDK-654) if and when we need to provide better support for
133+
// bidi streaming, we'll need to find a better solution than this for
134+
// handling empty messages
135+
let mut body = hyper::body::to_bytes(body).await.unwrap().to_vec();
136+
if body.is_empty() {
137+
// the body is empty if we make a call that returns an error on the RDK
138+
// side. Python's grpc library expects to see messages with, at a minimum,
139+
// a grpc header, so returning an empty body leads to error. In such a
140+
// case, we instead return a body that just consists of empty header bytes
141+
// to indicate an empty message.
142+
body = vec![0, 0, 0, 0, 0];
143+
status_code = STATUS_CODE_UNKNOWN;
144+
}
145+
let grpc_message = channel.error.read().unwrap().clone();
132146
let response = http::response::Response::builder()
133147
// standardized gRPC headers.
134148
.header("content-type", "application/grpc")
@@ -137,9 +151,13 @@ impl Service<http::Request<BoxBody>> for ViamChannel {
137151
.header(TRAILER, "Grpc-Message")
138152
.header(TRAILER, "Grpc-Status-Details-Bin")
139153
.header("grpc-status", &status_code.to_string())
140-
.version(Version::HTTP_2)
141-
.body(body)
142-
.unwrap();
154+
.version(Version::HTTP_2);
155+
156+
let response = match grpc_message {
157+
None => response,
158+
Some(message) => response.header("grpc-message", message),
159+
};
160+
let response = response.body(Body::from(body)).unwrap();
143161
Ok(response)
144162
};
145163
Box::pin(fut)

src/rpc/webrtc.rs

+9
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,15 @@ pub(crate) fn trailers_from_proto(proto: ResponseTrailers) -> HeaderMap {
333333
None => "0".to_string(),
334334
};
335335

336+
match proto.status {
337+
Some(ref status) => {
338+
let key = HeaderName::from_str("Grpc-Message").unwrap();
339+
let val = HeaderValue::from_str(&status.message).unwrap();
340+
trailers.insert(key, val);
341+
}
342+
None => (),
343+
}
344+
336345
let k = match HeaderName::from_str(status_name) {
337346
Ok(k) => k,
338347
Err(e) => {

0 commit comments

Comments
 (0)