File tree 3 files changed +8
-11
lines changed
3 files changed +8
-11
lines changed Original file line number Diff line number Diff line change @@ -481,7 +481,7 @@ pub struct LeasedBatchPart<T> {
481
481
pub ( crate ) part : BatchPart < T > ,
482
482
/// The lease that prevents this part from being GCed. Code should ensure that this lease
483
483
/// lives as long as the part is needed.
484
- pub ( crate ) lease : Option < Lease > ,
484
+ pub ( crate ) lease : Lease ,
485
485
pub ( crate ) filter_pushdown_audit : bool ,
486
486
}
487
487
@@ -498,9 +498,9 @@ where
498
498
/// that can't travel across process boundaries. The caller is responsible for
499
499
/// ensuring that the lease is held for as long as the batch part may be in use:
500
500
/// dropping it too early may cause a fetch to fail.
501
- pub ( crate ) fn into_exchangeable_part ( mut self ) -> ( ExchangeableBatchPart < T > , Option < Lease > ) {
501
+ pub ( crate ) fn into_exchangeable_part ( self ) -> ( ExchangeableBatchPart < T > , Lease ) {
502
502
// If `x` has a lease, we've effectively transferred it to `r`.
503
- let lease = self . lease . take ( ) ;
503
+ let lease = self . lease . clone ( ) ;
504
504
let part = ExchangeableBatchPart {
505
505
shard_id : self . shard_id ,
506
506
encoded_size_bytes : self . part . encoded_size_bytes ( ) ,
Original file line number Diff line number Diff line change @@ -572,9 +572,7 @@ where
572
572
// seemed to work okay so far. Continue to revisit as necessary.
573
573
let worker_idx = usize:: cast_from ( Instant :: now ( ) . hashed ( ) ) % num_workers;
574
574
let ( part, lease) = part_desc. into_exchangeable_part ( ) ;
575
- if let Some ( lease) = lease {
576
- leases. borrow_mut ( ) . push_at ( current_ts. clone ( ) , lease) ;
577
- }
575
+ leases. borrow_mut ( ) . push_at ( current_ts. clone ( ) , lease) ;
578
576
descs_output. give ( & session_cap, ( worker_idx, part) ) ;
579
577
}
580
578
Original file line number Diff line number Diff line change @@ -726,7 +726,7 @@ where
726
726
filter,
727
727
desc,
728
728
part,
729
- lease : Some ( self . lease_seqno ( ) ) ,
729
+ lease : self . lease_seqno ( ) ,
730
730
filter_pushdown_audit : false ,
731
731
}
732
732
}
@@ -1450,7 +1450,7 @@ mod tests {
1450
1450
1451
1451
// Repeat the same process as above, more or less, while fetching + returning parts
1452
1452
for ( mut i, part) in parts. into_iter ( ) . enumerate ( ) {
1453
- let part_seqno = part. lease . as_ref ( ) . unwrap ( ) . seqno ( ) ;
1453
+ let part_seqno = part. lease . seqno ( ) ;
1454
1454
let last_seqno = this_seqno;
1455
1455
this_seqno = part_seqno;
1456
1456
assert ! ( this_seqno >= last_seqno) ;
@@ -1463,9 +1463,8 @@ mod tests {
1463
1463
for event in subscribe. next ( None ) . await {
1464
1464
if let ListenEvent :: Updates ( parts) = event {
1465
1465
for part in parts {
1466
- if let ( _, Some ( lease) ) = part. into_exchangeable_part ( ) {
1467
- subsequent_parts. push ( lease) ;
1468
- }
1466
+ let ( _, lease) = part. into_exchangeable_part ( ) ;
1467
+ subsequent_parts. push ( lease) ;
1469
1468
}
1470
1469
}
1471
1470
}
You can’t perform that action at this time.
0 commit comments