Skip to content

Commit f09931d

Browse files
committed
feat: Replace MaybeUninit with ManuallyDrop for send operations
Replaced with in channel send paths. This change addresses Miri data race errors that occurred due to how was being used in scenarios involving values being semantically moved into the channel. By using , we gain explicit control over the dropping of values, preventing from potentially being perceived as uninitialized memory in concurrent contexts by Miri. This refactoring impacts: - : , , and various send methods. - : and public send methods. - : trait signature and macro. - All implementations: and . - : function. - : implementation for the Null channel. - : . All usages were replaced with , and logic was adjusted to use or for value transfer, ensuring proper ownership semantics and avoiding unintended drops. Unused imports were also cleaned up.
1 parent 88d15ad commit f09931d

File tree

16 files changed

+82
-82
lines changed

16 files changed

+82
-82
lines changed

src/async_tx.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use std::cell::Cell;
77
use std::fmt;
88
use std::future::Future;
99
use std::marker::PhantomData;
10-
use std::mem::{needs_drop, MaybeUninit};
10+
use std::mem::{needs_drop, ManuallyDrop};
1111
use std::ops::Deref;
1212
use std::pin::Pin;
13+
use std::ptr;
1314
use std::sync::Arc;
1415
use std::task::{Context, Poll};
1516

@@ -123,7 +124,7 @@ impl<F: Flavor> AsyncTx<F> {
123124
/// Returns Err([SendError]) if the receiver has been dropped.
124125
#[inline(always)]
125126
pub fn send<'a>(&'a self, item: F::Item) -> SendFuture<'a, F> {
126-
return SendFuture { tx: &self, item: MaybeUninit::new(item), waker: None };
127+
return SendFuture { tx: &self, item: ManuallyDrop::new(item), waker: None };
127128
}
128129

129130
/// Attempts to send a message without blocking.
@@ -138,12 +139,12 @@ impl<F: Flavor> AsyncTx<F> {
138139
if self.shared.is_rx_closed() {
139140
return Err(TrySendError::Disconnected(item));
140141
}
141-
let _item = MaybeUninit::new(item);
142+
let _item = ManuallyDrop::new(item);
142143
if self.shared.inner.try_send(&_item) {
143144
self.shared.on_send();
144145
return Ok(());
145146
} else {
146-
return unsafe { Err(TrySendError::Full(_item.assume_init())) };
147+
return Err(TrySendError::Full(ManuallyDrop::into_inner(_item)));
147148
}
148149
}
149150

@@ -222,7 +223,7 @@ impl<F: Flavor> AsyncTx<F> {
222223
{
223224
SendTimeoutFuture {
224225
tx: self,
225-
item: MaybeUninit::new(item),
226+
item: ManuallyDrop::new(item),
226227
waker: None,
227228
sleep: Box::pin(fut),
228229
}
@@ -237,7 +238,7 @@ impl<F: Flavor> AsyncTx<F> {
237238
/// Returns `Poll::Ready(Err(())` when all Rx dropped.
238239
#[inline(always)]
239240
pub(crate) fn poll_send<'a, const SINK: bool>(
240-
&self, ctx: &'a mut Context, item: &MaybeUninit<F::Item>,
241+
&self, ctx: &'a mut Context, item: &ManuallyDrop<F::Item>,
241242
o_waker: &'a mut Option<<F::Send as Registry>::Waker>,
242243
) -> Poll<Result<(), ()>> {
243244
let shared = &self.shared;
@@ -304,7 +305,7 @@ impl<F: Flavor> AsyncTx<F> {
304305
#[must_use]
305306
pub struct SendFuture<'a, F: Flavor> {
306307
tx: &'a AsyncTx<F>,
307-
item: MaybeUninit<F::Item>,
308+
item: ManuallyDrop<F::Item>,
308309
waker: Option<<F::Send as Registry>::Waker>,
309310
}
310311

@@ -317,7 +318,7 @@ impl<F: Flavor> Drop for SendFuture<'_, F> {
317318
if let Some(waker) = self.waker.as_ref() {
318319
if self.tx.shared.abandon_send_waker(waker) {
319320
if needs_drop::<F::Item>() {
320-
unsafe { self.item.assume_init_drop() };
321+
unsafe { ManuallyDrop::drop(&mut self.item) };
321322
}
322323
}
323324
}
@@ -337,7 +338,7 @@ impl<F: Flavor> Future for SendFuture<'_, F> {
337338
}
338339
Poll::Ready(Err(())) => {
339340
let _ = _self.waker.take();
340-
return Poll::Ready(Err(SendError(unsafe { _self.item.assume_init_read() })));
341+
return Poll::Ready(Err(SendError(unsafe { ptr::read(&*_self.item) })));
341342
}
342343
Poll::Pending => return Poll::Pending,
343344
}
@@ -349,7 +350,7 @@ impl<F: Flavor> Future for SendFuture<'_, F> {
349350
pub struct SendTimeoutFuture<'a, F: Flavor, R> {
350351
tx: &'a AsyncTx<F>,
351352
sleep: Pin<Box<dyn Future<Output = R>>>,
352-
item: MaybeUninit<F::Item>,
353+
item: ManuallyDrop<F::Item>,
353354
waker: Option<<F::Send as Registry>::Waker>,
354355
}
355356

@@ -362,7 +363,7 @@ impl<F: Flavor, R> Drop for SendTimeoutFuture<'_, F, R> {
362363
// Cancelling the future, poll is not ready
363364
if self.tx.shared.abandon_send_waker(waker) {
364365
if needs_drop::<F::Item>() {
365-
unsafe { self.item.assume_init_drop() };
366+
unsafe { ManuallyDrop::drop(&mut self.item) };
366367
}
367368
}
368369
}
@@ -383,14 +384,14 @@ impl<F: Flavor, R> Future for SendTimeoutFuture<'_, F, R> {
383384
Poll::Ready(Err(())) => {
384385
let _ = _self.waker.take();
385386
return Poll::Ready(Err(SendTimeoutError::Disconnected(unsafe {
386-
_self.item.assume_init_read()
387+
ptr::read(&*_self.item)
387388
})));
388389
}
389390
Poll::Pending => {
390391
if let Poll::Ready(_) = _self.sleep.as_mut().poll(ctx) {
391392
if _self.tx.shared.abandon_send_waker(&_self.waker.take().unwrap()) {
392393
return Poll::Ready(Err(SendTimeoutError::Timeout(unsafe {
393-
_self.item.assume_init_read()
394+
ptr::read(&*_self.item)
394395
})));
395396
} else {
396397
// Message already sent in background (on_recv).

src/blocking_tx.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{shared::*, trace_log, AsyncTx, MAsyncTx, NotCloneable, SenderType};
44
use std::cell::Cell;
55
use std::fmt;
66
use std::marker::PhantomData;
7-
use std::mem::MaybeUninit;
7+
use std::mem::ManuallyDrop;
88
use std::ops::Deref;
99
use std::sync::atomic::Ordering;
1010
use std::sync::Arc;
@@ -81,7 +81,7 @@ impl<F: Flavor> From<AsyncTx<F>> for Tx<F> {
8181
impl<F: Flavor> Tx<F> {
8282
#[inline(always)]
8383
pub(crate) fn _send_bounded(
84-
&self, item: &MaybeUninit<F::Item>, deadline: Option<Instant>,
84+
&self, item: &ManuallyDrop<F::Item>, deadline: Option<Instant>,
8585
) -> Result<(), SendTimeoutError<F::Item>> {
8686
let shared = &self.shared;
8787
let large = shared.large;
@@ -94,7 +94,7 @@ impl<F: Flavor> Tx<F> {
9494
loop {
9595
let r = if large { backoff.yield_now() } else { backoff.spin() };
9696
if direct_copy && large {
97-
match shared.inner.try_send_oneshot(item.as_ptr()) {
97+
match shared.inner.try_send_oneshot(&**item as *const F::Item) {
9898
Some(false) => break,
9999
None => {
100100
if r {
@@ -122,7 +122,7 @@ impl<F: Flavor> Tx<F> {
122122
}
123123
}
124124
let direct_copy_ptr: *const F::Item =
125-
if direct_copy { item.as_ptr() } else { std::ptr::null() };
125+
if direct_copy { &**item as *const F::Item } else { std::ptr::null() };
126126

127127
let mut state: u8;
128128
let mut o_waker: Option<<F::Send as Registry>::Waker> = None;
@@ -159,7 +159,7 @@ impl<F: Flavor> Tx<F> {
159159
Err(_) => {
160160
if shared.abandon_send_waker(o_waker.as_ref().unwrap()) {
161161
return Err(SendTimeoutError::Timeout(unsafe {
162-
item.assume_init_read()
162+
std::ptr::read(&**item)
163163
}));
164164
} else {
165165
// NOTE: Unlikely since we disable direct copy with deadline
@@ -188,7 +188,7 @@ impl<F: Flavor> Tx<F> {
188188
return_ok!();
189189
} else {
190190
debug_assert_eq!(state, WakerState::Closed as u8);
191-
return Err(SendTimeoutError::Disconnected(unsafe { item.assume_init_read() }));
191+
return Err(SendTimeoutError::Disconnected(unsafe { std::ptr::read(&**item) }));
192192
}
193193
}
194194
}
@@ -205,7 +205,7 @@ impl<F: Flavor> Tx<F> {
205205
if shared.is_rx_closed() {
206206
return Err(SendError(item));
207207
}
208-
let _item = MaybeUninit::new(item);
208+
let _item = ManuallyDrop::new(item);
209209
if shared.inner.try_send(&_item) {
210210
shared.on_send();
211211
return Ok(());
@@ -230,12 +230,12 @@ impl<F: Flavor> Tx<F> {
230230
if shared.is_rx_closed() {
231231
return Err(TrySendError::Disconnected(item));
232232
}
233-
let _item = MaybeUninit::new(item);
233+
let _item = ManuallyDrop::new(item);
234234
if shared.inner.try_send(&_item) {
235235
shared.on_send();
236236
return Ok(());
237237
} else {
238-
return Err(TrySendError::Full(unsafe { _item.assume_init_read() }));
238+
return Err(TrySendError::Full(ManuallyDrop::into_inner(_item)));
239239
}
240240
}
241241

@@ -263,7 +263,7 @@ impl<F: Flavor> Tx<F> {
263263
TrySendError::Full(t) => SendTimeoutError::Timeout(t),
264264
}),
265265
Some(deadline) => {
266-
let _item = MaybeUninit::new(item);
266+
let _item = ManuallyDrop::new(item);
267267
if shared.inner.try_send(&_item) {
268268
shared.on_send();
269269
return Ok(());

src/flavor/array.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{FlavorBounded, FlavorImpl, FlavorSelect, Queue, Token};
22
use crate::crossbeam::array_queue::ArrayQueue;
3-
use std::mem::MaybeUninit;
3+
use std::mem::ManuallyDrop;
44

55
/// Which Equals to crossbeam_queue::ArrayQueue
66
pub type Array<T> = _Array<T, true, true>;
@@ -27,11 +27,11 @@ impl<T: Send + 'static + Unpin, const MP: bool, const MC: bool> Queue for _Array
2727

2828
#[inline(always)]
2929
fn push(&self, item: T) -> Result<(), T> {
30-
let _item = MaybeUninit::new(item);
31-
if unsafe { self.0.push_with_ptr(_item.as_ptr()) } {
30+
let _item = ManuallyDrop::new(item);
31+
if unsafe { self.0.push_with_ptr(&*_item as *const T) } {
3232
Ok(())
3333
} else {
34-
Err(unsafe { _item.assume_init_read() })
34+
Err(ManuallyDrop::into_inner(_item))
3535
}
3636
}
3737

@@ -58,8 +58,8 @@ impl<T: Send + 'static + Unpin, const MP: bool, const MC: bool> Queue for _Array
5858

5959
impl<T: Send + 'static + Unpin, const MP: bool, const MC: bool> FlavorImpl for _Array<T, MP, MC> {
6060
#[inline(always)]
61-
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
62-
return unsafe { self.0.push_with_ptr(item.as_ptr()) };
61+
fn try_send(&self, item: &ManuallyDrop<T>) -> bool {
62+
return unsafe { self.0.push_with_ptr(&**item as *const T) };
6363
}
6464

6565
#[inline(always)]

src/flavor/array_mpsc.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{FlavorBounded, FlavorImpl, FlavorSelect, Queue, Token};
22
use crate::crossbeam::array_queue_mpsc::ArrayQueueMpsc;
3-
use std::mem::MaybeUninit;
3+
use std::mem::ManuallyDrop;
44

55
/// Simplified ArrayQueue tweaks for mpsc
66
pub struct ArrayMpsc<T>(ArrayQueueMpsc<T>);
@@ -25,11 +25,11 @@ impl<T: Send + 'static + Unpin> Queue for ArrayMpsc<T> {
2525

2626
#[inline(always)]
2727
fn push(&self, item: T) -> Result<(), T> {
28-
let _item = MaybeUninit::new(item);
29-
if unsafe { self.0.push_with_ptr(_item.as_ptr()) } {
28+
let _item = ManuallyDrop::new(item);
29+
if unsafe { self.0.push_with_ptr(&*_item as *const T) } {
3030
Ok(())
3131
} else {
32-
Err(unsafe { _item.assume_init_read() })
32+
Err(ManuallyDrop::into_inner(_item))
3333
}
3434
}
3535

@@ -56,8 +56,8 @@ impl<T: Send + 'static + Unpin> Queue for ArrayMpsc<T> {
5656

5757
impl<T: Send + 'static + Unpin> FlavorImpl for ArrayMpsc<T> {
5858
#[inline(always)]
59-
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
60-
return unsafe { self.0.push_with_ptr(item.as_ptr()) };
59+
fn try_send(&self, item: &ManuallyDrop<T>) -> bool {
60+
return unsafe { self.0.push_with_ptr(&**item as *const T) };
6161
}
6262

6363
#[inline(always)]

src/flavor/array_spsc.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{FlavorBounded, FlavorImpl, FlavorSelect, Queue, Token};
22
use crate::crossbeam::array_queue_spsc::ArrayQueueSpsc;
3-
use std::mem::MaybeUninit;
3+
use std::mem::ManuallyDrop;
44

55
/// Simplified ArrayQueue tweaks for spsc, without stamp
66
pub struct ArraySpsc<T>(ArrayQueueSpsc<T>);
@@ -25,11 +25,11 @@ impl<T: Send + 'static + Unpin> Queue for ArraySpsc<T> {
2525

2626
#[inline(always)]
2727
fn push(&self, item: T) -> Result<(), T> {
28-
let _item = MaybeUninit::new(item);
29-
if unsafe { self.0.push_with_ptr_final(_item.as_ptr()) } {
28+
let _item = ManuallyDrop::new(item);
29+
if unsafe { self.0.push_with_ptr_final(&*_item as *const T) } {
3030
Ok(())
3131
} else {
32-
Err(unsafe { _item.assume_init_read() })
32+
Err(ManuallyDrop::into_inner(_item))
3333
}
3434
}
3535

@@ -56,8 +56,8 @@ impl<T: Send + 'static + Unpin> Queue for ArraySpsc<T> {
5656

5757
impl<T: Send + 'static + Unpin> FlavorImpl for ArraySpsc<T> {
5858
#[inline(always)]
59-
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
60-
return unsafe { self.0.push_with_ptr(item.as_ptr()) };
59+
fn try_send(&self, item: &ManuallyDrop<T>) -> bool {
60+
return unsafe { self.0.push_with_ptr(&**item as *const T) };
6161
}
6262

6363
#[inline(always)]

src/flavor/list.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
22
use crate::crossbeam::seg_queue::SegQueue;
3-
use std::mem::MaybeUninit;
3+
use std::mem::ManuallyDrop;
44

55
/// Which equals to crossbeam_queue::SeqQueue
66
pub struct List<T>(SegQueue<T>);
@@ -49,8 +49,8 @@ impl<T: Send + Unpin + 'static> Queue for List<T> {
4949

5050
impl<T: Send + Unpin + 'static> FlavorImpl for List<T> {
5151
#[inline(always)]
52-
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
53-
self.0.push(unsafe { item.assume_init_read() });
52+
fn try_send(&self, item: &ManuallyDrop<T>) -> bool {
53+
self.0.push(unsafe { std::ptr::read(&**item) });
5454
true
5555
}
5656

src/flavor/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::waker_registry::*;
22
use std::marker::PhantomData;
3-
use std::mem::MaybeUninit;
3+
use std::mem::ManuallyDrop;
44
use std::ops::Deref;
55

66
pub mod array;
@@ -58,7 +58,7 @@ pub trait Queue: Send + 'static {
5858

5959
/// Internal flavor interface
6060
pub(crate) trait FlavorImpl: Send + 'static + Queue {
61-
fn try_send(&self, item: &MaybeUninit<Self::Item>) -> bool;
61+
fn try_send(&self, item: &ManuallyDrop<Self::Item>) -> bool;
6262

6363
#[inline]
6464
fn try_send_oneshot(&self, _item: *const Self::Item) -> Option<bool> {
@@ -125,7 +125,7 @@ pub(super) use queue_dispatch;
125125
macro_rules! flavor_dispatch {
126126
($wrap_method: ident)=>{
127127
#[inline(always)]
128-
fn try_send(&self, item: &MaybeUninit<Self::Item>) -> bool {
128+
fn try_send(&self, item: &std::mem::ManuallyDrop<Self::Item>) -> bool {
129129
$wrap_method!(self, try_send item)
130130
}
131131

src/flavor/one.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
22
use crate::backoff::*;
33
use core::cell::UnsafeCell;
4-
use core::mem::{needs_drop, MaybeUninit};
4+
use core::mem::{needs_drop, ManuallyDrop, MaybeUninit};
55
use core::ptr;
66
use core::sync::atomic::{
77
compiler_fence, AtomicU16, AtomicU32,
@@ -36,11 +36,11 @@ impl<T: Send + Unpin + 'static> Queue for One<T> {
3636

3737
#[inline(always)]
3838
fn push(&self, item: T) -> Result<(), T> {
39-
let _item = MaybeUninit::new(item);
40-
if unsafe { self._try_push(SeqCst, _item.as_ptr(), Acquire).is_ok() } {
39+
let _item = ManuallyDrop::new(item);
40+
if unsafe { self._try_push(SeqCst, &*_item as *const T, Acquire).is_ok() } {
4141
Ok(())
4242
} else {
43-
Err(unsafe { _item.assume_init_read() })
43+
Err(ManuallyDrop::into_inner(_item))
4444
}
4545
}
4646

@@ -217,9 +217,9 @@ impl<T> Drop for One<T> {
217217

218218
impl<T: Send + 'static + Unpin> FlavorImpl for One<T> {
219219
#[inline(always)]
220-
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
220+
fn try_send(&self, item: &ManuallyDrop<T>) -> bool {
221221
// Will always double-check with is_full or try_send_oneshot()
222-
unsafe { self._try_push(Relaxed, item.as_ptr(), Relaxed).is_ok() }
222+
unsafe { self._try_push(Relaxed, &**item as *const T, Relaxed).is_ok() }
223223
}
224224

225225
#[inline(always)]

0 commit comments

Comments
 (0)