Skip to content

Commit 32c9f5a

Browse files
committed
Leased parts always have leases now
The only case where this wasn't true was exchanging over a network, and we no longer use leased part for that.
1 parent 6dd4726 commit 32c9f5a

File tree

3 files changed

+8
-11
lines changed

3 files changed

+8
-11
lines changed

src/persist-client/src/fetch.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ pub struct LeasedBatchPart<T> {
481481
pub(crate) part: BatchPart<T>,
482482
/// The lease that prevents this part from being GCed. Code should ensure that this lease
483483
/// lives as long as the part is needed.
484-
pub(crate) lease: Option<Lease>,
484+
pub(crate) lease: Lease,
485485
pub(crate) filter_pushdown_audit: bool,
486486
}
487487

@@ -498,9 +498,9 @@ where
498498
/// that can't travel across process boundaries. The caller is responsible for
499499
/// ensuring that the lease is held for as long as the batch part may be in use:
500500
/// dropping it too early may cause a fetch to fail.
501-
pub(crate) fn into_exchangeable_part(mut self) -> (ExchangeableBatchPart<T>, Option<Lease>) {
501+
pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
502502
// If `x` has a lease, we've effectively transferred it to `r`.
503-
let lease = self.lease.take();
503+
let lease = self.lease.clone();
504504
let part = ExchangeableBatchPart {
505505
shard_id: self.shard_id,
506506
encoded_size_bytes: self.part.encoded_size_bytes(),

src/persist-client/src/operators/shard_source.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -572,9 +572,7 @@ where
572572
// seemed to work okay so far. Continue to revisit as necessary.
573573
let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
574574
let (part, lease) = part_desc.into_exchangeable_part();
575-
if let Some(lease) = lease {
576-
leases.borrow_mut().push_at(current_ts.clone(), lease);
577-
}
575+
leases.borrow_mut().push_at(current_ts.clone(), lease);
578576
descs_output.give(&session_cap, (worker_idx, part));
579577
}
580578

src/persist-client/src/read.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ where
726726
filter,
727727
desc,
728728
part,
729-
lease: Some(self.lease_seqno()),
729+
lease: self.lease_seqno(),
730730
filter_pushdown_audit: false,
731731
}
732732
}
@@ -1450,7 +1450,7 @@ mod tests {
14501450

14511451
// Repeat the same process as above, more or less, while fetching + returning parts
14521452
for (mut i, part) in parts.into_iter().enumerate() {
1453-
let part_seqno = part.lease.as_ref().unwrap().seqno();
1453+
let part_seqno = part.lease.seqno();
14541454
let last_seqno = this_seqno;
14551455
this_seqno = part_seqno;
14561456
assert!(this_seqno >= last_seqno);
@@ -1463,9 +1463,8 @@ mod tests {
14631463
for event in subscribe.next(None).await {
14641464
if let ListenEvent::Updates(parts) = event {
14651465
for part in parts {
1466-
if let (_, Some(lease)) = part.into_exchangeable_part() {
1467-
subsequent_parts.push(lease);
1468-
}
1466+
let (_, lease) = part.into_exchangeable_part();
1467+
subsequent_parts.push(lease);
14691468
}
14701469
}
14711470
}

0 commit comments

Comments
 (0)