From f553d5e7358eb5233b34150b4603785cf9cc3f0b Mon Sep 17 00:00:00 2001 From: Alex Good Date: Fri, 21 Feb 2025 15:36:06 +0000 Subject: [PATCH 1/2] Improve logging --- src/fs_store.rs | 15 ++++++++------- src/repo.rs | 11 +++++++++-- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index ab33c5d..c77952f 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -164,7 +164,9 @@ impl FsStore { Ok(result.into_iter().collect()) } + #[tracing::instrument(skip(self, changes))] pub fn append(&self, id: &DocumentId, changes: &[u8]) -> Result<(), Error> { + tracing::debug!("writing incremental change"); let paths = DocIdPaths::from(id); std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| { Error(ErrorKind::CreateLevel2Path( @@ -179,7 +181,9 @@ impl FsStore { Ok(()) } + #[tracing::instrument(skip(self, full_doc))] pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> { + tracing::debug!("compacting document"); let paths = DocIdPaths::from(id); // Load all the data we have into a doc @@ -211,11 +215,8 @@ impl FsStore { let path = paths.chunk_path(&self.root, snapshot); if path == just_wrote { - tracing::error!( - ?path, - "Somehow trying to delete the same path we just wrote to. 
Not today \ - Satan" - ); + // This can happen if for some reason `compact` is called when the only thing + // on disk is a snapshot containing the changes we are being asked to compact continue; } @@ -415,7 +416,7 @@ impl Chunks { fn load(root: &Path, doc_id: &DocumentId) -> Result, Error> { let doc_id_hash = DocIdPaths::from(doc_id); let level2_path = doc_id_hash.level2_path(root); - tracing::debug!( + tracing::trace!( root=%root.display(), doc_id=?doc_id, doc_path=%level2_path.display(), @@ -459,7 +460,7 @@ impl Chunks { tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path"); continue; }; - tracing::debug!(chunk_path=%path.display(), "reading chunk file"); + tracing::trace!(chunk_path=%path.display(), "reading chunk file"); let contents = match std::fs::read(&path) { Ok(c) => c, Err(e) => { diff --git a/src/repo.rs b/src/repo.rs index 16aff3a..1d92f28 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -532,7 +532,8 @@ impl DocState { let pinned = Pin::new(&mut storage_fut); match pinned.poll(&mut Context::from_waker(&waker)) { Poll::Ready(Ok(_)) => None, - Poll::Ready(Err(_)) => { + Poll::Ready(Err(e)) => { + tracing::error!(err=?e, "error in save operation"); // TODO: propagate error to doc handle. // `with_doc_mut` could return a future for this. None @@ -555,7 +556,8 @@ impl DocState { let pinned = Pin::new(&mut storage_fut); let res = match pinned.poll(&mut Context::from_waker(&waker)) { Poll::Ready(Ok(_)) => None, - Poll::Ready(Err(_)) => { + Poll::Ready(Err(e)) => { + tracing::error!(err=?e, "error in storage operation"); // TODO: propagate error to doc handle. // `with_doc_mut` could return a future for this. 
None @@ -880,6 +882,9 @@ impl DocumentInfo { changes.len() }; let has_patches = count > 0; + if has_patches { + tracing::debug!("doc has changed"); + } self.changes_since_last_compact = self.changes_since_last_compact.saturating_add(count); has_patches } @@ -902,6 +907,7 @@ impl DocumentInfo { let should_compact = self.changes_since_last_compact > self.allowable_changes_until_compaction; let (storage_fut, new_heads) = if should_compact { + tracing::trace!(%document_id, "compacting document"); let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) @@ -909,6 +915,7 @@ impl DocumentInfo { self.changes_since_last_compact = 0; (storage.compact(document_id.clone(), to_save), new_heads) } else { + tracing::trace!(%document_id, "writing incremental chunk"); let (to_save, new_heads) = { let doc = self.document.read(); ( From 7a6847add99157d138e8b8d7c2cfddb1be84f965 Mon Sep 17 00:00:00 2001 From: Alex Good Date: Fri, 21 Feb 2025 15:36:06 +0000 Subject: [PATCH 2/2] Fix data loss when compacting a document in fs_store Problem: the `FsStore::compact` logic failed to load the data the caller passed to it when compacting. This meant that if the compacted document contained data which was not on disk then that data was lost. 
Solution: load the document and merge it with existing on disk state --- src/fs_store.rs | 57 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index c77952f..9d90a6e 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -186,11 +186,14 @@ impl FsStore { tracing::debug!("compacting document"); let paths = DocIdPaths::from(id); + let mut doc = automerge::Automerge::load(full_doc) + .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; + // Load all the data we have into a doc match Chunks::load(&self.root, id) { Ok(Some(chunks)) => { - let doc = chunks - .to_doc() + chunks + .add_to_doc(&mut doc) .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; // Write the snapshot @@ -225,11 +228,6 @@ impl FsStore { } } Ok(None) => { - // The chunks are missing in storage for whatever reason - // Try to recreate the document from full_doc if possible - let doc = automerge::Automerge::load(full_doc) - .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; - std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| { Error(ErrorKind::CreateLevel2Path( paths.level2_path(&self.root), @@ -492,7 +490,7 @@ impl Chunks { })) } - fn to_doc(&self) -> Result<automerge::Automerge, automerge::AutomergeError> { + fn add_to_doc(&self, doc: &mut automerge::Automerge) -> Result<(), automerge::AutomergeError> { let mut bytes = Vec::new(); for chunk in self.snapshots.values() { bytes.extend(chunk); @@ -500,7 +498,8 @@ impl Chunks { for chunk in self.incrementals.values() { bytes.extend(chunk); } - automerge::Automerge::load(&bytes) + doc.load_incremental(&bytes)?; + Ok(()) } } @@ -557,3 +556,43 @@ mod error { DeleteChunk(PathBuf, std::io::Error), } } + +#[cfg(test)] +mod tests { + use automerge::{transaction::Transactable, AutoCommit, ReadDoc}; + use tempfile::tempdir; + + use crate::DocumentId; + + use super::FsStore; + + #[test] + fn compact_adds_new_changes_to_fs() { + let mut doc = AutoCommit::new(); + doc.put(automerge::ROOT, "foo", 
"bar").unwrap(); + + let data_dir = tempdir().unwrap(); + + let doc_id = DocumentId::random(); + let fs = FsStore::open(&data_dir).unwrap(); + let change = doc + .get_changes(&[]) + .into_iter() + .flat_map(|c| c.raw_bytes().to_vec()) + .collect::<Vec<_>>(); + + fs.append(&doc_id, &change).unwrap(); + + doc.put(automerge::ROOT, "foo", "baz").unwrap(); + let compacted = doc.save(); + + fs.compact(&doc_id, &compacted).unwrap(); + + let reloaded_raw = fs.get(&doc_id).unwrap().unwrap(); + let reloaded = AutoCommit::load(&reloaded_raw).unwrap(); + assert_eq!( + reloaded.get(&automerge::ROOT, "foo").unwrap().unwrap().0, + "baz".into() + ); + }
}