@@ -4,6 +4,7 @@ use core::cell::UnsafeCell;
44use core:: sync:: atomic:: Ordering :: { Relaxed , SeqCst } ;
55use core:: sync:: atomic:: { AtomicBool , AtomicPtr , AtomicUsize } ;
66use core:: { fmt, hint, ptr} ;
7+ use std:: backtrace:: Backtrace ;
78use std:: mem;
89use std:: sync:: Arc ;
910
@@ -69,9 +70,11 @@ impl Request {
6970 while !prev. scheduled . load ( SeqCst ) {
7071 hint:: spin_loop ( ) ;
7172 }
73+ // notify the prev that current request is ready
7274 prev. next . store ( behavior as * mut Behavior , SeqCst ) ;
7375 return ;
7476 }
77+ // no prev exist, it's ok to go.
7578 unsafe {
7679 Behavior :: resolve_one ( behavior) ;
7780 }
@@ -98,6 +101,7 @@ impl Request {
98101 /// `self` must have been actually completed.
99102 unsafe fn release ( & self ) {
100103 if self . next . load ( SeqCst ) . is_null ( ) {
104+ // (2)this is the last request for the cown,
101105 if self
102106 . target
103107 . last ( )
@@ -111,10 +115,13 @@ impl Request {
111115 {
112116 return ;
113117 }
118+ // (3) this is not the last request for the cown,
119+ // wait for the next request to bet set
114120 while self . next . load ( SeqCst ) . is_null ( ) {
115121 hint:: spin_loop ( ) ;
116122 }
117123 }
124+ // (1)notify the successor to resolve one
118125 unsafe {
119126 Behavior :: resolve_one ( self . next . load ( SeqCst ) ) ;
120127 }
@@ -214,16 +221,22 @@ impl Behavior {
214221 /// Performs two phase locking (2PL) over the enqueuing of the requests.
215222 /// This ensures that the overall effect of the enqueue is atomic.
216223 fn schedule ( self ) {
224+ let b = Box :: leak ( Box :: new ( self ) ) ;
217225 unsafe {
218- for r in & self . requests {
219- r. start_enqueue ( & self as * const Self ) ;
226+ for r in & b . requests {
227+ r. start_enqueue ( b as * const Self ) ;
220228 }
221- for r in & self . requests {
229+ for r in & b . requests {
222230 r. finish_enqueue ( ) ;
223231 }
224- Behavior :: resolve_one ( & self as * const Self ) ;
232+ Behavior :: resolve_one ( b as * const Self ) ;
225233 }
226- // self dropped here
234+ // should not use mem::forget
235+ // Any resources the value manages, such as heap memory or a file handle,
236+ // will linger forever in an unreachable state. However, it does not guarantee
237+ // that pointers to this memory will remain valid.
238+
239+ // self should not drop here. resolve_one will drop it.
227240 }
228241
229242 /// Resolves a single outstanding request for `this`.
@@ -239,21 +252,20 @@ impl Behavior {
239252 if tmp. count . fetch_sub ( 1 , SeqCst ) != 1 {
240253 return ;
241254 }
242-
255+ // No other threads share this. It's time to destroy it.
256+
243257 let mut this = unsafe { Box :: from_raw ( this. cast_mut ( ) ) } ;
244- let thunk = this. thunk ; // moved to spawned thread
245- this. thunk = Box :: new ( move || { } ) ; // replace with a no-op
246- let requests = mem:: take ( & mut this. requests ) ; // take ownership of requests
247- Box :: leak ( this) ; // the caller will drop this, so don't drop here
258+ let thunk = mem:: replace ( & mut this. thunk , Box :: new ( move || { } ) ) ;
259+ let requests = mem:: take ( & mut this. requests ) ;
248260 spawn ( move || {
249- println ! ( "running behavior" ) ;
250- ( thunk) ( ) ;
261+ thunk ( ) ;
251262 for r in & requests {
252263 unsafe {
253264 r. release ( ) ;
254265 }
255266 }
256267 } ) ;
268+ // drop the behavior
257269 }
258270}
259271
@@ -278,16 +290,21 @@ impl Behavior {
278290 requests. sort ( ) ;
279291 Self {
280292 thunk : Box :: new ( move || {
281- f ( unsafe {
282- cowns. get_mut ( )
283- } ) ;
293+ f ( unsafe { cowns. get_mut ( ) } ) ;
284294 } ) ,
285295 count : AtomicUsize :: new ( requests. len ( ) + 1 ) ,
286296 requests,
287297 }
288298 }
289299}
290300
301+ #[ cfg( feature = "drop-location" ) ]
302+ impl Drop for Behavior {
303+ fn drop ( & mut self ) {
304+ println ! ( "{}" , Backtrace :: force_capture( ) ) ; // see where behavior is dropped
305+ }
306+ }
307+
291308/// Trait for a collection of `CownPtr`s.
292309///
293310/// Users pass `CownPtrs` to `when!` clause to specify a collection of shared resources, and such
@@ -369,7 +386,8 @@ where
369386 C : CownPtrs + Send + ' static ,
370387 F : for < ' l > Fn ( C :: CownRefs < ' l > ) + Send + ' static ,
371388{
372- Behavior :: new ( cowns, f) . schedule ( ) ;
389+ let b = Behavior :: new ( cowns, f) ;
390+ b. schedule ( ) ;
373391}
374392
375393/// from <https://docs.rs/tuple_list/latest/tuple_list/>
@@ -462,25 +480,21 @@ fn boc_vec() {
462480fn boc_thunk_move_all ( ) {
463481 let c1 = CownPtr :: new ( 0 ) ;
464482
465- let b = Box :: new ( Behavior :: new ( tuple_list ! ( c1) , move |g1| {
466- println ! ( "1" )
467- } ) ) ;
483+ let mut b = Box :: new ( Behavior :: new ( tuple_list ! ( c1) , move |g1| println ! ( "1" ) ) ) ;
468484 spawn ( move || {
469- ( b. thunk ) ( ) ;
470- } ) ;
485+ mem :: replace ( & mut b. thunk , Box :: new ( move || { } ) ) ( ) ;
486+ } ) ;
471487}
472488
473489#[ test]
474490fn boc_thunk_move_thunk ( ) {
475491 let c1 = CownPtr :: new ( 0 ) ;
476492
477- let b = Box :: new ( Behavior :: new ( tuple_list ! ( c1) , move |g1| {
478- println ! ( "1" )
479- } ) ) ;
480- let thunk = b. thunk ;
493+ let mut b = Box :: new ( Behavior :: new ( tuple_list ! ( c1) , move |g1| println ! ( "1" ) ) ) ;
494+ let thunk = mem:: replace ( & mut b. thunk , Box :: new ( move || { } ) ) ;
481495 spawn ( move || {
482496 ( thunk) ( ) ;
483- } ) ;
497+ } ) ;
484498}
485499
486500#[ test]
@@ -506,5 +520,63 @@ fn boc_simple() {
506520 // c3, c2 are moved into this thunk. There's no such thing as auto-cloning move closure.
507521 * g1 += 1 ;
508522 * g2 += 1 ;
523+ when!( c3, c2; g3, g2; {
524+ * g2 += 1 ;
525+ * g3 = true ;
526+ } ) ;
527+ } ) ;
528+ }
529+
530+ #[ test]
531+ fn boc_channel ( ) {
532+ let c1 = CownPtr :: new ( 1 ) ;
533+ let c2 = CownPtr :: new ( 2 ) ;
534+ let c3 = CownPtr :: new ( true ) ;
535+ let c2_ = c2. clone ( ) ;
536+ let c3_ = c3. clone ( ) ;
537+
538+ let ( finish_sender, finish_receiver) = crossbeam_channel:: bounded ( 0 ) ;
539+
540+ when ! ( c1, c2_, c3_; g1, g2, g3; {
541+ assert_eq!( * g1, 1 ) ;
542+ assert_eq!( * g2, if * g3 { 2 } else { 1 } ) ;
543+ finish_sender. send( ( ) ) . unwrap( ) ;
509544 } ) ;
510- }
545+
546+ // wait for termination
547+ finish_receiver. recv ( ) . unwrap ( ) ;
548+ }
549+
550+ #[ test]
551+ fn boc_two_when_one_cown ( ) {
552+ let c1 = CownPtr :: new ( 1 ) ;
553+ when ! ( c1; g1; {
554+ * g1 += 1 ;
555+ println!( "{}" , * g1) ;
556+ } ) ;
557+
558+ when ! ( c1; g1; {
559+ * g1 += 1 ;
560+ println!( "{}" , * g1) ;
561+ } ) ;
562+ }
563+
564+ #[ test]
565+ fn boc_two_when_overlap_cown ( ) {
566+ let c1 = CownPtr :: new ( 1 ) ;
567+ let c2 = CownPtr :: new ( 2 ) ;
568+ let c3 = CownPtr :: new ( 3 ) ;
569+ when ! ( c1, c2; g1, g2; {
570+ * g1 += 1 ;
571+ * g2 += 1 ;
572+ println!( "{}" , * g2) ;
573+ assert_eq!( * g1+1 , * g2) ;
574+ } ) ;
575+
576+ println ! ( "{}" , unsafe { * c2. inner. value. get( ) } ) ;
577+
578+ when ! ( c2, c3; g2, g3; {
579+ println!( "{}" , * g2) ;
580+ assert_eq!( * g2, * g3) ;
581+ } ) ;
582+ }
0 commit comments