Skip to content

Commit dd88ae5

Browse files
committed
fix(manifest): split read/write lease tracking
1 parent 1ece11f commit dd88ae5

File tree

3 files changed

+210
-68
lines changed

3 files changed

+210
-68
lines changed

fusio-manifest/src/lease/mod.rs

Lines changed: 155 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,39 @@ pub mod keeper;
2020
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2121
pub struct LeaseId(pub String);
2222

23+
/// Lease intent to distinguish read snapshots from active write transactions.
24+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
25+
#[serde(rename_all = "snake_case")]
26+
pub enum LeaseKind {
27+
#[default]
28+
Read,
29+
Write,
30+
}
31+
2332
#[derive(Debug, Clone)]
2433
pub struct LeaseHandle {
2534
pub id: LeaseId,
2635
pub snapshot_txn_id: u64,
36+
pub active_txn_id: Option<u64>,
37+
pub kind: LeaseKind,
2738
}
2839

2940
#[derive(Debug, Clone, Serialize, Deserialize)]
3041
pub struct ActiveLease {
3142
pub id: LeaseId,
3243
pub snapshot_txn_id: u64,
44+
pub active_txn_id: Option<u64>,
45+
pub kind: LeaseKind,
3346
pub expires_at: Duration,
3447
}
3548

3649
pub trait LeaseStore: MaybeSend + MaybeSync + Clone {
3750
fn create(
3851
&self,
3952
snapshot_txn_id: u64,
53+
active_txn_id: Option<u64>,
4054
head_tag: Option<HeadTag>,
55+
kind: LeaseKind,
4156
ttl: Duration,
4257
) -> impl MaybeSendFuture<Output = Result<LeaseHandle>> + '_;
4358

@@ -59,6 +74,10 @@ pub trait LeaseStore: MaybeSend + MaybeSync + Clone {
5974
struct LeaseDoc {
6075
id: String,
6176
snapshot_txn_id: u64,
77+
#[serde(default)]
78+
active_txn_id: Option<u64>,
79+
#[serde(default)]
80+
kind: LeaseKind,
6281
expires_at_ms: u64,
6382
#[serde(skip_serializing_if = "Option::is_none")]
6483
head_tag: Option<String>,
@@ -106,6 +125,91 @@ where
106125
.as_millis()
107126
.min(u128::from(u64::MAX)) as u64
108127
}
128+
129+
async fn create_impl(
130+
&self,
131+
snapshot_txn_id: u64,
132+
active_txn_id: Option<u64>,
133+
head_tag: Option<HeadTag>,
134+
kind: LeaseKind,
135+
ttl: Duration,
136+
) -> Result<LeaseHandle, Error>
137+
where
138+
FS: Fs + FsCas + Clone + MaybeSend + MaybeSync + 'static,
139+
T: Timer + Clone + 'static,
140+
{
141+
match kind {
142+
LeaseKind::Read if active_txn_id.is_some() => {
143+
return Err(Error::Corrupt(
144+
"read lease must not include active_txn_id".into(),
145+
));
146+
}
147+
LeaseKind::Write if active_txn_id.is_none() => {
148+
return Err(Error::Corrupt("write lease requires active_txn_id".into()));
149+
}
150+
_ => {}
151+
}
152+
let ttl_ms = ttl.as_millis().min(u128::from(u64::MAX)) as u64;
153+
let mut attempt: u32 = 0;
154+
let mut backoff_iter = self.backoff.build_backoff();
155+
156+
loop {
157+
let now = self.wall_clock_now_ms();
158+
let expires_at_ms = now.saturating_add(ttl_ms);
159+
let id = if attempt == 0 {
160+
format!("lease-{}", now)
161+
} else {
162+
format!("lease-{}-{}", now, attempt)
163+
};
164+
let key = self.key_for(&id);
165+
let doc = LeaseDoc {
166+
id: id.clone(),
167+
snapshot_txn_id,
168+
active_txn_id,
169+
kind,
170+
expires_at_ms,
171+
head_tag: head_tag.as_ref().map(|t| t.0.clone()),
172+
};
173+
let body = serde_json::to_vec(&doc)
174+
.map_err(|e| Error::Corrupt(format!("lease encode: {e}")))?;
175+
let path = Path::parse(&key).map_err(Error::other)?;
176+
match self
177+
.fs
178+
.put_conditional(
179+
&path,
180+
&body,
181+
Some("application/json"),
182+
None,
183+
CasCondition::IfNotExists,
184+
)
185+
.await
186+
{
187+
Ok(_) => {
188+
return Ok(LeaseHandle {
189+
id: LeaseId(id),
190+
snapshot_txn_id,
191+
active_txn_id,
192+
kind,
193+
});
194+
}
195+
Err(e) => {
196+
let err = Error::Io(e);
197+
match classify_error(&err) {
198+
RetryClass::RetryTransient => {
199+
if let Some(delay) = backoff_iter.next() {
200+
attempt += 1;
201+
self.timer.sleep(delay).await;
202+
continue;
203+
} else {
204+
return Err(err);
205+
}
206+
}
207+
_ => return Err(err),
208+
}
209+
}
210+
}
211+
}
212+
}
109213
}
110214

111215
impl<FS, T> LeaseStore for LeaseStoreImpl<FS, T>
@@ -116,66 +220,14 @@ where
116220
fn create(
117221
&self,
118222
snapshot_txn_id: u64,
223+
active_txn_id: Option<u64>,
119224
head_tag: Option<HeadTag>,
225+
kind: LeaseKind,
120226
ttl: Duration,
121227
) -> impl MaybeSendFuture<Output = Result<LeaseHandle, Error>> + '_ {
122228
async move {
123-
let ttl_ms = ttl.as_millis().min(u128::from(u64::MAX)) as u64;
124-
let mut attempt: u32 = 0;
125-
let mut backoff_iter = self.backoff.build_backoff();
126-
127-
loop {
128-
let now = self.wall_clock_now_ms();
129-
let expires_at_ms = now.saturating_add(ttl_ms);
130-
let id = if attempt == 0 {
131-
format!("lease-{}", now)
132-
} else {
133-
format!("lease-{}-{}", now, attempt)
134-
};
135-
let key = self.key_for(&id);
136-
let doc = LeaseDoc {
137-
id: id.clone(),
138-
snapshot_txn_id,
139-
expires_at_ms,
140-
head_tag: head_tag.as_ref().map(|t| t.0.clone()),
141-
};
142-
let body = serde_json::to_vec(&doc)
143-
.map_err(|e| Error::Corrupt(format!("lease encode: {e}")))?;
144-
let path = Path::parse(&key).map_err(Error::other)?;
145-
match self
146-
.fs
147-
.put_conditional(
148-
&path,
149-
&body,
150-
Some("application/json"),
151-
None,
152-
CasCondition::IfNotExists,
153-
)
154-
.await
155-
{
156-
Ok(_) => {
157-
return Ok(LeaseHandle {
158-
id: LeaseId(id),
159-
snapshot_txn_id,
160-
});
161-
}
162-
Err(e) => {
163-
let err: Error = e.into();
164-
match classify_error(&err) {
165-
RetryClass::RetryTransient => {
166-
if let Some(delay) = backoff_iter.next() {
167-
attempt += 1;
168-
self.timer.sleep(delay).await;
169-
continue;
170-
} else {
171-
return Err(err);
172-
}
173-
}
174-
_ => return Err(err),
175-
}
176-
}
177-
}
178-
}
229+
self.create_impl(snapshot_txn_id, active_txn_id, head_tag, kind, ttl)
230+
.await
179231
}
180232
}
181233

@@ -253,6 +305,8 @@ where
253305
out.push(ActiveLease {
254306
id: LeaseId(doc.id),
255307
snapshot_txn_id: doc.snapshot_txn_id,
308+
active_txn_id: doc.active_txn_id,
309+
kind: doc.kind,
256310
expires_at: Duration::from_millis(doc.expires_at_ms),
257311
});
258312
}
@@ -393,15 +447,23 @@ mod tests {
393447
let store = in_memory_stores.lease;
394448
// Create two leases at different snapshot txn_ids
395449
let ttl = Duration::from_secs(60);
396-
let l1 = store.create(100, None, ttl).await.unwrap();
397-
let l2 = store.create(50, None, ttl).await.unwrap();
450+
let l1 = store
451+
.create(100, None, None, LeaseKind::Read, ttl)
452+
.await
453+
.unwrap();
454+
let l2 = store
455+
.create(50, None, None, LeaseKind::Read, ttl)
456+
.await
457+
.unwrap();
398458

399459
// Now = 0 should show both as active if we pass a very small now (simulate immediate
400460
// check)
401461
let active = store.list_active(Duration::from_millis(0)).await.unwrap();
402462
assert_eq!(active.len(), 2);
403463
let min = active.iter().map(|l| l.snapshot_txn_id).min().unwrap();
404464
assert_eq!(min, 50);
465+
assert!(active.iter().all(|l| l.active_txn_id.is_none()));
466+
assert!(active.iter().all(|l| l.kind == LeaseKind::Read));
405467

406468
// Heartbeat l1 and ensure it extends expiry without affecting txn id
407469
store.heartbeat(&l1, ttl).await.unwrap();
@@ -416,6 +478,28 @@ mod tests {
416478
})
417479
}
418480

481+
#[rstest]
482+
fn mem_lease_kind_and_active_txn_persisted(in_memory_stores: InMemoryStores) {
483+
block_on(async move {
484+
let store = in_memory_stores.lease;
485+
let ttl = Duration::from_secs(60);
486+
let lease = store
487+
.create(10, Some(11), None, LeaseKind::Write, ttl)
488+
.await
489+
.unwrap();
490+
assert_eq!(lease.snapshot_txn_id, 10);
491+
assert_eq!(lease.active_txn_id, Some(11));
492+
assert_eq!(lease.kind, LeaseKind::Write);
493+
494+
let active = store.list_active(Duration::from_millis(0)).await.unwrap();
495+
assert_eq!(active.len(), 1);
496+
let got = &active[0];
497+
assert_eq!(got.snapshot_txn_id, 10);
498+
assert_eq!(got.active_txn_id, Some(11));
499+
assert_eq!(got.kind, LeaseKind::Write);
500+
})
501+
}
502+
419503
#[test]
420504
fn lease_first_attempt_success_no_retry_suffix() {
421505
block_on(async move {
@@ -425,7 +509,10 @@ mod tests {
425509
let store = LeaseStoreImpl::new(fs, "", policy, timer);
426510
let ttl = Duration::from_secs(60);
427511

428-
let lease = store.create(100, None, ttl).await.unwrap();
512+
let lease = store
513+
.create(100, None, None, LeaseKind::Read, ttl)
514+
.await
515+
.unwrap();
429516

430517
assert!(
431518
lease.id.0.starts_with("lease-"),
@@ -449,6 +536,8 @@ mod tests {
449536
after_prefix
450537
);
451538
assert_eq!(lease.snapshot_txn_id, 100);
539+
assert!(lease.active_txn_id.is_none());
540+
assert_eq!(lease.kind, LeaseKind::Read);
452541
})
453542
}
454543

@@ -469,7 +558,10 @@ mod tests {
469558
let store = LeaseStoreImpl::new(fs, "", policy, timer);
470559
let ttl = Duration::from_secs(60);
471560

472-
let lease = store.create(100, None, ttl).await.unwrap();
561+
let lease = store
562+
.create(100, None, None, LeaseKind::Read, ttl)
563+
.await
564+
.unwrap();
473565

474566
assert!(
475567
lease.id.0.starts_with("lease-"),
@@ -482,6 +574,8 @@ mod tests {
482574
lease.id.0
483575
);
484576
assert_eq!(lease.snapshot_txn_id, 100);
577+
assert!(lease.active_txn_id.is_none());
578+
assert_eq!(lease.kind, LeaseKind::Read);
485579
})
486580
}
487581

@@ -502,7 +596,7 @@ mod tests {
502596
let store = LeaseStoreImpl::new(fs, "", policy, timer);
503597
let ttl = Duration::from_secs(60);
504598

505-
let result = store.create(100, None, ttl).await;
599+
let result = store.create(100, None, None, LeaseKind::Read, ttl).await;
506600

507601
assert!(result.is_err());
508602
let err = result.unwrap_err();

fusio-manifest/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub mod manifest;
1515
pub mod session;
1616
pub mod snapshot; // Snapshot types and snapshot-bound reader // Unified read/write session API
1717
// Re-export lease handle type for public read-lease APIs
18-
pub use lease::{keeper::LeaseKeeper, LeaseHandle};
18+
pub use lease::{keeper::LeaseKeeper, ActiveLease, LeaseHandle, LeaseKind};
1919

2020
pub use crate::{
2121
checkpoint::{CheckpointStore, CheckpointStoreImpl},

0 commit comments

Comments
 (0)