diff --git a/Cargo.toml b/Cargo.toml index 5cd1f798..b11d59cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ path = "examples/tcp_example.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -automerge = { version = "0.4.1" } +automerge = { version = "0.5.0" } uuid = { version = "1.2.2"} crossbeam-channel = { version = "0.5.8" } parking_lot = { version = "0.12.1" } diff --git a/src/repo.rs b/src/repo.rs index 80ef77df..814eff61 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -2,7 +2,7 @@ use crate::dochandle::{DocHandle, SharedDocument}; use crate::interfaces::{DocumentId, RepoId}; use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError}; use automerge::sync::{Message as SyncMessage, State as SyncState, SyncDoc}; -use automerge::Automerge; +use automerge::{Automerge, ChangeHash}; use core::pin::Pin; use crossbeam_channel::{select, unbounded, Receiver, Sender}; use futures::future::Future; @@ -527,6 +527,8 @@ pub(crate) struct DocumentInfo { /// Counter of patches since last save, /// used to make decisions about full or incemental saves. patches_since_last_save: usize, + /// Last heads obtained from the automerge doc. + last_heads: Vec, } impl DocumentInfo { @@ -535,6 +537,10 @@ impl DocumentInfo { document: Arc>, handle_count: Arc, ) -> Self { + let last_heads = { + let doc = document.read(); + doc.automerge.get_heads() + }; DocumentInfo { state, document, @@ -542,6 +548,7 @@ impl DocumentInfo { sync_states: Default::default(), change_observers: Default::default(), patches_since_last_save: 0, + last_heads, } } @@ -663,8 +670,14 @@ impl DocumentInfo { /// Count patches since last save, /// returns whether there were any. fn note_changes(&mut self) -> bool { - // TODO: count patches somehow. - true + let count = { + let doc = self.document.read(); + let changes = doc.automerge.get_changes(&self.last_heads); + changes.len() + }; + let has_patches = count > 0; + self.patches_since_last_save = self.patches_since_last_save.checked_add(count).unwrap_or(0); + has_patches } fn resolve_change_observers(&mut self, result: Result<(), RepoError>) { @@ -683,18 +696,21 @@ impl DocumentInfo { return; } let should_compact = self.patches_since_last_save > 10; - let storage_fut = if should_compact { - let to_save = { - let mut doc = self.document.write(); - doc.automerge.save() + let (storage_fut, new_heads) = if should_compact { + let (to_save, new_heads) = { + let doc = self.document.read(); + (doc.automerge.save(), doc.automerge.get_heads()) }; - storage.compact(document_id.clone(), to_save) + (storage.compact(document_id.clone(), to_save), new_heads) } else { - let to_save = { - let mut doc = self.document.write(); - doc.automerge.save_incremental() + let (to_save, new_heads) = { + let doc = self.document.read(); + ( + doc.automerge.save_after(&self.last_heads), + doc.automerge.get_heads(), + ) }; - storage.append(document_id.clone(), to_save) + (storage.append(document_id.clone(), to_save), new_heads) }; self.state = match self.state { DocState::Sync(None) => DocState::Sync(Some(storage_fut)), @@ -710,6 +726,7 @@ impl DocumentInfo { let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id)); self.state.poll_pending_save(waker); self.patches_since_last_save = 0; + self.last_heads = new_heads; } /// Apply incoming sync messages.