Skip to content

Commit bb56b86

Browse files
committed
Fix formatting and address comments
1 parent 0963432 commit bb56b86

3 files changed

Lines changed: 22 additions & 19 deletions

File tree

core/src/flight/exec.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,12 @@ async fn flight_stream(
193193
for loc in partition.locations.iter() {
194194
let get_client = || async { flight_client(loc, grpc_headers.as_ref(), &size_limits).await };
195195
let client = run_async_with_tokio(get_client).await?;
196-
// let client = flight_client(loc, grpc_headers.as_ref(), &size_limits).await?;
197196
match try_fetch_stream(client, &partition.ticket, schema.clone()).await {
198197
Ok(stream) => return Ok(stream),
199198
Err(e) => errors.push(Box::new(e)),
200199
}
201200
}
202-
let err = errors.into_iter().last().unwrap_or_else(|| {
201+
let err = errors.into_iter().next_back().unwrap_or_else(|| {
203202
Box::new(FlightError::ProtocolError(format!(
204203
"No available location for endpoint {:?}",
205204
partition.locations

core/src/sql/db_connection_pool/runtime.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,22 @@
33
// tokio runtime, so we need to start a new one.
44
use std::{future::Future, sync::OnceLock};
55

6-
use tokio::runtime::{Handle, Runtime};
6+
use tokio::runtime::Handle;
77

8-
pub fn get_tokio_runtime() -> &'static Runtime {
9-
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
10-
RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime"))
8+
pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
9+
10+
#[inline]
11+
pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
12+
static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
13+
RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
14+
}
15+
16+
pub fn execute_in_tokio<F, Fut, T>(f: F) -> T
17+
where
18+
F: FnOnce() -> Fut,
19+
Fut: Future<Output = T>,
20+
{
21+
get_tokio_runtime().0.block_on(f())
1122
}
1223

1324
pub async fn run_async_with_tokio<F, Fut, T, E>(f: F) -> Result<T, E>
@@ -17,13 +28,14 @@ where
1728
{
1829
match Handle::try_current() {
1930
Ok(_) => f().await,
20-
Err(_) => get_tokio_runtime().block_on(f()),
31+
// Err(_) => get_tokio_runtime().0.block_on(f()),
32+
Err(_) => execute_in_tokio(f),
2133
}
2234
}
2335

2436
pub fn run_sync_with_tokio<T, E>(f: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
2537
match Handle::try_current() {
2638
Ok(_) => f(),
27-
Err(_) => get_tokio_runtime().block_on(async { f() }),
39+
Err(_) => execute_in_tokio(|| async { f() }),
2840
}
2941
}

python/src/utils.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,17 @@
11
use pyo3::{exceptions::PyException, prelude::*};
2-
use std::{future::Future, sync::OnceLock};
2+
use std::future::Future;
33

4+
use datafusion_table_providers::sql::db_connection_pool::runtime::execute_in_tokio;
45
use pyo3::types::PyDict;
56
use std::collections::HashMap;
67

7-
pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
8-
9-
#[inline]
10-
pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
11-
static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
12-
RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap()))
13-
}
14-
158
/// Utility to collect rust futures with GIL released
169
pub fn wait_for_future<F>(py: Python, f: F) -> F::Output
1710
where
1811
F: Future + Send,
1912
F::Output: Send,
2013
{
21-
let runtime: &tokio::runtime::Runtime = &get_tokio_runtime().0;
22-
py.allow_threads(|| runtime.block_on(f))
14+
py.allow_threads(|| execute_in_tokio(|| f))
2315
}
2416

2517
pub fn to_pyerr<T: ToString>(err: T) -> PyErr {

0 commit comments

Comments
 (0)