Skip to content

Commit a03f81c

Browse files
committed
Leased parts always have leases now
The only case where this wasn't true was exchanging over a network, and we no longer use leased part for that.
1 parent 1ac955f commit a03f81c

File tree

3 files changed

+8
-11
lines changed

3 files changed

+8
-11
lines changed

src/persist-client/src/fetch.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ pub struct LeasedBatchPart<T> {
481481
pub(crate) part: BatchPart<T>,
482482
/// The lease that prevents this part from being GCed. Code should ensure that this lease
483483
/// lives as long as the part is needed.
484-
pub(crate) lease: Option<Lease>,
484+
pub(crate) lease: Lease,
485485
pub(crate) filter_pushdown_audit: bool,
486486
}
487487

@@ -498,9 +498,9 @@ where
498498
/// that can't travel across process boundaries. The caller is responsible for
499499
/// ensuring that the lease is held for as long as the batch part may be in use:
500500
/// dropping it too early may cause a fetch to fail.
501-
pub(crate) fn into_exchangeable_part(mut self) -> (ExchangeableBatchPart<T>, Option<Lease>) {
501+
pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
502502
// If `x` has a lease, we've effectively transferred it to `r`.
503-
let lease = self.lease.take();
503+
let lease = self.lease.clone();
504504
let part = ExchangeableBatchPart {
505505
shard_id: self.shard_id,
506506
encoded_size_bytes: self.part.encoded_size_bytes(),

src/persist-client/src/operators/shard_source.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -572,9 +572,7 @@ where
572572
// seemed to work okay so far. Continue to revisit as necessary.
573573
let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
574574
let (part, lease) = part_desc.into_exchangeable_part();
575-
if let Some(lease) = lease {
576-
leases.borrow_mut().push_at(current_ts.clone(), lease);
577-
}
575+
leases.borrow_mut().push_at(current_ts.clone(), lease);
578576
descs_output.give(&session_cap, (worker_idx, part));
579577
}
580578

src/persist-client/src/read.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ where
726726
filter,
727727
desc,
728728
part,
729-
lease: Some(self.lease_seqno()),
729+
lease: self.lease_seqno(),
730730
filter_pushdown_audit: false,
731731
}
732732
}
@@ -1447,7 +1447,7 @@ mod tests {
14471447

14481448
// Repeat the same process as above, more or less, while fetching + returning parts
14491449
for (mut i, part) in parts.into_iter().enumerate() {
1450-
let part_seqno = part.lease.as_ref().unwrap().seqno();
1450+
let part_seqno = part.lease.seqno();
14511451
let last_seqno = this_seqno;
14521452
this_seqno = part_seqno;
14531453
assert!(this_seqno >= last_seqno);
@@ -1460,9 +1460,8 @@ mod tests {
14601460
for event in subscribe.next(None).await {
14611461
if let ListenEvent::Updates(parts) = event {
14621462
for part in parts {
1463-
if let (_, Some(lease)) = part.into_exchangeable_part() {
1464-
subsequent_parts.push(lease);
1465-
}
1463+
let (_, lease) = part.into_exchangeable_part();
1464+
subsequent_parts.push(lease);
14661465
}
14671466
}
14681467
}

0 commit comments

Comments
 (0)