Skip to content

Commit 3632215

Browse files
sgrebnovlukekim
authored andcommitted
Use tokio::sync::oneshot to prevent FuturesUnordered reentrant drop crash (#9)
1 parent 5e66029 commit 3632215

9 files changed

Lines changed: 36 additions & 15 deletions

File tree

vortex-array/src/arrow/convert.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1797,6 +1797,8 @@ mod tests {
17971797
use arrow_array::MapArray;
17981798
use arrow_array::builder::MapBuilder;
17991799
use arrow_array::builder::StringBuilder;
1800+
use crate::arrow::executor::ArrowArrayExecutor as _;
1801+
use crate::executor::VortexSessionExecute as _;
18001802

18011803
// Build a MapArray: map<string, int32>
18021804
let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
@@ -1826,12 +1828,12 @@ mod tests {
18261828
let list_array = vortex_array.as_::<ListVTable>();
18271829
assert_eq!(list_array.elements().len(), 3); // 3 total key-value pairs
18281830
let struct_elements = list_array.elements().as_::<StructVTable>();
1829-
assert_eq!(struct_elements.nfields(), 2); // key and value fields
1831+
assert_eq!(struct_elements.names().len(), 2); // key and value fields
18301832

18311833
// Convert back to Arrow as a MapArray
18321834
let map_dtype = arrow_map.data_type().clone();
18331835
let arrow_back = vortex_array
1834-
.execute_arrow(Some(&map_dtype), &mut crate::session::LEGACY_SESSION.create_execution_ctx())
1836+
.execute_arrow(Some(&map_dtype), &mut crate::LEGACY_SESSION.create_execution_ctx())
18351837
.unwrap();
18361838
let map_back = arrow_back
18371839
.as_any()

vortex-array/src/expr/exprs/case_when.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::compute::zip;
3737
use crate::expr::Arity;
3838
use crate::expr::ChildName;
3939
use crate::expr::ExecutionArgs;
40-
use crate::expr::ExecutionResult;
40+
4141
use crate::expr::ExprId;
4242
use crate::expr::VTable;
4343
use crate::expr::VTableExt;
@@ -168,7 +168,7 @@ impl VTable for CaseWhen {
168168
&self,
169169
options: &Self::Options,
170170
args: ExecutionArgs,
171-
) -> VortexResult<ExecutionResult> {
171+
) -> VortexResult<ArrayRef> {
172172
let row_count = args.row_count;
173173
let num_pairs = options.num_when_then_pairs as usize;
174174

@@ -222,7 +222,7 @@ impl VTable for CaseWhen {
222222
result = zip(then_value.as_ref(), result.as_ref(), &mask)?;
223223
}
224224

225-
result.execute::<ExecutionResult>(args.ctx)
225+
result.execute::<ArrayRef>(args.ctx)
226226
}
227227

228228
fn is_null_sensitive(&self, _options: &Self::Options) -> bool {
@@ -236,7 +236,7 @@ impl VTable for CaseWhen {
236236
}
237237

238238
/// Efficient implementation for binary CASE WHEN (single when/then pair)
239-
fn execute_binary_case_when(_has_else: bool, args: ExecutionArgs) -> VortexResult<ExecutionResult> {
239+
fn execute_binary_case_when(_has_else: bool, args: ExecutionArgs) -> VortexResult<ArrayRef> {
240240
let row_count = args.row_count;
241241

242242
// Extract inputs based on arity: [condition, then_value] or [condition, then_value, else_value]
@@ -265,20 +265,20 @@ fn execute_binary_case_when(_has_else: bool, args: ExecutionArgs) -> VortexResul
265265

266266
// Short-circuit: all true -> just return THEN value
267267
if mask.all_true() {
268-
return then_value.execute::<ExecutionResult>(args.ctx);
268+
return then_value.execute::<ArrayRef>(args.ctx);
269269
}
270270

271271
// Short-circuit: all false -> return ELSE value or NULL
272272
if mask.all_false() {
273273
return match else_value {
274-
Some(else_value) => else_value.execute::<ExecutionResult>(args.ctx),
274+
Some(else_value) => else_value.execute::<ArrayRef>(args.ctx),
275275
None => {
276276
// Create NULL constant of appropriate type
277277
let then_dtype = then_value.dtype().as_nullable();
278-
Ok(ExecutionResult::constant(
278+
Ok(ConstantArray::new(
279279
Scalar::null(then_dtype),
280280
row_count,
281-
))
281+
).into_array())
282282
}
283283
};
284284
}
@@ -292,7 +292,7 @@ fn execute_binary_case_when(_has_else: bool, args: ExecutionArgs) -> VortexResul
292292
// Use zip to select: where mask is true, take then_value; else take else_value
293293
let result = zip(then_value.as_ref(), else_value.as_ref(), &mask)?;
294294

295-
result.execute::<ExecutionResult>(args.ctx)
295+
result.execute::<ArrayRef>(args.ctx)
296296
}
297297

298298
/// Creates an N-ary CASE WHEN expression from a flat list of children.

vortex-btrblocks/src/canonical_compressor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ pub trait CanonicalCompressor {
8888
/// .exclude_int([IntCode::Dict])
8989
/// .build();
9090
/// ```
91-
#[derive(Clone)]
91+
#[derive(Clone, Debug)]
9292
pub struct BtrBlocksCompressor {
9393
/// Integer compressor with configured schemes.
9494
pub int_schemes: Vec<&'static dyn IntegerScheme>,

vortex-datafusion/src/convert/exprs.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use vortex::expr::Expression;
2828
use vortex::expr::Like;
2929
use vortex::expr::Operator;
3030
use vortex::expr::VTableExt;
31-
use vortex::expr::and;
3231
use vortex::expr::case_when;
3332
use vortex::expr::case_when_no_else;
3433
use vortex::expr::and_collect;

vortex-file/src/read/driver.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ use std::task::Context;
99
use std::task::Poll;
1010

1111
use futures::Stream;
12+
#[cfg(all(test, not(feature = "tokio")))]
13+
use oneshot;
1214
use pin_project_lite::pin_project;
1315
use vortex_buffer::Alignment;
16+
#[cfg(all(test, feature = "tokio"))]
17+
use tokio::sync::oneshot;
1418
use vortex_error::VortexExpect;
1519
use vortex_io::CoalesceConfig;
1620
use vortex_metrics::Counter;

vortex-file/src/read/request.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ use std::ops::Range;
88
use std::sync::Arc;
99

1010
use vortex_array::buffer::BufferHandle;
11+
#[cfg(not(feature = "tokio"))]
12+
use oneshot;
13+
#[cfg(feature = "tokio")]
14+
use tokio::sync::oneshot;
1115
use vortex_buffer::Alignment;
1216
use vortex_error::VortexError;
1317
use vortex_error::VortexExpect;
@@ -108,7 +112,7 @@ impl Debug for ReadRequest {
108112
impl ReadRequest {
109113
pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
110114
if let Err(e) = self.callback.send(result) {
111-
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
115+
tracing::debug!("ReadRequest {} dropped before resolving: {e:?}", self.id);
112116
}
113117
}
114118
}

vortex-file/src/segments/source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ use futures::FutureExt;
1313
use futures::StreamExt;
1414
use futures::channel::mpsc;
1515
use futures::future;
16+
#[cfg(not(feature = "tokio"))]
17+
use oneshot;
18+
#[cfg(feature = "tokio")]
19+
use tokio::sync::oneshot;
1620
use vortex_array::buffer::BufferHandle;
1721
use vortex_buffer::Alignment;
1822
use vortex_error::VortexResult;

vortex-io/src/runtime/handle.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ use std::task::Poll;
99
use std::task::ready;
1010

1111
use futures::FutureExt;
12+
#[cfg(not(feature = "tokio"))]
13+
use oneshot;
14+
#[cfg(feature = "tokio")]
15+
use tokio::sync::oneshot;
1216
use vortex_error::vortex_panic;
1317

1418
use crate::runtime::AbortHandleRef;

vortex-io/src/runtime/single.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,13 @@ use futures::Stream;
88
use futures::StreamExt;
99
use futures::future::BoxFuture;
1010
use futures::stream::LocalBoxStream;
11+
#[cfg(not(feature = "tokio"))]
12+
use oneshot;
1113
use parking_lot::Mutex;
1214
use smol::LocalExecutor;
15+
// Prefer tokio::sync::oneshot when tokio feature is enabled
16+
#[cfg(feature = "tokio")]
17+
use tokio::sync::oneshot;
1318
use vortex_error::vortex_panic;
1419

1520
use crate::runtime::AbortHandle;
@@ -18,7 +23,6 @@ use crate::runtime::BlockingRuntime;
1823
use crate::runtime::Executor;
1924
use crate::runtime::Handle;
2025
use crate::runtime::smol::SmolAbortHandle;
21-
2226
/// A runtime that drives all work on the current thread.
2327
///
2428
/// This is subtly different from using a current-thread runtime to drive a future since it is

0 commit comments

Comments
 (0)