@@ -53,6 +53,8 @@ impl<T> Buffer<T> {
53
53
/// Returns a pointer to the task at the specified `index`.
54
54
unsafe fn at ( & self , index : isize ) -> * mut T {
55
55
// `self.cap` is always a power of two.
56
+ // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
57
+ // don't actually have the right to access this memory.
56
58
self . ptr . offset ( index & ( self . cap - 1 ) as isize )
57
59
}
58
60
@@ -62,18 +64,18 @@ impl<T> Buffer<T> {
62
64
/// technically speaking a data race and therefore UB. We should use an atomic store here, but
63
65
/// that would be more expensive and difficult to implement generically for all types `T`.
64
66
/// Hence, as a hack, we use a volatile write instead.
65
- unsafe fn write ( & self , index : isize , task : T ) {
66
- ptr:: write_volatile ( self . at ( index) , task)
67
+ unsafe fn write ( & self , index : isize , task : MaybeUninit < T > ) {
68
+ ptr:: write_volatile ( self . at ( index) as * mut MaybeUninit < T > , task)
67
69
}
68
70
69
71
/// Reads a task from the specified `index`.
70
72
///
71
73
/// This method might be concurrently called with another `write` at the same index, which is
72
74
/// technically speaking a data race and therefore UB. We should use an atomic load here, but
73
75
/// that would be more expensive and difficult to implement generically for all types `T`.
74
- /// Hence, as a hack, we use a volatile write instead.
75
- unsafe fn read ( & self , index : isize ) -> T {
76
- ptr:: read_volatile ( self . at ( index) )
76
+ /// Hence, as a hack, we use a volatile load instead.
77
+ unsafe fn read ( & self , index : isize ) -> MaybeUninit < T > {
78
+ ptr:: read_volatile ( self . at ( index) as * mut MaybeUninit < T > )
77
79
}
78
80
}
79
81
@@ -406,7 +408,7 @@ impl<T> Worker<T> {
406
408
407
409
// Write `task` into the slot.
408
410
unsafe {
409
- buffer. write ( b, task) ;
411
+ buffer. write ( b, MaybeUninit :: new ( task) ) ;
410
412
}
411
413
412
414
atomic:: fence ( Ordering :: Release ) ;
@@ -461,7 +463,7 @@ impl<T> Worker<T> {
461
463
unsafe {
462
464
// Read the popped task.
463
465
let buffer = self . buffer . get ( ) ;
464
- let task = buffer. read ( f) ;
466
+ let task = buffer. read ( f) . assume_init ( ) ;
465
467
466
468
// Shrink the buffer if `len - 1` is less than one fourth of the capacity.
467
469
if buffer. cap > MIN_CAP && len <= buffer. cap as isize / 4 {
@@ -509,8 +511,8 @@ impl<T> Worker<T> {
509
511
)
510
512
. is_err ( )
511
513
{
512
- // Failed. We didn't pop anything.
513
- mem :: forget ( task. take ( ) ) ;
514
+ // Failed. We didn't pop anything. Reset to `None`.
515
+ task. take ( ) ;
514
516
}
515
517
516
518
// Restore the back index to the original task.
@@ -524,7 +526,7 @@ impl<T> Worker<T> {
524
526
}
525
527
}
526
528
527
- task
529
+ task. map ( |t| unsafe { t . assume_init ( ) } )
528
530
}
529
531
}
530
532
}
@@ -661,12 +663,11 @@ impl<T> Stealer<T> {
661
663
. is_err ( )
662
664
{
663
665
// We didn't steal this task, forget it.
664
- mem:: forget ( task) ;
665
666
return Steal :: Retry ;
666
667
}
667
668
668
669
// Return the stolen task.
669
- Steal :: Success ( task)
670
+ Steal :: Success ( unsafe { task. assume_init ( ) } )
670
671
}
671
672
672
673
/// Steals a batch of tasks and pushes them into another worker.
@@ -821,7 +822,6 @@ impl<T> Stealer<T> {
821
822
. is_err ( )
822
823
{
823
824
// We didn't steal this task, forget it and break from the loop.
824
- mem:: forget ( task) ;
825
825
batch_size = i;
826
826
break ;
827
827
}
@@ -975,7 +975,6 @@ impl<T> Stealer<T> {
975
975
. is_err ( )
976
976
{
977
977
// We didn't steal this task, forget it.
978
- mem:: forget ( task) ;
979
978
return Steal :: Retry ;
980
979
}
981
980
@@ -992,7 +991,6 @@ impl<T> Stealer<T> {
992
991
. is_err ( )
993
992
{
994
993
// We didn't steal this task, forget it.
995
- mem:: forget ( task) ;
996
994
return Steal :: Retry ;
997
995
}
998
996
@@ -1037,7 +1035,6 @@ impl<T> Stealer<T> {
1037
1035
. is_err ( )
1038
1036
{
1039
1037
// We didn't steal this task, forget it and break from the loop.
1040
- mem:: forget ( tmp) ;
1041
1038
batch_size = i;
1042
1039
break ;
1043
1040
}
@@ -1077,7 +1074,7 @@ impl<T> Stealer<T> {
1077
1074
dest. inner . back . store ( dest_b, Ordering :: Release ) ;
1078
1075
1079
1076
// Return with success.
1080
- Steal :: Success ( task)
1077
+ Steal :: Success ( unsafe { task. assume_init ( ) } )
1081
1078
}
1082
1079
}
1083
1080
@@ -1535,7 +1532,7 @@ impl<T> Injector<T> {
1535
1532
// Read the task.
1536
1533
let slot = ( * block) . slots . get_unchecked ( offset + i) ;
1537
1534
slot. wait_write ( ) ;
1538
- let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1535
+ let task = slot. task . get ( ) . read ( ) ;
1539
1536
1540
1537
// Write it into the destination queue.
1541
1538
dest_buffer. write ( dest_b. wrapping_add ( i as isize ) , task) ;
@@ -1547,7 +1544,7 @@ impl<T> Injector<T> {
1547
1544
// Read the task.
1548
1545
let slot = ( * block) . slots . get_unchecked ( offset + i) ;
1549
1546
slot. wait_write ( ) ;
1550
- let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1547
+ let task = slot. task . get ( ) . read ( ) ;
1551
1548
1552
1549
// Write it into the destination queue.
1553
1550
dest_buffer. write ( dest_b. wrapping_add ( ( batch_size - 1 - i) as isize ) , task) ;
@@ -1689,7 +1686,7 @@ impl<T> Injector<T> {
1689
1686
// Read the task.
1690
1687
let slot = ( * block) . slots . get_unchecked ( offset) ;
1691
1688
slot. wait_write ( ) ;
1692
- let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1689
+ let task = slot. task . get ( ) . read ( ) ;
1693
1690
1694
1691
match dest. flavor {
1695
1692
Flavor :: Fifo => {
@@ -1698,7 +1695,7 @@ impl<T> Injector<T> {
1698
1695
// Read the task.
1699
1696
let slot = ( * block) . slots . get_unchecked ( offset + i + 1 ) ;
1700
1697
slot. wait_write ( ) ;
1701
- let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1698
+ let task = slot. task . get ( ) . read ( ) ;
1702
1699
1703
1700
// Write it into the destination queue.
1704
1701
dest_buffer. write ( dest_b. wrapping_add ( i as isize ) , task) ;
@@ -1711,7 +1708,7 @@ impl<T> Injector<T> {
1711
1708
// Read the task.
1712
1709
let slot = ( * block) . slots . get_unchecked ( offset + i + 1 ) ;
1713
1710
slot. wait_write ( ) ;
1714
- let task = slot. task . get ( ) . read ( ) . assume_init ( ) ;
1711
+ let task = slot. task . get ( ) . read ( ) ;
1715
1712
1716
1713
// Write it into the destination queue.
1717
1714
dest_buffer. write ( dest_b. wrapping_add ( ( batch_size - 1 - i) as isize ) , task) ;
@@ -1744,7 +1741,7 @@ impl<T> Injector<T> {
1744
1741
}
1745
1742
}
1746
1743
1747
- Steal :: Success ( task)
1744
+ Steal :: Success ( task. assume_init ( ) )
1748
1745
}
1749
1746
}
1750
1747
0 commit comments