Skip to content

Commit ee94ee4

Browse files
authored
fix: EphemeralStore import should ignore timeout entries (#865)
* fix: ephe store import should ignore timeout entries * chore: changeset * test: add ephemeral store timeout apply tests * docs: add eph rust mod docs
1 parent 65593cb commit ee94ee4

File tree

5 files changed

+88
-3
lines changed

5 files changed

+88
-3
lines changed

.changeset/odd-rings-care.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"loro-crdt": patch
3+
"loro-crdt-map": patch
4+
---
5+
6+
fix: EphemeralStore apply should ignore timeout entries #865

crates/loro-internal/src/awareness.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
//! Ephemeral presence utilities.
2+
//!
3+
//! `EphemeralStore` is the recommended API: a timestamped, last-write-wins key-value
4+
//! store for transient presence data (cursors, selections, etc.). It supports:
5+
//! - Per-key timeouts: expired entries are skipped by `encode`/`encode_all` and removed
6+
//! by `remove_outdated`.
7+
//! - Import/export via `encode`/`apply` for syncing between peers.
8+
//! - Subscriptions for both local updates (raw bytes to send) and merged updates.
9+
//!
10+
//! The legacy `Awareness` type remains for backward compatibility but is deprecated in
11+
//! favor of `EphemeralStore`.
112
use std::sync::atomic::AtomicI64;
213
use std::sync::{Arc, Mutex};
314

@@ -416,13 +427,18 @@ impl EphemeralStoreInner {
416427
let mut added_keys = Vec::new();
417428
let mut removed_keys = Vec::new();
418429
let now = get_sys_timestamp() as Timestamp;
430+
let timeout = self.timeout.load(std::sync::atomic::Ordering::Relaxed);
419431
let mut states = self.states.lock().unwrap();
420432
for EncodedState {
421433
key,
422434
value: record,
423435
timestamp,
424436
} in peers_info
425437
{
438+
if now - timestamp > timeout {
439+
continue;
440+
}
441+
426442
match states.get_mut(key) {
427443
Some(peer_info) if peer_info.timestamp >= timestamp => {
428444
// do nothing
@@ -432,7 +448,7 @@ impl EphemeralStoreInner {
432448
key.to_string(),
433449
State {
434450
state: record.clone(),
435-
timestamp: now,
451+
timestamp,
436452
},
437453
);
438454
match (old, record) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use loro_internal::awareness::EphemeralStore;
2+
use loro_internal::LoroValue;
3+
use serde::Serialize;
4+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
5+
6+
#[derive(Serialize)]
7+
struct WireState {
8+
key: String,
9+
value: Option<LoroValue>,
10+
timestamp: i64,
11+
}
12+
13+
fn now_ms() -> i64 {
14+
SystemTime::now()
15+
.duration_since(UNIX_EPOCH)
16+
.unwrap()
17+
.as_millis() as i64
18+
}
19+
20+
#[test]
21+
fn import_skips_entries_past_timeout() {
22+
let store = EphemeralStore::new(100);
23+
let stale_timestamp = now_ms() - 500;
24+
25+
let payload = postcard::to_allocvec(&vec![WireState {
26+
key: "stale".into(),
27+
value: Some(LoroValue::from(1)),
28+
timestamp: stale_timestamp,
29+
}])
30+
.unwrap();
31+
32+
store.apply(&payload).unwrap();
33+
34+
assert!(store.get("stale").is_none());
35+
assert!(store.get_all_states().is_empty());
36+
}
37+
38+
#[test]
39+
fn import_preserves_remote_timestamp_for_timeout() {
40+
let timeout_ms = 100;
41+
let store = EphemeralStore::new(timeout_ms);
42+
let remote_timestamp = now_ms() - (timeout_ms - 20);
43+
44+
let payload = postcard::to_allocvec(&vec![WireState {
45+
key: "cursor".into(),
46+
value: Some(LoroValue::from("v")),
47+
timestamp: remote_timestamp,
48+
}])
49+
.unwrap();
50+
51+
store.apply(&payload).unwrap();
52+
assert_eq!(store.get("cursor"), Some(LoroValue::from("v")));
53+
54+
std::thread::sleep(Duration::from_millis(40));
55+
store.remove_outdated();
56+
57+
assert!(store.get("cursor").is_none());
58+
assert!(store.get_all_states().is_empty());
59+
}

crates/loro-wasm/src/awareness.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ impl AwarenessWasm {
121121
/// Get the timestamp of the state of a given peer.
122122
pub fn getTimestamp(&self, peer: JsIntoPeerID) -> JsResult<Option<f64>> {
123123
let id = js_peer_to_peer(peer.into())?;
124-
Ok(self.inner
124+
Ok(self
125+
.inner
125126
.get_all_states()
126127
.get(&id)
127128
.map(|r| r.timestamp as f64))

crates/loro-wasm/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2431,7 +2431,10 @@ fn diff_event_to_js_value(event: DiffEvent, for_json: bool) -> JsResult<JsValue>
24312431
/// path: Path;
24322432
/// }
24332433
///
2434-
fn container_diff_to_js_value(event: &loro_internal::ContainerDiff, for_json: bool) -> JsResult<JsValue> {
2434+
fn container_diff_to_js_value(
2435+
event: &loro_internal::ContainerDiff,
2436+
for_json: bool,
2437+
) -> JsResult<JsValue> {
24352438
let obj = js_sys::Object::new();
24362439
Reflect::set(&obj, &"target".into(), &event.id.to_string().into())?;
24372440
Reflect::set(

0 commit comments

Comments
 (0)