Skip to content

Commit 75069d6

Browse files
committed
ClientTaskDone:set_custom_error() add last_index & config_ver as extra param
remove unused code
1 parent de1766a commit 75069d6

9 files changed

Lines changed: 226 additions & 288 deletions

File tree

src/client/mod.rs

Lines changed: 54 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
pub mod task;
22
use captains_log::filter::LogFilter;
33
use crossfire::oneshot::oneshot;
4-
use crossfire::*;
54
use orb::AsyncRuntime;
65
pub use razor_rpc_macros::{endpoint_async, endpoint_client};
76
pub use razor_stream::client::ClientCaller;
87
pub use task::*;
98

109
use crate::Codec;
11-
use crate::error::{EncodedErr, RpcErrCodec, RpcError, RpcIntErr};
10+
use crate::error::{RpcErrCodec, RpcError, RpcIntErr};
1211
pub use razor_stream::client::{
1312
ClientCallerBlocking, ClientConfig, ClientFacts, ClientTransport, ConnPool, FailoverPool,
1413
};
@@ -88,8 +87,14 @@ where
8887
async move {
8988
let (tx, rx) = oneshot::<APIClientReq>();
9089
let codec = <Self as ClientCaller>::get_codec(self);
91-
self.send_req(make_req(&codec, service_method, req, tx)).await;
92-
process_res(&codec, rx.recv_async().await)
90+
let mut task = APIClientReq::new(&codec, service_method, req);
91+
task.set_noti(tx);
92+
self.send_req(task).await;
93+
if let Ok(mut task) = rx.recv_async().await {
94+
return task.process_res(&codec);
95+
} else {
96+
return Err(RpcIntErr::Internal.into());
97+
}
9398
}
9499
}
95100
}
@@ -111,141 +116,59 @@ where
111116
let codec = <Self as ClientCaller>::get_codec(self);
112117
let mut retry_count = 0;
113118
let max_retries = self.get_retry_limit();
114-
let mut last_index: usize = 0;
115-
119+
let mut task = APIClientReq::new(&codec, service_method, req);
120+
let (tx, mut rx) = oneshot::<APIClientReq>();
121+
task.set_noti(tx);
122+
self.send_req(task).await;
116123
loop {
117-
let (tx, rx) = oneshot::<APIClientReq>();
118-
let mut api_req = make_req(&codec, service_method, req, tx);
119-
// Set routing info for retry
120-
if retry_count > 0 {
121-
api_req.routing_info = Some(task::RoutingInfo { cluster_ver: 0, last_index });
122-
}
123-
self.send_req(api_req).await;
124-
let result = process_res::<_, Resp, E>(&codec, rx.recv_async().await);
125-
126-
match result {
127-
Ok(resp) => return Ok(resp),
128-
Err(RpcError::User(e)) => {
129-
match e.should_failover() {
130-
Ok(Some(redirect_addr)) => {
131-
if retry_count < max_retries {
132-
retry_count += 1;
133-
// Retry to specific address
134-
let (tx, rx) = oneshot::<APIClientReq>();
135-
let api_req = make_req(&codec, service_method, req, tx);
136-
self.resubmit(
137-
api_req,
138-
Some(redirect_addr.to_string()),
139-
last_index,
140-
)
141-
.await;
142-
// Update last_index for next iteration
143-
last_index = 0; // Will be updated by resubmit
144-
continue;
124+
if let Ok(mut task) = rx.recv_async().await {
125+
let result = task.process_res::<_, Resp, E>(&codec);
126+
match result {
127+
Ok(resp) => return Ok(resp),
128+
Err(RpcError::Rpc(e)) => {
129+
// RpcIntErr less than Method is retriable by FailoverPool internally
130+
return Err(RpcError::Rpc(e));
131+
}
132+
Err(RpcError::User(e)) => {
133+
retry_count += 1;
134+
match e.should_failover() {
135+
Ok(Some(redirect_addr)) => {
136+
if retry_count < max_retries {
137+
// Retry to specific address
138+
let (tx, _rx) = oneshot::<APIClientReq>();
139+
task.set_noti(tx);
140+
rx = _rx;
141+
self.resubmit(
142+
task,
143+
Ok(redirect_addr.to_string()),
144+
retry_count,
145+
None,
146+
)
147+
.await;
148+
continue;
149+
}
145150
}
146-
}
147-
Ok(None) => {
148-
if retry_count < max_retries {
149-
retry_count += 1;
150-
// Retry to next node
151-
let (tx, rx) = oneshot::<APIClientReq>();
152-
let api_req = make_req(&codec, service_method, req, tx);
153-
self.resubmit(api_req, None, last_index).await;
154-
last_index += 1;
155-
continue;
151+
Ok(None) => {
152+
if retry_count < max_retries {
153+
// Retry to next node
154+
let (tx, _rx) = oneshot::<APIClientReq>();
155+
task.set_noti(tx);
156+
rx = _rx;
157+
let last_index = task.last_index;
158+
self.resubmit(task, Err(last_index), retry_count, None)
159+
.await;
160+
continue;
161+
}
156162
}
163+
Err(()) => return Err(RpcError::User(e)),
157164
}
158-
Err(()) => return Err(RpcError::User(e)),
159-
}
160-
return Err(RpcError::User(e));
161-
}
162-
Err(RpcError::Rpc(e)) => {
163-
// RpcIntErr less than Method is retriable by FailoverPool internally
164-
return Err(RpcError::Rpc(e));
165-
}
166-
}
167-
}
168-
}
169-
}
170-
}
171-
172-
/*
173-
*
174-
BlockingEndpoint trait is provided for user-defined blocking clients
175-
Users implement this trait on their client structs to get the call() method
176-
pub trait BlockingEndpoint: ClientCallerBlocking<Facts: ClientFacts<Task = APIClientReq>>,
177-
{
178-
fn call<Req, Resp, E>(
179-
&self, service_method: &'static str, req: &Req,
180-
) -> Result<Resp, RpcError<E>>
181-
where
182-
Req: serde::Serialize + fmt::Debug,
183-
Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
184-
E: RpcErrCodec,
185-
{
186-
let (tx, rx) = oneshot::<APIClientReq>();
187-
let codec = self.codec();
188-
self.caller().send_req_blocking(make_req(codec, service_method, req, tx));
189-
process_res(codec, rx.recv())
190-
}
191-
}
192-
*/
193-
194-
#[inline]
195-
fn make_req<C, Req>(
196-
codec: &C, service_method: &'static str, req: &Req, done_tx: oneshot::TxOneshot<APIClientReq>,
197-
) -> APIClientReq
198-
where
199-
C: Codec,
200-
Req: serde::Serialize + fmt::Debug,
201-
{
202-
let req_buf = codec.encode(req).expect("encode");
203-
APIClientReq {
204-
common: Default::default(),
205-
req_msg: Some(req_buf),
206-
action: service_method.to_string(),
207-
resp: None,
208-
res: None,
209-
noti: Some(done_tx),
210-
routing_info: None,
211-
}
212-
}
213-
214-
#[inline]
215-
fn process_res<C, Resp, E>(
216-
codec: &C, task_res: Result<APIClientReq, crossfire::RecvError>,
217-
) -> Result<Resp, RpcError<E>>
218-
where
219-
C: Codec,
220-
Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
221-
E: RpcErrCodec,
222-
{
223-
match task_res {
224-
Ok(mut task) => {
225-
let res = task.res.take().unwrap();
226-
match res {
227-
Ok(()) => {
228-
if let Some(resp) = task.resp {
229-
match codec.decode(&resp) {
230-
Ok(resp_msg) => Ok(resp_msg),
231-
Err(()) => Err(RpcIntErr::Decode.into()),
165+
return Err(RpcError::User(e));
232166
}
233-
} else {
234-
Ok(Resp::default())
235167
}
168+
} else {
169+
return Err(RpcIntErr::Internal.into());
236170
}
237-
Err(EncodedErr::Rpc(e)) => Err(RpcError::Rpc(e)),
238-
Err(EncodedErr::Num(n)) => match E::decode(codec, Ok(n)) {
239-
Ok(e) => Err(RpcError::User(e)),
240-
Err(()) => Err(RpcIntErr::Decode.into()),
241-
},
242-
Err(EncodedErr::Buf(buf)) => match E::decode(codec, Err(&buf)) {
243-
Ok(e) => Err(RpcError::User(e)),
244-
Err(()) => Err(RpcIntErr::Decode.into()),
245-
},
246-
_ => unreachable!(),
247171
}
248172
}
249-
Err(_) => Err(RpcIntErr::Internal.into()),
250173
}
251174
}

src/client/task.rs

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
Codec,
3-
error::{EncodedErr, RpcIntErr},
3+
error::{EncodedErr, RpcErrCodec, RpcError, RpcIntErr},
44
};
55
use crossfire::oneshot::TxOneshot;
66
use razor_stream::client::task::{
@@ -12,11 +12,7 @@ use std::fmt;
1212
use std::io::Write;
1313

1414
/// Routing info for failover/retry operations
15-
#[derive(Debug, Clone, Copy)]
16-
pub struct RoutingInfo {
17-
pub cluster_ver: u64,
18-
pub last_index: usize,
19-
}
15+
pub struct RoutingInfo {}
2016

2117
pub struct APIClientReq {
2218
pub common: ClientTaskCommon,
@@ -26,8 +22,68 @@ pub struct APIClientReq {
2622
pub resp: Option<Vec<u8>>,
2723
pub res: Option<Result<(), EncodedErr>>,
2824
pub noti: Option<TxOneshot<Self>>,
29-
/// Routing info for failover/retry
30-
pub routing_info: Option<RoutingInfo>,
25+
/// route info for FailoverPool
26+
pub config_ver: u64,
27+
/// Routing info for FailoverPool
28+
pub last_index: usize,
29+
}
30+
31+
impl APIClientReq {
32+
#[inline]
33+
pub fn new<C, Req>(codec: &C, service_method: &'static str, req: &Req) -> Self
34+
where
35+
C: Codec,
36+
Req: serde::Serialize + fmt::Debug,
37+
{
38+
let req_buf = codec.encode(req).expect("encode");
39+
Self {
40+
common: Default::default(),
41+
req_msg: Some(req_buf), // TODO why some?
42+
action: service_method.to_string(),
43+
resp: None,
44+
res: None,
45+
noti: None,
46+
config_ver: 0,
47+
last_index: 0,
48+
}
49+
}
50+
51+
#[inline]
52+
pub fn set_noti(&mut self, done_tx: TxOneshot<Self>) {
53+
self.noti.replace(done_tx);
54+
}
55+
56+
#[inline]
57+
pub fn process_res<C, Resp, E>(&mut self, codec: &C) -> Result<Resp, RpcError<E>>
58+
where
59+
C: Codec,
60+
Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
61+
E: RpcErrCodec,
62+
{
63+
let res = self.res.take().unwrap();
64+
match res {
65+
Ok(()) => {
66+
if let Some(resp) = &self.resp {
67+
match codec.decode(resp) {
68+
Ok(resp_msg) => Ok(resp_msg),
69+
Err(()) => Err(RpcIntErr::Decode.into()),
70+
}
71+
} else {
72+
Ok(Resp::default())
73+
}
74+
}
75+
Err(EncodedErr::Rpc(e)) => Err(RpcError::Rpc(e)),
76+
Err(EncodedErr::Num(n)) => match E::decode(codec, Ok(n)) {
77+
Ok(e) => Err(RpcError::User(e)),
78+
Err(()) => Err(RpcIntErr::Decode.into()),
79+
},
80+
Err(EncodedErr::Buf(buf)) => match E::decode(codec, Err(&buf)) {
81+
Ok(e) => Err(RpcError::User(e)),
82+
Err(()) => Err(RpcIntErr::Decode.into()),
83+
},
84+
_ => unreachable!(),
85+
}
86+
}
3187
}
3288

3389
impl ClientTaskEncode for APIClientReq {
@@ -56,9 +112,13 @@ impl ClientTaskDecode for APIClientReq {
56112

57113
impl ClientTaskDone for APIClientReq {
58114
#[inline]
59-
fn set_custom_error<C: Codec>(&mut self, _codec: &C, e: EncodedErr) {
115+
fn set_custom_error<C: Codec>(
116+
&mut self, _codec: &C, e: EncodedErr, last_idx: usize, config_ver: u64,
117+
) {
60118
// Ignore the Codec, as we don't known the error type yet
61119
self.res.replace(Err(e));
120+
self.config_ver = config_ver;
121+
self.last_index = last_idx;
62122
}
63123

64124
#[inline]

stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ io-buffer = "1"
3333
serde = "1"
3434
crossfire = ">=3.1"
3535
futures-util = "0.3"
36+
ahash = "0.8"
3637

3738
[dev-dependencies]
3839
num_enum = "0"

stream/macros/src/client_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ pub fn client_task_impl(attr: TokenStream, input: TokenStream) -> TokenStream {
340340

341341
impl #impl_generics razor_stream::client::task::ClientTaskDone for #struct_name #ty_generics #where_clause_with_into {
342342
#[inline]
343-
fn set_custom_error<C: razor_stream::Codec>(&mut self, codec: &C, res: razor_stream::error::EncodedErr) {
343+
fn set_custom_error<C: razor_stream::Codec>(&mut self, codec: &C, res: razor_stream::error::EncodedErr, _last_index: usize, _conf_ver: u64) {
344344
let rpc_error = match res {
345345
razor_stream::error::EncodedErr::Rpc(e) => e.into(),
346346
razor_stream::error::EncodedErr::Num(n) => {

stream/macros/src/client_task_enum.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub fn client_task_enum_impl(attr: TokenStream, input: TokenStream) -> TokenStre
148148
#enum_name::#variant_name(inner) => razor_stream::client::task::ClientTaskGetResult::get_result(inner),
149149
});
150150
set_custom_error_arms.push(quote! {
151-
#enum_name::#variant_name(inner) => razor_stream::client::task::ClientTaskDone::set_custom_error(inner, codec, res),
151+
#enum_name::#variant_name(inner) => razor_stream::client::task::ClientTaskDone::set_custom_error(inner, codec, res, last_index, conf_ver),
152152
});
153153
set_rpc_error_arms.push(quote! {
154154
#enum_name::#variant_name(inner) => razor_stream::client::task::ClientTaskDone::set_rpc_error(inner, e),
@@ -245,7 +245,7 @@ pub fn client_task_enum_impl(attr: TokenStream, input: TokenStream) -> TokenStre
245245

246246
impl #impl_generics razor_stream::client::task::ClientTaskDone for #enum_name #ty_generics #where_clause {
247247
#[inline]
248-
fn set_custom_error<C: razor_stream::Codec>(&mut self, codec: &C, res: razor_stream::error::EncodedErr) {
248+
fn set_custom_error<C: razor_stream::Codec>(&mut self, codec: &C, res: razor_stream::error::EncodedErr, last_index: usize, conf_ver: u64) {
249249
match self {
250250
#(#set_custom_error_arms)*
251251
}

0 commit comments

Comments
 (0)