Skip to content

Commit 6f4741d

Browse files
phillipleblanclukekim
authored andcommitted
Add exponential backoff and log suppression for scheduler disconnection
1 parent f354740 commit 6f4741d

7 files changed

Lines changed: 150 additions & 64 deletions

File tree

Cargo.lock

Lines changed: 67 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ dashmap = { version = "6.1" }
6868
async-trait = { version = "0.1" }
6969
serde = { version = "1.0" }
7070
tokio-stream = { version = "0.1" }
71+
backoff = { version = "0.4" }
7172
url = { version = "2.5" }
7273

7374
# cargo build --profile release-lto

ballista/core/src/serde/generated/ballista.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -830,15 +830,8 @@ pub mod execute_query_params {
830830
pub enum Query {
831831
#[prost(bytes, tag = "1")]
832832
LogicalPlan(::prost::alloc::vec::Vec<u8>),
833-
834833
#[prost(bytes, tag = "6")]
835834
SubstraitPlan(::prost::alloc::vec::Vec<u8>),
836-
837-
/// I'd suggest to remove this, if SQL needed use `flight-sql`
838-
#[deprecated]
839-
#[prost(string, tag = "2")]
840-
Sql(::prost::alloc::string::String),
841-
842835
}
843836
}
844837
#[derive(Clone, PartialEq, ::prost::Message)]

ballista/core/src/utils.rs

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::sync::Arc;
3535
use std::time::{Duration, SystemTime, UNIX_EPOCH};
3636
use std::{fs::File, pin::Pin};
3737
use tonic::codegen::StdError;
38-
use tonic::transport::{Channel, Endpoint, Error, Server};
38+
use tonic::transport::{Channel, Error, Server};
3939

4040
/// Configuration for gRPC client connections.
4141
///
@@ -196,21 +196,11 @@ pub async fn collect_stream(
196196
Ok(batches)
197197
}
198198

199-
200-
201199
/// Creates a gRPC client connection with the specified configuration.
202-
203-
204200
pub async fn create_grpc_client_connection<D>(
205201
dst: D,
206202
config: &GrpcClientConfig,
207203
) -> std::result::Result<Channel, Error>
208-
209-
210-
pub fn create_grpc_client_endpoint<D>(dst: D) -> std::result::Result<Endpoint, Error>
211-
212-
213-
214204
where
215205
D: std::convert::TryInto<tonic::transport::Endpoint>,
216206
D::Error: Into<StdError>,
@@ -231,6 +221,26 @@ where
231221
endpoint.connect().await
232222
}
233223

224+
/// Creates a gRPC client endpoint (returns Endpoint without connecting).
225+
/// Used for TLS/API key customization before establishing connection.
226+
pub fn create_grpc_client_endpoint<D>(dst: D) -> std::result::Result<tonic::transport::Endpoint, Error>
227+
where
228+
D: std::convert::TryInto<tonic::transport::Endpoint>,
229+
D::Error: Into<StdError>,
230+
{
231+
let endpoint = tonic::transport::Endpoint::new(dst)?
232+
.connect_timeout(Duration::from_secs(20))
233+
.timeout(Duration::from_secs(20))
234+
// Disable Nagle's Algorithm since we don't want packets to wait
235+
.tcp_nodelay(true)
236+
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
237+
.http2_keep_alive_interval(Duration::from_secs(300))
238+
.keep_alive_timeout(Duration::from_secs(20))
239+
.keep_alive_while_idle(true);
240+
241+
Ok(endpoint)
242+
}
243+
234244
/// Creates a gRPC server builder with the specified configuration.
235245
pub fn create_grpc_server(config: &GrpcServerConfig) -> Server {
236246
Server::builder()
@@ -270,24 +280,3 @@ pub fn get_time_before(interval_seconds: u64) -> u64 {
270280
.unwrap_or_else(|| Duration::from_secs(0))
271281
.as_secs()
272282
}
273-
274-
/// Create a gRPC client endpoint with configurable settings
275-
///
276-
/// This is used by Spice extensions for customizing gRPC endpoint creation
277-
pub fn create_grpc_client_endpoint<D>(dst: D) -> std::result::Result<Endpoint, Error>
278-
where
279-
D: std::convert::TryInto<tonic::transport::Endpoint>,
280-
D::Error: Into<StdError>,
281-
{
282-
let endpoint = tonic::transport::Endpoint::new(dst)?
283-
.connect_timeout(Duration::from_secs(20))
284-
.timeout(Duration::from_secs(20))
285-
// Disable Nagle's Algorithm since we don't want packets to wait
286-
.tcp_nodelay(true)
287-
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
288-
.http2_keep_alive_interval(Duration::from_secs(300))
289-
.keep_alive_timeout(Duration::from_secs(20))
290-
.keep_alive_while_idle(true);
291-
292-
Ok(endpoint)
293-
}

ballista/executor/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ default = ["build-binary", "mimalloc"]
4040
arrow = { workspace = true }
4141
arrow-flight = { workspace = true }
4242
async-trait = { workspace = true }
43+
4344
ballista-core = { path = "../core", version = "51.0.0" }
45+
46+
backoff = { workspace = true }
47+
4448
clap = { workspace = true, optional = true }
4549
dashmap = { workspace = true }
4650
datafusion = { workspace = true }

0 commit comments

Comments
 (0)