Skip to content

Commit aff4c63

Browse files
committed
add error handing
1 parent 4fea4e4 commit aff4c63

File tree

2 files changed

+166
-43
lines changed

2 files changed

+166
-43
lines changed

dubbo/src/extension/invoker_extension.rs

+76-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use async_trait::async_trait;
2828
use bytes::Bytes;
2929
use futures_core::Stream;
3030
use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
31+
use thiserror::Error;
3132

3233
#[async_trait]
3334
pub trait Invoker {
@@ -78,7 +79,9 @@ pub mod proxy {
7879
use bytes::Bytes;
7980
use futures_core::Stream;
8081
use std::pin::Pin;
82+
use thiserror::Error;
8183
use tokio::sync::{mpsc::Sender, oneshot};
84+
use tracing::error;
8285

8386
pub(super) enum InvokerOpt {
8487
Invoke(
@@ -100,14 +103,36 @@ pub mod proxy {
100103
invocation: GrpcInvocation,
101104
) -> Result<Pin<Box<dyn Stream<Item = Bytes> + Send + 'static>>, StdError> {
102105
let (tx, rx) = oneshot::channel();
103-
let _ = self.tx.send(InvokerOpt::Invoke(invocation, tx));
106+
let ret = self.tx.send(InvokerOpt::Invoke(invocation, tx)).await;
107+
match ret {
108+
Ok(_) => {}
109+
Err(err) => {
110+
error!(
111+
"call invoke method failed by invoker proxy, error: {:?}",
112+
err
113+
);
114+
return Err(InvokerProxyError::new(
115+
"call invoke method failed by invoker proxy",
116+
)
117+
.into());
118+
}
119+
}
104120
let ret = rx.await?;
105121
ret
106122
}
107123

108124
async fn url(&self) -> Result<Url, StdError> {
109125
let (tx, rx) = oneshot::channel();
110-
let _ = self.tx.send(InvokerOpt::Url(tx));
126+
let ret = self.tx.send(InvokerOpt::Url(tx)).await;
127+
match ret {
128+
Ok(_) => {}
129+
Err(err) => {
130+
error!("call url method failed by invoker proxy, error: {:?}", err);
131+
return Err(
132+
InvokerProxyError::new("call url method failed by invoker proxy").into(),
133+
);
134+
}
135+
}
111136
let ret = rx.await?;
112137
ret
113138
}
@@ -121,17 +146,39 @@ pub mod proxy {
121146
match opt {
122147
InvokerOpt::Invoke(invocation, tx) => {
123148
let result = invoker.invoke(invocation).await;
124-
let _ = tx.send(result);
149+
let callback_ret = tx.send(result);
150+
match callback_ret {
151+
Ok(_) => {}
152+
Err(err) => {
153+
error!("invoke method has been called, but callback to caller failed. {:?}", err);
154+
}
155+
}
125156
}
126157
InvokerOpt::Url(tx) => {
127-
let _ = tx.send(invoker.url().await);
158+
let ret = tx.send(invoker.url().await);
159+
match ret {
160+
Ok(_) => {}
161+
Err(err) => {
162+
error!("url method has been called, but callback to caller failed. {:?}", err);
163+
}
164+
}
128165
}
129166
}
130167
}
131168
});
132169
InvokerProxy { tx }
133170
}
134171
}
172+
173+
#[derive(Error, Debug)]
174+
#[error("invoker proxy error: {0}")]
175+
pub struct InvokerProxyError(String);
176+
177+
impl InvokerProxyError {
178+
pub fn new(msg: &str) -> Self {
179+
InvokerProxyError(msg.to_string())
180+
}
181+
}
135182
}
136183

137184
#[derive(Default)]
@@ -149,9 +196,22 @@ impl InvokerExtensionLoader {
149196
}
150197

151198
pub fn load(&mut self, url: Url) -> Result<LoadExtensionPromise<InvokerProxy>, StdError> {
152-
let extension_name = url.query::<ExtensionName>().unwrap();
199+
let extension_name = url.query::<ExtensionName>();
200+
let Some(extension_name) = extension_name else {
201+
return Err(InvokerExtensionLoaderError::new(
202+
"load invoker extension failed, extension mustn't be empty",
203+
)
204+
.into());
205+
};
153206
let extension_name = extension_name.value();
154-
let factory = self.factories.get_mut(&extension_name).unwrap();
207+
let factory = self.factories.get_mut(&extension_name);
208+
let Some(factory) = factory else {
209+
let err_msg = format!(
210+
"load {} invoker extension failed, can not found extension factory",
211+
extension_name
212+
);
213+
return Err(InvokerExtensionLoaderError(err_msg).into());
214+
};
155215
factory.create(url)
156216
}
157217
}
@@ -230,3 +290,13 @@ where
230290
))
231291
}
232292
}
293+
294+
#[derive(Error, Debug)]
295+
#[error("{0}")]
296+
pub struct InvokerExtensionLoaderError(String);
297+
298+
impl InvokerExtensionLoaderError {
299+
pub fn new(msg: &str) -> Self {
300+
InvokerExtensionLoaderError(msg.to_string())
301+
}
302+
}

examples/echo/src/generated/grpc.examples.echo.rs

+90-37
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ pub mod echo_client {
4343
let invocation = RpcInvocation::default()
4444
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
4545
.with_method_name(String::from("UnaryEcho"));
46-
let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
46+
let path = http::uri::PathAndQuery::from_static(
47+
"/grpc.examples.echo.Echo/UnaryEcho",
48+
);
4749
self.inner.unary(request, path, invocation).await
4850
}
4951
/// ServerStreamingEcho is server side streaming.
@@ -100,7 +102,9 @@ pub mod echo_server {
100102
request: Request<super::EchoRequest>,
101103
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
102104
///Server streaming response type for the ServerStreamingEcho method.
103-
type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
105+
type ServerStreamingEchoStream: futures_util::Stream<
106+
Item = Result<super::EchoResponse, dubbo::status::Status>,
107+
>
104108
+ Send
105109
+ 'static;
106110
/// ServerStreamingEcho is server side streaming.
@@ -114,14 +118,19 @@ pub mod echo_server {
114118
request: Request<Decoding<super::EchoRequest>>,
115119
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
116120
///Server streaming response type for the BidirectionalStreamingEcho method.
117-
type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
121+
type BidirectionalStreamingEchoStream: futures_util::Stream<
122+
Item = Result<super::EchoResponse, dubbo::status::Status>,
123+
>
118124
+ Send
119125
+ 'static;
120126
/// BidirectionalStreamingEcho is bidi streaming.
121127
async fn bidirectional_streaming_echo(
122128
&self,
123129
request: Request<Decoding<super::EchoRequest>>,
124-
) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
130+
) -> Result<
131+
Response<Self::BidirectionalStreamingEchoStream>,
132+
dubbo::status::Status,
133+
>;
125134
}
126135
/// Echo is the echo service.
127136
#[derive(Debug)]
@@ -151,7 +160,10 @@ pub mod echo_server {
151160
type Response = http::Response<BoxBody>;
152161
type Error = std::convert::Infallible;
153162
type Future = BoxFuture<Self::Response, Self::Error>;
154-
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163+
fn poll_ready(
164+
&mut self,
165+
_cx: &mut Context<'_>,
166+
) -> Poll<Result<(), Self::Error>> {
155167
Poll::Ready(Ok(()))
156168
}
157169
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -164,16 +176,24 @@ pub mod echo_server {
164176
}
165177
impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
166178
type Response = super::EchoResponse;
167-
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
168-
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
179+
type Future = BoxFuture<
180+
Response<Self::Response>,
181+
dubbo::status::Status,
182+
>;
183+
fn call(
184+
&mut self,
185+
request: Request<super::EchoRequest>,
186+
) -> Self::Future {
169187
let inner = self.inner.0.clone();
170188
let fut = async move { inner.unary_echo(request).await };
171189
Box::pin(fut)
172190
}
173191
}
174192
let fut = async move {
175-
let mut server =
176-
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
193+
let mut server = TripleServer::<
194+
super::EchoRequest,
195+
super::EchoResponse,
196+
>::new();
177197
let res = server.unary(UnaryEchoServer { inner }, req).await;
178198
Ok(res)
179199
};
@@ -184,20 +204,30 @@ pub mod echo_server {
184204
struct ServerStreamingEchoServer<T: Echo> {
185205
inner: _Inner<T>,
186206
}
187-
impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
207+
impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
208+
for ServerStreamingEchoServer<T> {
188209
type Response = super::EchoResponse;
189210
type ResponseStream = T::ServerStreamingEchoStream;
190-
type Future =
191-
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
192-
fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
211+
type Future = BoxFuture<
212+
Response<Self::ResponseStream>,
213+
dubbo::status::Status,
214+
>;
215+
fn call(
216+
&mut self,
217+
request: Request<super::EchoRequest>,
218+
) -> Self::Future {
193219
let inner = self.inner.0.clone();
194-
let fut = async move { inner.server_streaming_echo(request).await };
220+
let fut = async move {
221+
inner.server_streaming_echo(request).await
222+
};
195223
Box::pin(fut)
196224
}
197225
}
198226
let fut = async move {
199-
let mut server =
200-
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
227+
let mut server = TripleServer::<
228+
super::EchoRequest,
229+
super::EchoResponse,
230+
>::new();
201231
let res = server
202232
.server_streaming(ServerStreamingEchoServer { inner }, req)
203233
.await;
@@ -210,21 +240,29 @@ pub mod echo_server {
210240
struct ClientStreamingEchoServer<T: Echo> {
211241
inner: _Inner<T>,
212242
}
213-
impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
243+
impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
244+
for ClientStreamingEchoServer<T> {
214245
type Response = super::EchoResponse;
215-
type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
246+
type Future = BoxFuture<
247+
Response<Self::Response>,
248+
dubbo::status::Status,
249+
>;
216250
fn call(
217251
&mut self,
218252
request: Request<Decoding<super::EchoRequest>>,
219253
) -> Self::Future {
220254
let inner = self.inner.0.clone();
221-
let fut = async move { inner.client_streaming_echo(request).await };
255+
let fut = async move {
256+
inner.client_streaming_echo(request).await
257+
};
222258
Box::pin(fut)
223259
}
224260
}
225261
let fut = async move {
226-
let mut server =
227-
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
262+
let mut server = TripleServer::<
263+
super::EchoRequest,
264+
super::EchoResponse,
265+
>::new();
228266
let res = server
229267
.client_streaming(ClientStreamingEchoServer { inner }, req)
230268
.await;
@@ -237,39 +275,54 @@ pub mod echo_server {
237275
struct BidirectionalStreamingEchoServer<T: Echo> {
238276
inner: _Inner<T>,
239277
}
240-
impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
278+
impl<T: Echo> StreamingSvc<super::EchoRequest>
279+
for BidirectionalStreamingEchoServer<T> {
241280
type Response = super::EchoResponse;
242281
type ResponseStream = T::BidirectionalStreamingEchoStream;
243-
type Future =
244-
BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
282+
type Future = BoxFuture<
283+
Response<Self::ResponseStream>,
284+
dubbo::status::Status,
285+
>;
245286
fn call(
246287
&mut self,
247288
request: Request<Decoding<super::EchoRequest>>,
248289
) -> Self::Future {
249290
let inner = self.inner.0.clone();
250-
let fut =
251-
async move { inner.bidirectional_streaming_echo(request).await };
291+
let fut = async move {
292+
inner.bidirectional_streaming_echo(request).await
293+
};
252294
Box::pin(fut)
253295
}
254296
}
255297
let fut = async move {
256-
let mut server =
257-
TripleServer::<super::EchoRequest, super::EchoResponse>::new();
298+
let mut server = TripleServer::<
299+
super::EchoRequest,
300+
super::EchoResponse,
301+
>::new();
258302
let res = server
259-
.bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
303+
.bidi_streaming(
304+
BidirectionalStreamingEchoServer {
305+
inner,
306+
},
307+
req,
308+
)
260309
.await;
261310
Ok(res)
262311
};
263312
Box::pin(fut)
264313
}
265-
_ => Box::pin(async move {
266-
Ok(http::Response::builder()
267-
.status(200)
268-
.header("grpc-status", "12")
269-
.header("content-type", "application/grpc")
270-
.body(empty_body())
271-
.unwrap())
272-
}),
314+
_ => {
315+
Box::pin(async move {
316+
Ok(
317+
http::Response::builder()
318+
.status(200)
319+
.header("grpc-status", "12")
320+
.header("content-type", "application/grpc")
321+
.body(empty_body())
322+
.unwrap(),
323+
)
324+
})
325+
}
273326
}
274327
}
275328
}

0 commit comments

Comments
 (0)