Skip to content

Commit 96950b8

Browse files
authored
Use tokio::sync::oneshot to prevent FuturesUnordered reentrant drop crash (#9)
1 parent c2899d2 commit 96950b8

5 files changed

Lines changed: 31 additions & 0 deletions

File tree

vortex-io/src/file/driver.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ use std::pin::Pin;
55
use std::task::{Context, Poll};
66

77
use futures::Stream;
8+
#[cfg(all(test, not(feature = "tokio")))]
9+
use oneshot;
810
use pin_project_lite::pin_project;
11+
// Prefer tokio::sync::oneshot when tokio feature is enabled
12+
#[cfg(all(test, feature = "tokio"))]
13+
use tokio::sync::oneshot;
914
use vortex_error::VortexExpect;
1015
use vortex_metrics::VortexMetrics;
1116

vortex-io/src/file/read/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@ use async_trait::async_trait;
1515
use futures::channel::mpsc;
1616
use futures::future::{BoxFuture, Shared};
1717
use futures::{FutureExt, TryFutureExt};
18+
#[cfg(not(feature = "tokio"))]
19+
use oneshot;
1820
pub use request::*;
1921
pub use source::*;
22+
// Prefer tokio::sync::oneshot when tokio feature is enabled
23+
#[cfg(feature = "tokio")]
24+
use tokio::sync::oneshot;
2025
use vortex_buffer::{Alignment, ByteBuffer};
2126
use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_err};
2227

vortex-io/src/file/read/request.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ use std::fmt::{Debug, Formatter};
66
use std::ops::Range;
77
use std::sync::Arc;
88

9+
#[cfg(not(feature = "tokio"))]
10+
use oneshot;
11+
// Prefer tokio::sync::oneshot when tokio feature is enabled
12+
#[cfg(feature = "tokio")]
13+
use tokio::sync::oneshot;
914
use vortex_buffer::{Alignment, ByteBuffer};
1015
use vortex_error::{VortexError, VortexExpect, VortexResult};
1116

@@ -118,6 +123,12 @@ impl Debug for ReadRequest {
118123

119124
impl ReadRequest {
120125
pub(crate) fn resolve(self, result: VortexResult<ByteBuffer>) {
126+
// tokio::sync::oneshot::Sender::send returns Err with the value if receiver dropped
127+
#[cfg(feature = "tokio")]
128+
if self.callback.send(result).is_err() {
129+
log::debug!("ReadRequest {} dropped before resolving", self.id);
130+
}
131+
#[cfg(not(feature = "tokio"))]
121132
if let Err(e) = self.callback.send(result) {
122133
log::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
123134
}

vortex-io/src/runtime/handle.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ use std::task::{Context, Poll, ready};
77

88
use futures::channel::mpsc;
99
use futures::{FutureExt, StreamExt};
10+
#[cfg(not(feature = "tokio"))]
11+
use oneshot;
12+
// Prefer tokio::sync::oneshot when tokio feature is enabled
13+
#[cfg(feature = "tokio")]
14+
use tokio::sync::oneshot;
1015
use vortex_error::{VortexResult, vortex_panic};
1116
use vortex_metrics::VortexMetrics;
1217

vortex-io/src/runtime/single.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@ use std::sync::Arc;
77
use futures::future::BoxFuture;
88
use futures::stream::LocalBoxStream;
99
use futures::{Stream, StreamExt};
10+
#[cfg(not(feature = "tokio"))]
11+
use oneshot;
1012
use parking_lot::Mutex;
1113
use smol::LocalExecutor;
14+
// Prefer tokio::sync::oneshot when tokio feature is enabled
15+
#[cfg(feature = "tokio")]
16+
use tokio::sync::oneshot;
1217
use vortex_error::vortex_panic;
1318

1419
use crate::runtime::smol::SmolAbortHandle;

0 commit comments

Comments
 (0)