92
92
extern crate crossbeam_epoch as epoch;
93
93
extern crate crossbeam_utils as utils;
94
94
95
+ extern crate maybe_uninit;
96
+
95
97
use std:: cell:: { Cell , UnsafeCell } ;
96
98
use std:: cmp;
97
99
use std:: fmt;
98
100
use std:: iter:: FromIterator ;
99
101
use std:: marker:: PhantomData ;
100
- use std:: mem:: { self , ManuallyDrop } ;
102
+ use std:: mem;
101
103
use std:: ptr;
102
104
use std:: sync:: atomic:: { self , AtomicIsize , AtomicPtr , AtomicUsize , Ordering } ;
103
105
use std:: sync:: Arc ;
104
106
105
107
use epoch:: { Atomic , Owned } ;
106
108
use utils:: { Backoff , CachePadded } ;
107
109
110
+ use maybe_uninit:: MaybeUninit ;
111
+
108
112
// Minimum buffer capacity.
109
113
const MIN_CAP : usize = 64 ;
110
114
// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
@@ -218,7 +222,7 @@ impl<T> Drop for Inner<T> {
218
222
// Go through the buffer from front to back and drop all tasks in the queue.
219
223
let mut i = f;
220
224
while i != b {
221
- ptr :: drop_in_place ( buffer. deref ( ) . at ( i) ) ;
225
+ buffer. deref ( ) . at ( i) . drop_in_place ( ) ;
222
226
i = i. wrapping_add ( 1 ) ;
223
227
}
224
228
@@ -1140,7 +1144,7 @@ const HAS_NEXT: usize = 1;
1140
1144
/// A slot in a block.
1141
1145
struct Slot < T > {
1142
1146
/// The task.
1143
- task : UnsafeCell < ManuallyDrop < T > > ,
1147
+ task : UnsafeCell < MaybeUninit < T > > ,
1144
1148
1145
1149
/// The state of the slot.
1146
1150
state : AtomicUsize ,
@@ -1170,7 +1174,13 @@ struct Block<T> {
1170
1174
impl < T > Block < T > {
1171
1175
/// Creates an empty block that starts at `start_index`.
1172
1176
fn new ( ) -> Block < T > {
1173
- unsafe { mem:: zeroed ( ) }
1177
+ // SAFETY: This is safe because:
1178
+ // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1179
+ // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1180
+ // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1181
+ // holds a MaybeUninit.
1182
+ // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1183
+ unsafe { MaybeUninit :: zeroed ( ) . assume_init ( ) }
1174
1184
}
1175
1185
1176
1186
/// Waits until the next pointer is set.
@@ -1329,7 +1339,7 @@ impl<T> Injector<T> {
1329
1339
1330
1340
// Write the task into the slot.
1331
1341
let slot = ( * block) . slots . get_unchecked ( offset) ;
1332
- slot. task . get ( ) . write ( ManuallyDrop :: new ( task) ) ;
1342
+ slot. task . get ( ) . write ( MaybeUninit :: new ( task) ) ;
1333
1343
slot. state . fetch_or ( WRITE , Ordering :: Release ) ;
1334
1344
1335
1345
return ;
@@ -1422,8 +1432,7 @@ impl<T> Injector<T> {
1422
1432
// Read the task.
1423
1433
let slot = ( * block) . slots . get_unchecked ( offset) ;
1424
1434
slot. wait_write ( ) ;
1425
- let m = slot. task . get ( ) . read ( ) ;
1426
- let task = ManuallyDrop :: into_inner ( m) ;
1435
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1427
1436
1428
1437
// Destroy the block if we've reached the end, or if another thread wanted to destroy
1429
1438
// but couldn't because we were busy reading from the slot.
@@ -1548,8 +1557,7 @@ impl<T> Injector<T> {
1548
1557
// Read the task.
1549
1558
let slot = ( * block) . slots . get_unchecked ( offset + i) ;
1550
1559
slot. wait_write ( ) ;
1551
- let m = slot. task . get ( ) . read ( ) ;
1552
- let task = ManuallyDrop :: into_inner ( m) ;
1560
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1553
1561
1554
1562
// Write it into the destination queue.
1555
1563
dest_buffer. write ( dest_b. wrapping_add ( i as isize ) , task) ;
@@ -1561,8 +1569,7 @@ impl<T> Injector<T> {
1561
1569
// Read the task.
1562
1570
let slot = ( * block) . slots . get_unchecked ( offset + i) ;
1563
1571
slot. wait_write ( ) ;
1564
- let m = slot. task . get ( ) . read ( ) ;
1565
- let task = ManuallyDrop :: into_inner ( m) ;
1572
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1566
1573
1567
1574
// Write it into the destination queue.
1568
1575
dest_buffer. write ( dest_b. wrapping_add ( ( batch_size - 1 - i) as isize ) , task) ;
@@ -1704,8 +1711,7 @@ impl<T> Injector<T> {
1704
1711
// Read the task.
1705
1712
let slot = ( * block) . slots . get_unchecked ( offset) ;
1706
1713
slot. wait_write ( ) ;
1707
- let m = slot. task . get ( ) . read ( ) ;
1708
- let task = ManuallyDrop :: into_inner ( m) ;
1714
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1709
1715
1710
1716
match dest. flavor {
1711
1717
Flavor :: Fifo => {
@@ -1714,8 +1720,7 @@ impl<T> Injector<T> {
1714
1720
// Read the task.
1715
1721
let slot = ( * block) . slots . get_unchecked ( offset + i + 1 ) ;
1716
1722
slot. wait_write ( ) ;
1717
- let m = slot. task . get ( ) . read ( ) ;
1718
- let task = ManuallyDrop :: into_inner ( m) ;
1723
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1719
1724
1720
1725
// Write it into the destination queue.
1721
1726
dest_buffer. write ( dest_b. wrapping_add ( i as isize ) , task) ;
@@ -1728,8 +1733,7 @@ impl<T> Injector<T> {
1728
1733
// Read the task.
1729
1734
let slot = ( * block) . slots . get_unchecked ( offset + i + 1 ) ;
1730
1735
slot. wait_write ( ) ;
1731
- let m = slot. task . get ( ) . read ( ) ;
1732
- let task = ManuallyDrop :: into_inner ( m) ;
1736
+ let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1733
1737
1734
1738
// Write it into the destination queue.
1735
1739
dest_buffer. write ( dest_b. wrapping_add ( ( batch_size - 1 - i) as isize ) , task) ;
@@ -1804,7 +1808,8 @@ impl<T> Drop for Injector<T> {
1804
1808
if offset < BLOCK_CAP {
1805
1809
// Drop the task in the slot.
1806
1810
let slot = ( * block) . slots . get_unchecked ( offset) ;
1807
- ManuallyDrop :: drop ( & mut * ( * slot) . task . get ( ) ) ;
1811
+ let p = & mut * slot. task . get ( ) ;
1812
+ p. as_mut_ptr ( ) . drop_in_place ( ) ;
1808
1813
} else {
1809
1814
// Deallocate the block and move to the next one.
1810
1815
let next = ( * block) . next . load ( Ordering :: Relaxed ) ;
0 commit comments