Skip to content

Commit 1ac955f

Browse files
committed
Bundle the seqno and the lease together
1 parent 28d0321 commit 1ac955f

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

src/persist-client/src/fetch.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,19 @@ where
428428
///
429429
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
430430
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
431-
#[derive(Clone, Debug, Default)]
432-
pub(crate) struct Lease(Arc<()>);
431+
#[derive(Clone, Debug)]
432+
pub(crate) struct Lease(Arc<SeqNo>);
433433

434434
impl Lease {
435+
pub fn new(seqno: SeqNo) -> Self {
436+
Self(Arc::new(seqno))
437+
}
438+
439+
#[cfg(test)]
440+
pub fn seqno(&self) -> SeqNo {
441+
*self.0
442+
}
443+
435444
/// Returns the number of live copies of this lease, including this one.
436445
pub fn count(&self) -> usize {
437446
Arc::strong_count(&self.0)
@@ -470,10 +479,6 @@ pub struct LeasedBatchPart<T> {
470479
pub(crate) filter: FetchBatchFilter<T>,
471480
pub(crate) desc: Description<T>,
472481
pub(crate) part: BatchPart<T>,
473-
/// The `SeqNo` from which this part originated; we track this value as
474-
/// to ensure the `SeqNo` isn't garbage collected while a
475-
/// read still depends on it.
476-
pub(crate) leased_seqno: SeqNo,
477482
/// The lease that prevents this part from being GCed. Code should ensure that this lease
478483
/// lives as long as the part is needed.
479484
pub(crate) lease: Option<Lease>,

src/persist-client/src/operators/shard_source.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -696,17 +696,17 @@ mod tests {
696696
use super::*;
697697
use std::sync::Arc;
698698

699+
use crate::operators::shard_source::shard_source;
700+
use crate::{Diagnostics, ShardId};
701+
use mz_persist::location::SeqNo;
699702
use timely::dataflow::Scope;
700703
use timely::dataflow::operators::Leave;
701704
use timely::dataflow::operators::Probe;
702705
use timely::progress::Antichain;
703706

704-
use crate::operators::shard_source::shard_source;
705-
use crate::{Diagnostics, ShardId};
706-
707707
#[mz_ore::test]
708708
fn test_lease_manager() {
709-
let lease = Lease::default();
709+
let lease = Lease::new(SeqNo::minimum());
710710
let mut manager = LeaseManager::new();
711711
for t in 0u64..10 {
712712
manager.push_at(t, lease.clone());

src/persist-client/src/read.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,6 @@ where
726726
filter,
727727
desc,
728728
part,
729-
leased_seqno: self.machine.seqno(),
730729
lease: Some(self.lease_seqno()),
731730
filter_pushdown_audit: false,
732731
}
@@ -752,7 +751,7 @@ where
752751
/// collected until its lease has been returned.
753752
fn lease_seqno(&mut self) -> Lease {
754753
let seqno = self.machine.seqno();
755-
let lease = self.leased_seqnos.entry(seqno).or_default();
754+
let lease = self.leased_seqnos.entry(seqno).or_insert(Lease::new(seqno));
756755
lease.clone()
757756
}
758757

@@ -1448,7 +1447,7 @@ mod tests {
14481447

14491448
// Repeat the same process as above, more or less, while fetching + returning parts
14501449
for (mut i, part) in parts.into_iter().enumerate() {
1451-
let part_seqno = part.leased_seqno;
1450+
let part_seqno = part.lease.as_ref().unwrap().seqno();
14521451
let last_seqno = this_seqno;
14531452
this_seqno = part_seqno;
14541453
assert!(this_seqno >= last_seqno);

0 commit comments

Comments
 (0)