diff --git a/Cargo.toml b/Cargo.toml
index 0afbc65..9d58c7d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,7 @@ axum = ["dep:axum", "dep:tokio", "dep:tokio-util"]
 tungstenite = ["dep:tungstenite"]
 
 [dependencies]
-automerge = { version = "0.4.1" }
+automerge = { version = "0.5.0" }
 axum = { version = "0.6.18", features = ["ws"], optional = true }
 uuid = { version = "1.2.2"}
 crossbeam-channel = { version = "0.5.8" }
@@ -60,4 +60,4 @@ test-log = { version = "0.2.12", features = ["trace"] }
 env_logger = "0.10.0"
 tracing-subscriber = { version = "0.3.17", features = ["fmt", "env-filter"] }
 itertools = "0.11.0"
-autosurgeon = "0.7.1"
+autosurgeon = "0.8.0"
diff --git a/src/fs_store.rs b/src/fs_store.rs
index 1b30d44..b61b934 100644
--- a/src/fs_store.rs
+++ b/src/fs_store.rs
@@ -180,31 +180,76 @@ impl FsStore {
         Ok(())
     }
 
-    pub fn compact(&self, id: &DocumentId, _full_doc: &[u8]) -> Result<(), Error> {
+    pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> {
         let paths = DocIdPaths::from(id);
 
         // Load all the data we have into a doc
-        let Some(chunks) = Chunks::load(&self.root, id)? else {
-            tracing::warn!(doc_id=%id, "attempted to compact non-existent document");
-            return Ok(());
-        };
-        let mut doc = chunks
-            .to_doc()
-            .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
-
-        // Write the snapshot
-        let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
-        let chunk = doc.save();
-        write_chunk(&self.root, &paths, &chunk, output_chunk_name, &self.tmpdir)?;
-
-        // Remove all the old data
-        for incremental in chunks.incrementals.keys() {
-            let path = paths.chunk_path(&self.root, incremental);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
-        }
-        for snapshot in chunks.snapshots.keys() {
-            let path = paths.chunk_path(&self.root, snapshot);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+        match Chunks::load(&self.root, id) {
+            Ok(Some(chunks)) => {
+                let doc = chunks
+                    .to_doc()
+                    .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
+
+                // Write the snapshot
+                let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
+                let chunk = doc.save();
+                write_chunk(
+                    &self.root,
+                    &paths,
+                    &chunk,
+                    output_chunk_name.clone(),
+                    &self.tmpdir,
+                )?;
+
+                // Remove all the old data
+                for incremental in chunks.incrementals.keys() {
+                    let path = paths.chunk_path(&self.root, incremental);
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+                let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
+                for snapshot in chunks.snapshots.keys() {
+                    let path = paths.chunk_path(&self.root, snapshot);
+
+                    if path == just_wrote {
+                        tracing::error!(
+                            ?path,
+                            "Somehow trying to delete the same path we just wrote to. Not today \
+                             Satan"
+                        );
+                        continue;
+                    }
+
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+            }
+            Ok(None) => {
+                // The chunks are missing in storage for whatever reason
+                // Try to recreate the document from full_doc if possible
+                let doc = automerge::Automerge::load(full_doc)
+                    .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
+
+                std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
+                    Error(ErrorKind::CreateLevel2Path(
+                        paths.level2_path(&self.root),
+                        e,
+                    ))
+                })?;
+
+                // Write the snapshot
+                let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
+                write_chunk(
+                    &self.root,
+                    &paths,
+                    full_doc,
+                    output_chunk_name,
+                    &self.tmpdir,
+                )?;
+            }
+            Err(e) => {
+                tracing::error!(doc_id=%id, %e, "error loading chunks");
+            }
         }
         Ok(())
     }
@@ -233,6 +278,7 @@ fn write_chunk(
     // Move the temporary file into a snapshot in the document data directory
     // with a name based on the hash of the heads of the document
     let output_path = paths.chunk_path(root, &name);
+    tracing::trace!(?temp_save_path, ?output_path, "renaming chunk file");
     std::fs::rename(&temp_save_path, &output_path)
         .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;
 
@@ -299,13 +345,13 @@ impl DocIdPaths {
     }
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 enum ChunkType {
     Snapshot,
     Incremental,
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 struct SavedChunkName {
     hash: Vec<u8>,
     chunk_type: ChunkType,
@@ -421,7 +467,7 @@ impl Chunks {
                     // Could be a concurrent process compacting, not an error
                     tracing::warn!(
                         missing_chunk_path=%path.display(),
-                        "chunk file disappeared while reading chunks",
+                        "chunk file disappeared while reading chunks; ignoring",
                     );
                     continue;
                 }
diff --git a/src/repo.rs b/src/repo.rs
index 571eb03..38a22ba 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -570,9 +570,13 @@ pub(crate) struct DocumentInfo {
     peer_connections: HashMap<RepoId, PeerConnection>,
     /// Used to resolve futures for DocHandle::changed.
    change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
-    /// Counter of patches since last save,
+    /// Counter of changes since last compact,
     /// used to make decisions about full or incemental saves.
-    patches_since_last_save: usize,
+    changes_since_last_compact: usize,
+    /// The number of changes after which a compaction will be performed.
+    allowable_changes_until_compaction: usize,
+    /// Last heads obtained from the automerge doc.
+    last_heads: Vec<ChangeHash>,
 }
 
 /// A state machine representing a connection between a remote repo and a particular document
@@ -625,13 +629,19 @@ impl DocumentInfo {
         document: Arc<RwLock<SharedDocument>>,
         handle_count: Arc<AtomicUsize>,
     ) -> Self {
+        let last_heads = {
+            let doc = document.read();
+            doc.automerge.get_heads()
+        };
         DocumentInfo {
             state,
             document,
             handle_count,
             peer_connections: Default::default(),
             change_observers: Default::default(),
-            patches_since_last_save: 0,
+            changes_since_last_compact: 0,
+            allowable_changes_until_compaction: 10,
+            last_heads,
         }
     }
 
@@ -649,7 +659,7 @@ impl DocumentInfo {
             | DocState::Error
             | DocState::LoadPending { .. }
             | DocState::Bootstrap { .. } => {
-                assert_eq!(self.patches_since_last_save, 0);
+                assert_eq!(self.changes_since_last_compact, 0);
                 DocState::PendingRemoval(vec![])
             }
             DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
@@ -747,8 +757,22 @@ impl DocumentInfo {
     /// Count patches since last save,
     /// returns whether there were any.
     fn note_changes(&mut self) -> bool {
-        // TODO: count patches somehow.
-        true
+        // TODO, Can we do this without a read lock?
+        // I think that if the changes update last_heads and
+        // we store `last_heads_since_note` we can get a bool out of this.
+        let count = {
+            let doc = self.document.read();
+            let changes = doc.automerge.get_changes(&self.last_heads);
+            tracing::trace!(
+                last_heads=?self.last_heads,
+                current_heads=?doc.automerge.get_heads(),
+                "checking for changes since last save"
+            );
+            changes.len()
+        };
+        let has_patches = count > 0;
+        self.changes_since_last_compact = self.changes_since_last_compact.saturating_add(count);
+        has_patches
     }
 
     fn resolve_change_observers(&mut self, result: Result<(), RepoError>) {
@@ -766,19 +790,24 @@
         if !self.state.should_save() {
             return;
         }
-        let should_compact = self.patches_since_last_save > 10;
-        let storage_fut = if should_compact {
-            let to_save = {
-                let mut doc = self.document.write();
-                doc.automerge.save()
+        let should_compact =
+            self.changes_since_last_compact > self.allowable_changes_until_compaction;
+        let (storage_fut, new_heads) = if should_compact {
+            let (to_save, new_heads) = {
+                let doc = self.document.read();
+                (doc.automerge.save(), doc.automerge.get_heads())
             };
-            storage.compact(document_id.clone(), to_save)
+            self.changes_since_last_compact = 0;
+            (storage.compact(document_id.clone(), to_save), new_heads)
         } else {
-            let to_save = {
-                let mut doc = self.document.write();
-                doc.automerge.save_incremental()
+            let (to_save, new_heads) = {
+                let doc = self.document.read();
+                (
+                    doc.automerge.save_after(&self.last_heads),
+                    doc.automerge.get_heads(),
+                )
             };
-            storage.append(document_id.clone(), to_save)
+            (storage.append(document_id.clone(), to_save), new_heads)
         };
         match self.state {
             DocState::Sync(ref mut futs) => {
@@ -791,7 +820,7 @@
         }
         let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id));
         self.state.poll_pending_save(waker);
-        self.patches_since_last_save = 0;
+        self.last_heads = new_heads;
     }
 
     /// Apply incoming sync messages,
@@ -1343,15 +1372,16 @@ impl Repo {
                 // Handle doc changes: sync the document.
                 let local_repo_id = self.get_repo_id().clone();
                 if let Some(info) = self.documents.get_mut(&doc_id) {
-                    if !info.note_changes() {
-                        // Stop here if the document wasn't actually changed.
-                        return;
+                    // only run the documents_with_changes workflow if there
+                    // was a change, but always generate potential sync messages
+                    // (below)
+                    if info.note_changes() {
+                        self.documents_with_changes.push(doc_id.clone());
                     }
                     let is_first_edit = matches!(info.state, DocState::LocallyCreatedNotEdited);
                     if is_first_edit {
                         info.state = DocState::Sync(vec![]);
                     }
-                    self.documents_with_changes.push(doc_id.clone());
                     for (to_repo_id, message) in info.generate_sync_messages().into_iter() {
                         let outgoing = NetworkMessage::Sync {
                             from_repo_id: local_repo_id.clone(),
@@ -1445,13 +1475,18 @@ impl Repo {
                     &self.repo_id,
                 );
             }
-            RepoEvent::AddChangeObserver(doc_id, change_hash, mut observer) => {
+            RepoEvent::AddChangeObserver(doc_id, last_heads, mut observer) => {
                 if let Some(info) = self.documents.get_mut(&doc_id) {
                     let current_heads = {
                         let state = info.document.read();
                         state.automerge.get_heads()
                     };
-                    if current_heads == change_hash {
+                    tracing::trace!(
+                        ?current_heads,
+                        ?last_heads,
+                        "handling AddChangeObserver event"
+                    );
+                    if current_heads == last_heads {
                         info.change_observers.push(observer);
                     } else {
                         // Resolve now if the document hash already changed.
@@ -1555,8 +1590,7 @@ impl Repo {
                         .expect("Doc should have an info by now.");
                     let (has_changes, peer_conn_commands) = info.receive_sync_message(per_remote);
-                    if has_changes {
-                        info.note_changes();
+                    if has_changes && info.note_changes() {
                         self.documents_with_changes.push(document_id.clone());
                     }
 
diff --git a/test_utils/Cargo.toml b/test_utils/Cargo.toml
index 4065309..319cdb5 100644
--- a/test_utils/Cargo.toml
+++ b/test_utils/Cargo.toml
@@ -12,7 +12,7 @@ tokio = { version = "1.27", features = ["full"] }
 tokio-util = "0.7.8"
 tokio-serde = {version = "0.8.0", features = ["json"]}
 serde_json = "1.0.96"
-automerge = { version = "^0.3" }
+automerge = { version = "0.5.0" }
 uuid = { version = "1.2.2"}
 crossbeam-channel = { version = "0.5.8" }
 parking_lot = { version = "0.12.1" }
diff --git a/tests/network/document_changed.rs b/tests/network/document_changed.rs
index 859ea64..f427ebe 100644
--- a/tests/network/document_changed.rs
+++ b/tests/network/document_changed.rs
@@ -31,14 +31,22 @@ async fn test_document_changed_over_sync() {
         // Request the document.
         let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap();
         doc_handle.with_doc_mut(|doc| {
-            let mut tx = doc.transaction();
-            tx.put(
-                automerge::ROOT,
-                "repo_id",
-                format!("{}", repo_handle_2.get_repo_id()),
-            )
-            .expect("Failed to change the document.");
-            tx.commit();
+            let val = doc
+                .get(automerge::ROOT, "repo_id")
+                .expect("Failed to read the document.")
+                .unwrap();
+            tracing::debug!(heads=?doc.get_heads(), ?val, "before repo_handle_2 makes edit");
+            {
+                let mut tx = doc.transaction();
+                tx.put(
+                    automerge::ROOT,
+                    "repo_id",
+                    format!("{}", repo_handle_2.get_repo_id()),
+                )
+                .expect("Failed to change the document.");
+                tx.commit();
+            }
+            tracing::debug!(heads=?doc.get_heads(), "after repo_handle_2 makes edit");
         });
     });
 
@@ -64,6 +72,7 @@ async fn test_document_changed_over_sync() {
             .get(automerge::ROOT, "repo_id")
            .expect("Failed to read the document.")
             .unwrap();
+        tracing::debug!(?val, "after repo_handle_1 received sync");
         assert_eq!(val.0.to_str().unwrap(), "repo1".to_string());
     });
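
Reviewer note: the new save path in `DocumentInfo::save_document` chooses between a full snapshot (handed to `Storage::compact`) and an incremental chunk (handed to `Storage::append`) based on how many changes have accumulated since the last compaction. The sketch below restates that decision in isolation; `choose_save_bytes` and `threshold` are illustrative names that do not appear in the patch (`threshold` stands in for `allowable_changes_until_compaction`, which the patch sets to 10).

```rust
// Illustrative sketch only; not code from this patch.
// `last_heads` are the heads recorded at the previous save, mirroring
// DocumentInfo::last_heads. The returned bool says whether the bytes should
// go to Storage::compact (true) or Storage::append (false).
fn choose_save_bytes(
    doc: &automerge::Automerge,
    last_heads: &[automerge::ChangeHash],
    changes_since_last_compact: usize,
    threshold: usize,
) -> (Vec<u8>, bool) {
    if changes_since_last_compact > threshold {
        // Enough changes have accumulated: emit a full snapshot so storage
        // can compact away the old incremental chunks.
        (doc.save(), true)
    } else {
        // Otherwise persist only the changes made since `last_heads`.
        (doc.save_after(last_heads), false)
    }
}
```

Either way, `save_document` then records `doc.automerge.get_heads()` as the new `last_heads`, which `note_changes` later diffs against via `get_changes(&self.last_heads)` to keep `changes_since_last_compact` up to date.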