Skip to content

Commit 89e8900

Browse files
authored
feat: Enable global runtime for duckdb (#16)
1 parent f959af8 commit 89e8900

3 files changed

Lines changed: 34 additions & 22 deletions

File tree

rust/lib.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use arrow::array::{Array, RecordBatch, StructArray};
88
use arrow::datatypes::{DataType, Schema};
99
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
1010
use lance::Dataset;
11-
use tokio::runtime::Runtime;
1211

12+
mod runtime;
1313
mod scanner;
1414

1515
use scanner::LanceStream;
@@ -32,14 +32,9 @@ pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void
3232
}
3333
};
3434

35-
let runtime = match Runtime::new() {
36-
Ok(rt) => rt,
37-
Err(_) => return ptr::null_mut(),
38-
};
39-
40-
let dataset = match runtime.block_on(Dataset::open(path_str)) {
41-
Ok(ds) => Arc::new(ds),
42-
Err(_) => return ptr::null_mut(),
35+
let dataset = match runtime::block_on(Dataset::open(path_str)) {
36+
Ok(Ok(ds)) => Arc::new(ds),
37+
_ => return ptr::null_mut(),
4338
};
4439

4540
let handle = Box::new(DatasetHandle { dataset });
@@ -110,13 +105,7 @@ pub unsafe extern "C" fn lance_create_stream(dataset: *mut c_void) -> *mut c_voi
110105

111106
let handle = unsafe { &*(dataset as *const DatasetHandle) };
112107

113-
// Create a new runtime for this stream to avoid conflicts
114-
let runtime = match Runtime::new() {
115-
Ok(rt) => rt,
116-
Err(_) => return ptr::null_mut(),
117-
};
118-
119-
match LanceStream::new(&handle.dataset, runtime) {
108+
match LanceStream::new(&handle.dataset) {
120109
Ok(stream) => Box::into_raw(Box::new(stream)) as *mut c_void,
121110
Err(_) => ptr::null_mut(),
122111
}

rust/runtime.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use std::future::Future;
2+
use std::io;
3+
use std::sync::OnceLock;
4+
5+
use tokio::runtime::{Handle, Runtime};
6+
7+
static RUNTIME: OnceLock<Result<Runtime, io::Error>> = OnceLock::new();
8+
9+
pub fn runtime() -> Result<&'static Runtime, io::Error> {
10+
match RUNTIME.get_or_init(Runtime::new) {
11+
Ok(rt) => Ok(rt),
12+
Err(err) => Err(io::Error::new(err.kind(), err.to_string())),
13+
}
14+
}
15+
16+
pub fn handle() -> Result<Handle, io::Error> {
17+
Ok(runtime()?.handle().clone())
18+
}
19+
20+
pub fn block_on<F: Future>(future: F) -> Result<F::Output, io::Error> {
21+
Ok(runtime()?.block_on(future))
22+
}

rust/scanner.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,24 @@ use std::pin::Pin;
33
use arrow::array::RecordBatch;
44
use futures::stream::Stream;
55
use lance::Dataset;
6-
use tokio::runtime::Runtime;
6+
use tokio::runtime::Handle;
77

88
/// A stream wrapper that holds the Lance RecordBatchStream
99
pub struct LanceStream {
10-
runtime: Runtime,
10+
handle: Handle,
1111
stream: Pin<Box<dyn Stream<Item = Result<RecordBatch, lance::Error>> + Send>>,
1212
}
1313

1414
impl LanceStream {
1515
/// Create a new stream from a dataset path
16-
pub fn new(dataset: &Dataset, runtime: Runtime) -> Result<Self, Box<dyn std::error::Error>> {
16+
pub fn new(dataset: &Dataset) -> Result<Self, Box<dyn std::error::Error>> {
17+
let handle = crate::runtime::handle()?;
1718
let scanner = dataset.scan();
1819

19-
let stream = runtime.block_on(async { scanner.try_into_stream().await })?;
20+
let stream = handle.block_on(async { scanner.try_into_stream().await })?;
2021

2122
Ok(Self {
22-
runtime,
23+
handle,
2324
stream: Box::pin(stream),
2425
})
2526
}
@@ -28,7 +29,7 @@ impl LanceStream {
2829
pub fn next(&mut self) -> Option<RecordBatch> {
2930
use futures::StreamExt;
3031

31-
self.runtime.block_on(async {
32+
self.handle.block_on(async {
3233
match self.stream.next().await {
3334
Some(Ok(batch)) => Some(batch),
3435
_ => None,

0 commit comments

Comments
 (0)