File tree Expand file tree Collapse file tree 3 files changed +8
-11
lines changed
Expand file tree Collapse file tree 3 files changed +8
-11
lines changed Original file line number Diff line number Diff line change @@ -481,7 +481,7 @@ pub struct LeasedBatchPart<T> {
481481 pub ( crate ) part : BatchPart < T > ,
482482 /// The lease that prevents this part from being GCed. Code should ensure that this lease
483483 /// lives as long as the part is needed.
484- pub ( crate ) lease : Option < Lease > ,
484+ pub ( crate ) lease : Lease ,
485485 pub ( crate ) filter_pushdown_audit : bool ,
486486}
487487
@@ -498,9 +498,9 @@ where
498498 /// that can't travel across process boundaries. The caller is responsible for
499499 /// ensuring that the lease is held for as long as the batch part may be in use:
500500 /// dropping it too early may cause a fetch to fail.
501- pub ( crate ) fn into_exchangeable_part ( mut self ) -> ( ExchangeableBatchPart < T > , Option < Lease > ) {
501+ pub ( crate ) fn into_exchangeable_part ( self ) -> ( ExchangeableBatchPart < T > , Lease ) {
502502 // If `x` has a lease, we've effectively transferred it to `r`.
503- let lease = self . lease . take ( ) ;
503+ let lease = self . lease . clone ( ) ;
504504 let part = ExchangeableBatchPart {
505505 shard_id : self . shard_id ,
506506 encoded_size_bytes : self . part . encoded_size_bytes ( ) ,
Original file line number Diff line number Diff line change @@ -572,9 +572,7 @@ where
572572 // seemed to work okay so far. Continue to revisit as necessary.
573573 let worker_idx = usize:: cast_from ( Instant :: now ( ) . hashed ( ) ) % num_workers;
574574 let ( part, lease) = part_desc. into_exchangeable_part ( ) ;
575- if let Some ( lease) = lease {
576- leases. borrow_mut ( ) . push_at ( current_ts. clone ( ) , lease) ;
577- }
575+ leases. borrow_mut ( ) . push_at ( current_ts. clone ( ) , lease) ;
578576 descs_output. give ( & session_cap, ( worker_idx, part) ) ;
579577 }
580578
Original file line number Diff line number Diff line change @@ -726,7 +726,7 @@ where
726726 filter,
727727 desc,
728728 part,
729- lease : Some ( self . lease_seqno ( ) ) ,
729+ lease : self . lease_seqno ( ) ,
730730 filter_pushdown_audit : false ,
731731 }
732732 }
@@ -1450,7 +1450,7 @@ mod tests {
14501450
14511451 // Repeat the same process as above, more or less, while fetching + returning parts
14521452 for ( mut i, part) in parts. into_iter ( ) . enumerate ( ) {
1453- let part_seqno = part. lease . as_ref ( ) . unwrap ( ) . seqno ( ) ;
1453+ let part_seqno = part. lease . seqno ( ) ;
14541454 let last_seqno = this_seqno;
14551455 this_seqno = part_seqno;
14561456 assert ! ( this_seqno >= last_seqno) ;
@@ -1463,9 +1463,8 @@ mod tests {
14631463 for event in subscribe. next ( None ) . await {
14641464 if let ListenEvent :: Updates ( parts) = event {
14651465 for part in parts {
1466- if let ( _, Some ( lease) ) = part. into_exchangeable_part ( ) {
1467- subsequent_parts. push ( lease) ;
1468- }
1466+ let ( _, lease) = part. into_exchangeable_part ( ) ;
1467+ subsequent_parts. push ( lease) ;
14691468 }
14701469 }
14711470 }
You can’t perform that action at this time.
0 commit comments