Skip to content

Commit ab7527e

Browse files
committed
feat(query): add async retry transform
1 parent ce031db commit ab7527e

File tree

6 files changed

+100
-4
lines changed

6 files changed

+100
-4
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/src/utils/udf_client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use crate::DataSchema;
3535
const UDF_TCP_KEEP_ALIVE_SEC: u64 = 30;
3636
const UDF_HTTP2_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
3737
const UDF_KEEP_ALIVE_TIMEOUT_SEC: u64 = 20;
38+
// 4MB by default, we use 16G
39+
// max_encoding_message_size is usize::max by default
3840
const MAX_DECODING_MESSAGE_SIZE: usize = 16 * 1024 * 1024 * 1024;
3941

4042
#[derive(Debug, Clone)]

src/query/pipeline/transforms/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ jsonb = { workspace = true }
2323
match-template = { workspace = true }
2424
serde = { workspace = true }
2525
typetag = { workspace = true }
26+
tokio = { workspace = true }
27+
2628

2729
[dev-dependencies]
2830
itertools = { workspace = true }

src/query/pipeline/transforms/src/processors/transforms/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod transform_dummy;
2525
mod transform_multi_sort_merge;
2626
mod transform_sort_merge_base;
2727

28+
mod transform_retry_async;
2829
mod transform_sort_merge;
2930
mod transform_sort_merge_limit;
3031
pub mod transform_sort_partial;
@@ -38,6 +39,7 @@ pub use transform_blocking::*;
3839
pub use transform_compact::*;
3940
pub use transform_dummy::*;
4041
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
42+
pub use transform_retry_async::*;
4143
pub use transform_sort_merge::sort_merge;
4244
pub use transform_sort_merge::*;
4345
pub use transform_sort_merge_base::*;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_expression::DataBlock;
17+
18+
use super::AsyncTransform;
19+
20+
pub trait AsyncRetry: AsyncTransform {
21+
fn retry_on(&self, err: &databend_common_exception::ErrorCode) -> bool;
22+
fn retry_strategy(&self) -> RetryStrategy;
23+
}
24+
25+
#[derive(Clone)]
26+
pub struct RetryStrategy {
27+
pub retry_times: usize,
28+
pub retry_sleep_duration: Option<tokio::time::Duration>,
29+
}
30+
31+
pub struct AsyncRetryWrapper<T: AsyncRetry + 'static> {
32+
t: T,
33+
}
34+
35+
impl<T: AsyncRetry + 'static> AsyncRetryWrapper<T> {
36+
pub fn create(inner: T) -> Self {
37+
Self { t: inner }
38+
}
39+
}
40+
41+
#[async_trait::async_trait]
42+
impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
43+
const NAME: &'static str = T::NAME;
44+
45+
async fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
46+
let strategy = self.t.retry_strategy();
47+
for _ in 0..strategy.retry_times {
48+
match self.t.transform(data.clone()).await {
49+
Ok(v) => return Ok(v),
50+
Err(e) => {
51+
if !self.t.retry_on(&e) {
52+
return Err(e);
53+
}
54+
if let Some(duration) = strategy.retry_sleep_duration {
55+
tokio::time::sleep(duration).await;
56+
}
57+
}
58+
}
59+
}
60+
self.t.transform(data.clone()).await
61+
}
62+
63+
fn name(&self) -> String {
64+
Self::NAME.to_string()
65+
}
66+
67+
async fn on_start(&mut self) -> Result<()> {
68+
self.t.on_start().await
69+
}
70+
71+
async fn on_finish(&mut self) -> Result<()> {
72+
self.t.on_finish().await
73+
}
74+
}

src/query/service/src/pipelines/processors/transforms/transform_udf_server.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ use databend_common_expression::DataBlock;
2424
use databend_common_expression::DataField;
2525
use databend_common_expression::DataSchema;
2626
use databend_common_expression::FunctionContext;
27+
use databend_common_pipeline_transforms::processors::AsyncRetry;
28+
use databend_common_pipeline_transforms::processors::AsyncRetryWrapper;
2729
use databend_common_pipeline_transforms::processors::AsyncTransform;
2830
use databend_common_pipeline_transforms::processors::AsyncTransformer;
31+
use databend_common_pipeline_transforms::processors::RetryStrategy;
2932
use databend_common_sql::executor::physical_plans::UdfFunctionDesc;
3033

3134
use crate::pipelines::processors::InputPort;
@@ -44,10 +47,22 @@ impl TransformUdfServer {
4447
input: Arc<InputPort>,
4548
output: Arc<OutputPort>,
4649
) -> Result<Box<dyn Processor>> {
47-
Ok(AsyncTransformer::create(input, output, Self {
48-
func_ctx,
49-
funcs,
50-
}))
50+
let s = Self { func_ctx, funcs };
51+
let retry_wrapper = AsyncRetryWrapper::create(s);
52+
Ok(AsyncTransformer::create(input, output, retry_wrapper))
53+
}
54+
}
55+
56+
impl AsyncRetry for TransformUdfServer {
57+
fn retry_on(&self, err: &databend_common_exception::ErrorCode) -> bool {
58+
true
59+
}
60+
61+
fn retry_strategy(&self) -> RetryStrategy {
62+
RetryStrategy {
63+
retry_times: 64,
64+
retry_sleep_duration: Some(tokio::time::Duration::from_millis(500)),
65+
}
5166
}
5267
}
5368

0 commit comments

Comments
 (0)