Skip to content

Commit 276e078

Browse files
committed
Bundle the seqno and the lease together
1 parent d516d58 commit 276e078

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

src/persist-client/src/fetch.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,19 @@ where
428428
///
429429
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
430430
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
431-
#[derive(Clone, Debug, Default)]
432-
pub(crate) struct Lease(Arc<()>);
431+
#[derive(Clone, Debug)]
432+
pub(crate) struct Lease(Arc<SeqNo>);
433433

434434
impl Lease {
435+
pub fn new(seqno: SeqNo) -> Self {
436+
Self(Arc::new(seqno))
437+
}
438+
439+
#[cfg(test)]
440+
pub fn seqno(&self) -> SeqNo {
441+
*self.0
442+
}
443+
435444
/// Returns the number of live copies of this lease, including this one.
436445
pub fn count(&self) -> usize {
437446
Arc::strong_count(&self.0)
@@ -470,10 +479,6 @@ pub struct LeasedBatchPart<T> {
470479
pub(crate) filter: FetchBatchFilter<T>,
471480
pub(crate) desc: Description<T>,
472481
pub(crate) part: BatchPart<T>,
473-
/// The `SeqNo` from which this part originated; we track this value as
474-
/// to ensure the `SeqNo` isn't garbage collected while a
475-
/// read still depends on it.
476-
pub(crate) leased_seqno: SeqNo,
477482
/// The lease that prevents this part from being GCed. Code should ensure that this lease
478483
/// lives as long as the part is needed.
479484
pub(crate) lease: Option<Lease>,

src/persist-client/src/operators/shard_source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ mod tests {
696696
use super::*;
697697
use std::sync::Arc;
698698

699+
use mz_persist::location::SeqNo;
699700
use timely::dataflow::Scope;
700701
use timely::dataflow::operators::Leave;
701702
use timely::dataflow::operators::Probe;
@@ -706,7 +707,7 @@ mod tests {
706707

707708
#[mz_ore::test]
708709
fn test_lease_manager() {
709-
let lease = Lease::default();
710+
let lease = Lease::new(SeqNo::minimum());
710711
let mut manager = LeaseManager::new();
711712
for t in 0u64..10 {
712713
manager.push_at(t, lease.clone());

src/persist-client/src/read.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,6 @@ where
726726
filter,
727727
desc,
728728
part,
729-
leased_seqno: self.machine.seqno(),
730729
lease: Some(self.lease_seqno()),
731730
filter_pushdown_audit: false,
732731
}
@@ -752,7 +751,10 @@ where
752751
/// collected until its lease has been returned.
753752
fn lease_seqno(&mut self) -> Lease {
754753
let seqno = self.machine.seqno();
755-
let lease = self.leased_seqnos.entry(seqno).or_default();
754+
let lease = self
755+
.leased_seqnos
756+
.entry(seqno)
757+
.or_insert_with(|| Lease::new(seqno));
756758
lease.clone()
757759
}
758760

@@ -1448,7 +1450,7 @@ mod tests {
14481450

14491451
// Repeat the same process as above, more or less, while fetching + returning parts
14501452
for (mut i, part) in parts.into_iter().enumerate() {
1451-
let part_seqno = part.leased_seqno;
1453+
let part_seqno = part.lease.as_ref().unwrap().seqno();
14521454
let last_seqno = this_seqno;
14531455
this_seqno = part_seqno;
14541456
assert!(this_seqno >= last_seqno);

0 commit comments

Comments
 (0)