From 5f0cf77f56a94b32d9c443feaf790fa7703daba6 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Wed, 6 Sep 2023 13:27:12 -0700 Subject: [PATCH 01/16] This commit has fixes for path partsing and it uses the root for temp files --- Cargo.toml | 2 +- src/fs_store.rs | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ef45bd6..b2f45d37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "automerge_repo" -version = "0.1.0" +version = "0.1.1" edition = "2021" [[example]] diff --git a/src/fs_store.rs b/src/fs_store.rs index d611f791..195dde50 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -51,7 +51,7 @@ use error::ErrorKind; /// 2. Load the data into an automerge document /// 3. `automerge::Automerge::save` the document to a temporary file /// 4. Rename the temporary file to a file in the data directory named -/// `SHA356(automerge::Automerge::get_heads)`.snapshot` +/// `SHA256(automerge::Automerge::get_heads)`.snapshot` /// 5. Delete all the files we loaded in step 1. /// /// The fact that we name the file after the heads of the document means that @@ -86,7 +86,7 @@ impl FsStore { pub fn get(&self, id: &DocumentId) -> Result>, Error> { let chunks = Chunks::load(&self.root, id)?; let Some(chunks) = chunks else { - return Ok(None) + return Ok(None); }; let mut result = Vec::new(); result.extend(chunks.snapshots.into_values().flatten()); @@ -122,14 +122,14 @@ impl FsStore { let metadata = entry .metadata() .map_err(|e| Error(ErrorKind::ErrReadingLevel2Path(entry.path(), e)))?; - if metadata.is_dir() { + if !metadata.is_dir() { tracing::warn!( non_file_path=%entry.path().display(), - "unexpected directory at level2 of database" + "unexpected non-directory at level2 of database" ); continue; } - let Some(doc_paths) = DocIdPaths::parse(&level1, entry.path()) else { + let Some(doc_paths) = DocIdPaths::parse(entry.path()) else { tracing::warn!( non_doc_path=%entry.path().display(), "unexpected non-document path at level2 of database" @@ -163,7 +163,7 @@ impl FsStore { // Load all the data we have into a doc let Some(chunks) = Chunks::load(&self.root, id)? 
else { tracing::warn!(doc_id=%id, "attempted to compact non-existent document"); - return Ok(()) + return Ok(()); }; let mut doc = chunks .to_doc() @@ -195,7 +195,7 @@ fn write_chunk( ) -> Result<(), Error> { // Write to a temp file and then rename to avoid partial writes let mut temp_save = - tempfile::NamedTempFile::new().map_err(|e| Error(ErrorKind::CreateTempFile(e)))?; + tempfile::NamedTempFile::new_in(root).map_err(|e| Error(ErrorKind::CreateTempFile(e)))?; let temp_save_path = temp_save.path().to_owned(); temp_save .as_file_mut() @@ -236,16 +236,18 @@ impl<'a> From<&'a DocumentId> for DocIdPaths { } impl DocIdPaths { - fn parse, P2: AsRef>(level1: P1, level2: P2) -> Option { - let level1 = level1.as_ref().to_str()?; + fn parse(level2: PathBuf) -> Option { + let level1 = level2.parent()?.file_name()?.to_str()?; + let level2 = level2.file_name()?.to_str()?; + let prefix = hex::decode(level1).ok()?; let prefix = <[u8; 2]>::try_from(prefix).ok()?; - let level2 = level2.as_ref().to_str()?; let doc_id_bytes = hex::decode(level2).ok()?; let doc_id_str = String::from_utf8(doc_id_bytes).ok()?; let doc_id = DocumentId::from(doc_id_str.as_str()); let result = Self::from(&doc_id); + if result.prefix != prefix { None } else { @@ -382,7 +384,8 @@ impl Chunks { tracing::warn!(bad_file=%path.display(), "unexpected non-file in level2 path"); continue; } - let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse) else { + let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse) + else { tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path"); continue; }; From feb30ea2edbde5d6264ff20baa0a621cddd09e21 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Mon, 11 Sep 2023 16:58:01 -0700 Subject: [PATCH 02/16] Compact based on time. --- Cargo.toml | 3 ++- src/fs_store.rs | 2 ++ src/repo.rs | 19 +++++++++++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b2f45d37..c06be893 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "automerge_repo" -version = "0.1.1" +version = "0.1.0" edition = "2021" [[example]] @@ -39,6 +39,7 @@ tracing = "0.1.37" ring = "0.16.20" hex = "0.4.3" tempfile = "3.6.0" +metrics = "0.21.1" [dev-dependencies] clap = { version = "4.2.5", features = ["derive"] } diff --git a/src/fs_store.rs b/src/fs_store.rs index 195dde50..955ae0cf 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -415,6 +415,7 @@ impl Chunks { } } } + tracing::debug!("Returning chunks and snapshots"); Ok(Some(Chunks { snapshots, incrementals, @@ -429,6 +430,7 @@ impl Chunks { for chunk in self.incrementals.values() { bytes.extend(chunk); } + tracing::debug!("Going to load doc"); automerge::Automerge::load(&bytes) } } diff --git a/src/repo.rs b/src/repo.rs index 8626465f..9bc3106c 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -22,6 +22,9 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; use uuid::Uuid; +// TODO Feature +use metrics::increment_counter; + /// Front-end of the repo. #[derive(Debug, Clone)] pub struct RepoHandle { @@ -557,7 +560,11 @@ pub(crate) struct DocumentInfo { change_observers: Vec>>, /// Counter of patches since last save, /// used to make decisions about full or incemental saves. + #[deprecated] patches_since_last_save: usize, + /// Time since last save + /// used to make decisions about full or incremental saves. 
+ time_since_last_full_save: std::time::Instant, } impl DocumentInfo { @@ -573,6 +580,7 @@ impl DocumentInfo { sync_states: Default::default(), change_observers: Default::default(), patches_since_last_save: 0, + time_since_last_full_save: std::time::Instant::now(), } } @@ -707,20 +715,28 @@ impl DocumentInfo { if !self.state.should_save() { return; } - let should_compact = self.patches_since_last_save > 10; + let since_last_compact = std::time::Instant::now() - self.time_since_last_full_save; + let should_compact = since_last_compact.as_secs() > 60; let storage_fut = if should_compact { let to_save = { let mut doc = self.document.write(); doc.automerge.save() }; + // Since this is a future it's possible that if our magic numbers + // are too low, we'll autosave every time. + self.time_since_last_full_save = std::time::Instant::now(); + self.patches_since_last_save = 0; + increment_counter!("save_compact"); storage.compact(document_id.clone(), to_save) } else { let to_save = { let mut doc = self.document.write(); + increment_counter!("save_incremental"); doc.automerge.save_incremental() }; storage.append(document_id.clone(), to_save) }; + match self.state { DocState::Sync(ref mut futs) => { futs.push(storage_fut); @@ -732,7 +748,6 @@ impl DocumentInfo { } let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id)); self.state.poll_pending_save(waker); - self.patches_since_last_save = 0; } /// Apply incoming sync messages, From 2dcbe8cd57a40b544ec3ff1eb3f0ad92fba4f7d8 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Tue, 12 Sep 2023 10:51:56 -0700 Subject: [PATCH 03/16] Temporairly add debug log (move to trace or remove later) --- src/repo.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/repo.rs b/src/repo.rs index 9bc3106c..989f9958 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -726,11 +726,13 @@ impl DocumentInfo { // are too low, we'll autosave every time. self.time_since_last_full_save = std::time::Instant::now(); self.patches_since_last_save = 0; + tracing::debug!("Compacting Document"); increment_counter!("save_compact"); storage.compact(document_id.clone(), to_save) } else { let to_save = { let mut doc = self.document.write(); + tracing::debug!("Incremental Update"); increment_counter!("save_incremental"); doc.automerge.save_incremental() }; From 798dce8fc3a90b500d0d577d14825715140d44e7 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Tue, 12 Sep 2023 16:42:53 -0700 Subject: [PATCH 04/16] Attempt to force compaction --- src/repo.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/repo.rs b/src/repo.rs index 989f9958..dff2c3fc 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -716,7 +716,8 @@ impl DocumentInfo { return; } let since_last_compact = std::time::Instant::now() - self.time_since_last_full_save; - let should_compact = since_last_compact.as_secs() > 60; + // TODO -- don't `true` this... 
+ let should_compact = true; let storage_fut = if should_compact { let to_save = { let mut doc = self.document.write(); From faa58e9f30038e53cc064d5dbf4584d8234ebd4c Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Tue, 12 Sep 2023 20:19:49 -0700 Subject: [PATCH 05/16] Get a little more clever about first compaction --- src/fs_store.rs | 66 ++++++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index 955ae0cf..f8dd7104 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -144,12 +144,6 @@ impl FsStore { pub fn append(&self, id: &DocumentId, changes: &[u8]) -> Result<(), Error> { let paths = DocIdPaths::from(id); - std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| { - Error(ErrorKind::CreateLevel2Path( - paths.level2_path(&self.root), - e, - )) - })?; let chunk_name = SavedChunkName::new_incremental(changes); write_chunk(&self.root, &paths, changes, chunk_name)?; @@ -157,31 +151,44 @@ impl FsStore { Ok(()) } - pub fn compact(&self, id: &DocumentId, _full_doc: &[u8]) -> Result<(), Error> { + pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> { let paths = DocIdPaths::from(id); // Load all the data we have into a doc - let Some(chunks) = Chunks::load(&self.root, id)? else { - tracing::warn!(doc_id=%id, "attempted to compact non-existent document"); - return Ok(()); - }; - let mut doc = chunks - .to_doc() - .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; - - // Write the snapshot - let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads()); - let chunk = doc.save(); - write_chunk(&self.root, &paths, &chunk, output_chunk_name)?; - - // Remove all the old data - for incremental in chunks.incrementals.keys() { - let path = paths.chunk_path(&self.root, incremental); - std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; - } - for snapshot in chunks.snapshots.keys() { - let path = paths.chunk_path(&self.root, snapshot); - std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; + match Chunks::load(&self.root, id) { + Ok(Some(chunks)) => { + let mut doc = chunks + .to_doc() + .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; + + // Write the snapshot + let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads()); + let chunk = doc.save(); + write_chunk(&self.root, &paths, &chunk, output_chunk_name)?; + + // Remove all the old data + for incremental in chunks.incrementals.keys() { + let path = paths.chunk_path(&self.root, incremental); + std::fs::remove_file(&path) + .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; + } + for snapshot in chunks.snapshots.keys() { + let path = paths.chunk_path(&self.root, snapshot); + std::fs::remove_file(&path) + .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; + } + } + Ok(None) => { + let output_chunk_name = SavedChunkName { + hash: uuid::Uuid::new_v4().as_bytes().to_vec(), + chunk_type: ChunkType::Snapshot, + }; + // Write the snapshot + write_chunk(&self.root, &paths, &full_doc, output_chunk_name)?; + } + Err(e) => { + tracing::error!(e=%e, "Error loading chunks"); + } } Ok(()) } @@ -206,6 +213,9 @@ fn write_chunk( .sync_all() .map_err(|e| Error(ErrorKind::WriteTempFile(temp_save_path.clone(), e)))?; + std::fs::create_dir_all(paths.level2_path(&root)) + .map_err(|e| Error(ErrorKind::CreateLevel2Path(paths.level2_path(&root), e)))?; + // Move the temporary file into a snapshot in the document data directory // with a name based on the 
hash of the heads of the document let output_path = paths.chunk_path(root, &name); From 3382ca217a8c2a834d6638968b65c1830d2d1be6 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Tue, 12 Sep 2023 16:35:45 -0700 Subject: [PATCH 06/16] Fix minor test bug. AsyncInMemoryStorage has a bug in compaction where the first entry is not able to be compacted. This would not normally be caught, but there's a bug in repo.rs where compaction doesn't happen, and I'm trying to track that down. --- test_utils/src/storage_utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test_utils/src/storage_utils.rs b/test_utils/src/storage_utils.rs index fb4d1bb3..fd0365cc 100644 --- a/test_utils/src/storage_utils.rs +++ b/test_utils/src/storage_utils.rs @@ -136,8 +136,8 @@ impl AsyncInMemoryStorage { StorageRequest::Compact(doc_id, data, sender) => { let _entry = documents .entry(doc_id) - .and_modify(|entry| *entry = data) - .or_insert_with(Default::default); + .and_modify(|entry| *entry = data.clone()) + .or_insert_with(|| data); let (tx, rx) = oneshot(); results.push_back(tx); tokio::spawn(async move { From 23201b8e92f519b454ae06d343868ffa85772718 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Wed, 13 Sep 2023 09:18:45 -0700 Subject: [PATCH 07/16] Clippy & fmt --- src/fs_store.rs | 6 +++--- src/repo.rs | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index f8dd7104..a44bf7b8 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -184,7 +184,7 @@ impl FsStore { chunk_type: ChunkType::Snapshot, }; // Write the snapshot - write_chunk(&self.root, &paths, &full_doc, output_chunk_name)?; + write_chunk(&self.root, &paths, full_doc, output_chunk_name)?; } Err(e) => { tracing::error!(e=%e, "Error loading chunks"); @@ -213,8 +213,8 @@ fn write_chunk( .sync_all() .map_err(|e| Error(ErrorKind::WriteTempFile(temp_save_path.clone(), e)))?; - std::fs::create_dir_all(paths.level2_path(&root)) - .map_err(|e| Error(ErrorKind::CreateLevel2Path(paths.level2_path(&root), e)))?; + std::fs::create_dir_all(paths.level2_path(root)) + .map_err(|e| Error(ErrorKind::CreateLevel2Path(paths.level2_path(root), e)))?; // Move the temporary file into a snapshot in the document data directory // with a name based on the hash of the heads of the document diff --git a/src/repo.rs b/src/repo.rs index dff2c3fc..06288fa7 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -560,7 +560,6 @@ pub(crate) struct DocumentInfo { change_observers: Vec>>, /// Counter of patches since last save, /// used to make decisions about full or incemental saves. - #[deprecated] patches_since_last_save: usize, /// Time since last save /// used to make decisions about full or incremental saves. @@ -715,7 +714,7 @@ impl DocumentInfo { if !self.state.should_save() { return; } - let since_last_compact = std::time::Instant::now() - self.time_since_last_full_save; + let _since_last_compact = std::time::Instant::now() - self.time_since_last_full_save; // TODO -- don't `true` this... 
let should_compact = true; let storage_fut = if should_compact { From c2f6fc944c487de08e51a686d27eea977076d038 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Wed, 13 Sep 2023 15:09:20 -0700 Subject: [PATCH 08/16] Put autosurgeon up to the latest --- Cargo.toml | 1 - src/fs_store.rs | 2 -- src/repo.rs | 1 - test_utils/Cargo.toml | 2 +- 4 files changed, 1 insertion(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4fb97399..8381ba47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,6 @@ tracing = "0.1.37" ring = "0.16.20" hex = "0.4.3" tempfile = "3.6.0" -metrics = "0.21.1" [dev-dependencies] clap = { version = "4.2.5", features = ["derive"] } diff --git a/src/fs_store.rs b/src/fs_store.rs index a6df2272..af1698c0 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -425,7 +425,6 @@ impl Chunks { } } } - tracing::debug!("Returning chunks and snapshots"); Ok(Some(Chunks { snapshots, incrementals, @@ -440,7 +439,6 @@ impl Chunks { for chunk in self.incrementals.values() { bytes.extend(chunk); } - tracing::debug!("Going to load doc"); automerge::Automerge::load(&bytes) } } diff --git a/src/repo.rs b/src/repo.rs index 20f8d48b..5ebf5a53 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -737,7 +737,6 @@ impl DocumentInfo { }; (storage.append(document_id.clone(), to_save), new_heads) }; - match self.state { DocState::Sync(ref mut futs) => { futs.push(storage_fut); diff --git a/test_utils/Cargo.toml b/test_utils/Cargo.toml index 4065309a..319cdb51 100644 --- a/test_utils/Cargo.toml +++ b/test_utils/Cargo.toml @@ -12,7 +12,7 @@ tokio = { version = "1.27", features = ["full"] } tokio-util = "0.7.8" tokio-serde = {version = "0.8.0", features = ["json"]} serde_json = "1.0.96" -automerge = { version = "^0.3" } +automerge = { version = "0.5.0" } uuid = { version = "1.2.2"} crossbeam-channel = { version = "0.5.8" } parking_lot = { version = "0.12.1" } From 962b38c27fad462dd2bc99b2e61bd3d14cb6fcf6 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Thu, 14 Sep 2023 15:44:35 -0700 Subject: [PATCH 09/16] Put the patches_since_last_* in the right spot, and rename it. --- src/repo.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/repo.rs b/src/repo.rs index 5ebf5a53..8b74ead3 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -555,9 +555,9 @@ pub(crate) struct DocumentInfo { sync_states: HashMap, /// Used to resolve futures for DocHandle::changed. change_observers: Vec>>, - /// Counter of patches since last save, + /// Counter of patches since last compact, /// used to make decisions about full or incemental saves. - patches_since_last_save: usize, + patches_since_last_compact: usize, /// Last heads obtained from the automerge doc. last_heads: Vec, } @@ -578,7 +578,7 @@ impl DocumentInfo { handle_count, sync_states: Default::default(), change_observers: Default::default(), - patches_since_last_save: 0, + patches_since_last_compact: 0, last_heads, } } @@ -597,7 +597,7 @@ impl DocumentInfo { | DocState::Error | DocState::LoadPending { .. } | DocState::Bootstrap { .. 
} => { - assert_eq!(self.patches_since_last_save, 0); + assert_eq!(self.patches_since_last_compact, 0); DocState::PendingRemoval(vec![]) } DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)), @@ -701,7 +701,10 @@ impl DocumentInfo { changes.len() }; let has_patches = count > 0; - self.patches_since_last_save = self.patches_since_last_save.checked_add(count).unwrap_or(0); + self.patches_since_last_compact = self + .patches_since_last_compact + .checked_add(count) + .unwrap_or(0); has_patches } @@ -720,12 +723,13 @@ impl DocumentInfo { if !self.state.should_save() { return; } - let should_compact = self.patches_since_last_save > 10; + let should_compact = self.patches_since_last_compact > 10; let (storage_fut, new_heads) = if should_compact { let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) }; + self.patches_since_last_compact = 0; (storage.compact(document_id.clone(), to_save), new_heads) } else { let (to_save, new_heads) = { @@ -748,7 +752,6 @@ impl DocumentInfo { } let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id)); self.state.poll_pending_save(waker); - self.patches_since_last_save = 0; self.last_heads = new_heads; } From cf8df6ee19ee50311e2ffc0ab1fd518ceea16854 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Thu, 14 Sep 2023 21:16:53 -0700 Subject: [PATCH 10/16] Revert to a naive change note, and a thread local counter before compaction. --- src/repo.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/repo.rs b/src/repo.rs index 8b74ead3..bd3b16cb 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -555,9 +555,11 @@ pub(crate) struct DocumentInfo { sync_states: HashMap, /// Used to resolve futures for DocHandle::changed. change_observers: Vec>>, - /// Counter of patches since last compact, + /// Counter of local saves since last compact, /// used to make decisions about full or incemental saves. - patches_since_last_compact: usize, + saves_since_last_compact: usize, + /// + allowable_changes_until_compaction: usize, /// Last heads obtained from the automerge doc. last_heads: Vec, } @@ -578,7 +580,8 @@ impl DocumentInfo { handle_count, sync_states: Default::default(), change_observers: Default::default(), - patches_since_last_compact: 0, + saves_since_last_compact: 0, + allowable_changes_until_compaction: 10, last_heads, } } @@ -597,7 +600,7 @@ impl DocumentInfo { | DocState::Error | DocState::LoadPending { .. } | DocState::Bootstrap { .. } => { - assert_eq!(self.patches_since_last_compact, 0); + assert_eq!(self.saves_since_last_compact, 0); DocState::PendingRemoval(vec![]) } DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)), @@ -695,17 +698,21 @@ impl DocumentInfo { /// Count patches since last save, /// returns whether there were any. fn note_changes(&mut self) -> bool { - let count = { + // TODO, Can we do this without a read lock? + // I think that if the changes update last_heads and + // we store `last_heads_since_note` we can get a bool out of this. 
+ true + /*let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); changes.len() }; let has_patches = count > 0; - self.patches_since_last_compact = self - .patches_since_last_compact + self.saves_since_last_compact = self + .saves_since_last_compact .checked_add(count) .unwrap_or(0); - has_patches + has_patches*/ } fn resolve_change_observers(&mut self, result: Result<(), RepoError>) { @@ -723,13 +730,14 @@ impl DocumentInfo { if !self.state.should_save() { return; } - let should_compact = self.patches_since_last_compact > 10; + let should_compact = + self.saves_since_last_compact > self.allowable_changes_until_compaction; let (storage_fut, new_heads) = if should_compact { let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) }; - self.patches_since_last_compact = 0; + self.saves_since_last_compact = 0; (storage.compact(document_id.clone(), to_save), new_heads) } else { let (to_save, new_heads) = { @@ -739,6 +747,7 @@ impl DocumentInfo { doc.automerge.get_heads(), ) }; + self.saves_since_last_compact.checked_add(1).unwrap_or(0); (storage.append(document_id.clone(), to_save), new_heads) }; match self.state { From 2fc3c8b47f001e5d60f73c5f4e667ebf0e551a51 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Thu, 14 Sep 2023 21:41:18 -0700 Subject: [PATCH 11/16] Add logging --- src/repo.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/repo.rs b/src/repo.rs index bd3b16cb..e7c895d8 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -727,19 +727,24 @@ impl DocumentInfo { storage: &dyn Storage, wake_sender: &Sender, ) { + println!("We decided to save the document"); if !self.state.should_save() { + println!("No"); return; } let should_compact = self.saves_since_last_compact > self.allowable_changes_until_compaction; let (storage_fut, new_heads) = if should_compact { + println!("We decided to Compact the document"); let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) }; self.saves_since_last_compact = 0; + println!("Since compact is zero"); (storage.compact(document_id.clone(), to_save), new_heads) } else { + println!("We decided to incremental the document"); let (to_save, new_heads) = { let doc = self.document.read(); ( @@ -748,6 +753,7 @@ impl DocumentInfo { ) }; self.saves_since_last_compact.checked_add(1).unwrap_or(0); + println!("Saves since last compact {}", self.saves_since_last_compact); (storage.append(document_id.clone(), to_save), new_heads) }; match self.state { From b55cf0f1d5ba0c98b6d67a3435047cc2262b73cd Mon Sep 17 00:00:00 2001 From: gterzian <2792687+gterzian@users.noreply.github.com> Date: Fri, 15 Sep 2023 15:33:56 +0800 Subject: [PATCH 12/16] fix hanging changed sync test --- src/repo.rs | 7 ++++--- tests/document_changed.rs | 26 ++++++++++++++++---------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/repo.rs b/src/repo.rs index e7c895d8..ee795597 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -701,18 +701,18 @@ impl DocumentInfo { // TODO, Can we do this without a read lock? // I think that if the changes update last_heads and // we store `last_heads_since_note` we can get a bool out of this. 
- true - /*let count = { + let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); changes.len() }; let has_patches = count > 0; + println!("Has patches: {:?}", has_patches); self.saves_since_last_compact = self .saves_since_last_compact .checked_add(count) .unwrap_or(0); - has_patches*/ + has_patches } fn resolve_change_observers(&mut self, result: Result<(), RepoError>) { @@ -1232,6 +1232,7 @@ impl Repo { let local_repo_id = self.get_repo_id().clone(); if let Some(info) = self.documents.get_mut(&doc_id) { if !info.note_changes() { + println!("Doc didn't change"); // Stop here if the document wasn't actually changed. return; } diff --git a/tests/document_changed.rs b/tests/document_changed.rs index 6fb50c9d..91a792f2 100644 --- a/tests/document_changed.rs +++ b/tests/document_changed.rs @@ -40,6 +40,14 @@ fn test_document_changed_over_sync() { ); peers.insert(repo_handle_2.get_repo_id().clone(), network_1); peers.insert(repo_handle_1.get_repo_id().clone(), network_2); + + // Edit the document. + document_handle_1.with_doc_mut(|doc| { + let mut tx = doc.transaction(); + tx.put(automerge::ROOT, "repo_id", format!("{}", repo_handle_1.get_repo_id().clone())) + .expect("Failed to change the document."); + tx.commit(); + }); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -53,7 +61,11 @@ fn test_document_changed_over_sync() { // Request the document. let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap(); doc_handle.with_doc_mut(|doc| { - let mut tx = doc.transaction(); + let id = doc + .get(automerge::ROOT, "repo_id") + .expect("Failed to read the document.") + .unwrap(); + let mut tx = doc.transaction(); tx.put( automerge::ROOT, "repo_id", @@ -68,27 +80,21 @@ fn test_document_changed_over_sync() { let (done_sync_sender, mut done_sync_receiver) = channel(1); let repo_id = repo_handle_1.get_repo_id().clone(); rt.spawn(async move { - // Edit the document. - document_handle_1.with_doc_mut(|doc| { - let mut tx = doc.transaction(); - tx.put(automerge::ROOT, "repo_id", format!("{}", repo_id)) - .expect("Failed to change the document."); - tx.commit(); - }); loop { - // Await changes until the edit comes through over sync. - document_handle_1.changed().await.unwrap(); + // Await changes until the edit comes through over sync. 
let equals = document_handle_1.with_doc(|doc| { let val = doc .get(automerge::ROOT, "repo_id") .expect("Failed to read the document.") .unwrap(); + println!("Val: {:?}", val.0.to_str().unwrap()); val.0.to_str().unwrap() == format!("{}", expected_repo_id) }); if equals { done_sync_sender.send(()).await.unwrap(); break; } + document_handle_1.changed().await.unwrap(); } }); From dc3d23c7257c6372ff789d855d53dbfa7972e46e Mon Sep 17 00:00:00 2001 From: gterzian <2792687+gterzian@users.noreply.github.com> Date: Fri, 15 Sep 2023 16:11:01 +0800 Subject: [PATCH 13/16] when doc changes, always generate sync message, but skip saving if doc didn't change since last save --- src/repo.rs | 16 +++++++++------- tests/document_changed.rs | 9 ++++++--- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/repo.rs b/src/repo.rs index ee795597..2df419c3 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -704,6 +704,8 @@ impl DocumentInfo { let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); + println!("last: {:?}, current: {:?}", self.last_heads, doc.automerge.get_heads()); + //self.last_heads = doc.automerge.get_heads(); changes.len() }; let has_patches = count > 0; @@ -1231,16 +1233,13 @@ impl Repo { // Handle doc changes: sync the document. let local_repo_id = self.get_repo_id().clone(); if let Some(info) = self.documents.get_mut(&doc_id) { - if !info.note_changes() { - println!("Doc didn't change"); - // Stop here if the document wasn't actually changed. - return; + if info.note_changes() { + self.documents_with_changes.push(doc_id.clone()); } let is_first_edit = matches!(info.state, DocState::LocallyCreatedNotEdited); if is_first_edit { info.state = DocState::Sync(vec![]); } - self.documents_with_changes.push(doc_id.clone()); for (to_repo_id, message) in info.generate_sync_messages().into_iter() { let outgoing = NetworkMessage::Sync { from_repo_id: local_repo_id.clone(), @@ -1255,6 +1254,7 @@ impl Repo { self.sinks_to_poll.insert(to_repo_id); } if is_first_edit { + println!("First edit"); // Send a sync message to all other repos we are connected with. for repo_id in self.remote_repos.keys() { if let Some(message) = info.generate_first_sync_message(repo_id.clone()) @@ -1348,6 +1348,7 @@ impl Repo { let state = info.document.read(); state.automerge.get_heads() }; + println!("Change observer: {:?} {:?}", current_heads, change_hash); if current_heads == change_hash { info.change_observers.push(observer); } else { @@ -1473,8 +1474,9 @@ impl Repo { .expect("Doc should have an info by now."); if info.receive_sync_message(per_remote) { - info.note_changes(); - self.documents_with_changes.push(document_id.clone()); + if info.note_changes() { + self.documents_with_changes.push(document_id.clone()); + } } // Note: since receiving and generating sync messages is done diff --git a/tests/document_changed.rs b/tests/document_changed.rs index 91a792f2..796e8c76 100644 --- a/tests/document_changed.rs +++ b/tests/document_changed.rs @@ -61,18 +61,21 @@ fn test_document_changed_over_sync() { // Request the document. 
let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap(); doc_handle.with_doc_mut(|doc| { + println!("Heads when 2 makes edit: {:?}", doc.get_heads()); let id = doc .get(automerge::ROOT, "repo_id") .expect("Failed to read the document.") .unwrap(); - let mut tx = doc.transaction(); + println!("Id when two makes edit: {:?}", id); + {let mut tx = doc.transaction(); tx.put( automerge::ROOT, "repo_id", format!("{}", repo_handle_2.get_repo_id()), ) .expect("Failed to change the document."); - tx.commit(); + tx.commit();} + println!("Heads after 2 makes edit: {:?}", doc.get_heads()); }); }); @@ -87,7 +90,7 @@ fn test_document_changed_over_sync() { .get(automerge::ROOT, "repo_id") .expect("Failed to read the document.") .unwrap(); - println!("Val: {:?}", val.0.to_str().unwrap()); + println!("Val: {:?}", val); val.0.to_str().unwrap() == format!("{}", expected_repo_id) }); if equals { From 84ad6cf54c667f17d976c837f95e22fb98d3986b Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Fri, 15 Sep 2023 13:45:12 -0700 Subject: [PATCH 14/16] WIP log based debugging, catch situation where current snapshot is being deleted --- src/fs_store.rs | 31 ++++++++++++++++++++++++++----- src/repo.rs | 27 +++++++++++++++++---------- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index af1698c0..ce161ae5 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -157,6 +157,7 @@ impl FsStore { // Load all the data we have into a doc match Chunks::load(&self.root, id) { Ok(Some(chunks)) => { + println!("hmm..."); let doc = chunks .to_doc() .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; @@ -164,21 +165,32 @@ impl FsStore { // Write the snapshot let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads()); let chunk = doc.save(); - write_chunk(&self.root, &paths, &chunk, output_chunk_name)?; + println!("Going to write: {:#?}", output_chunk_name); + write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?; // Remove all the old data for incremental in chunks.incrementals.keys() { let path = paths.chunk_path(&self.root, incremental); + println!("Removing {:?}", path); std::fs::remove_file(&path) .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; } + let just_wrote = paths.chunk_path(&self.root, &output_chunk_name); for snapshot in chunks.snapshots.keys() { let path = paths.chunk_path(&self.root, snapshot); + println!("Removing Snap {:?}", path); + + if path == just_wrote { + tracing::error!("Somehow trying to delete the same path we just wrote to. 
Not today Satan"); + continue; + } + std::fs::remove_file(&path) .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; } } Ok(None) => { + println!("No existing files,and compaction requested first"); let output_chunk_name = SavedChunkName { hash: uuid::Uuid::new_v4().as_bytes().to_vec(), chunk_type: ChunkType::Snapshot, @@ -187,6 +199,7 @@ impl FsStore { write_chunk(&self.root, &paths, full_doc, output_chunk_name)?; } Err(e) => { + println!("Error loading chunks for {:?} {}", self.root, id); tracing::error!(e=%e, "Error loading chunks"); } } @@ -219,6 +232,10 @@ fn write_chunk( // Move the temporary file into a snapshot in the document data directory // with a name based on the hash of the heads of the document let output_path = paths.chunk_path(root, &name); + + tracing::warn!("Renaming: {:?}", temp_save); + tracing::warn!("To: {:?}", output_path); + std::fs::rename(&temp_save_path, &output_path) .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?; @@ -286,13 +303,13 @@ impl DocIdPaths { } } -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Clone)] enum ChunkType { Snapshot, Incremental, } -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Clone)] struct SavedChunkName { hash: Vec, chunk_type: ChunkType, @@ -355,7 +372,7 @@ impl Chunks { fn load(root: &Path, doc_id: &DocumentId) -> Result, Error> { let doc_id_hash = DocIdPaths::from(doc_id); let level2_path = doc_id_hash.level2_path(root); - tracing::debug!( + tracing::warn!( root=%root.display(), doc_id=?doc_id, doc_path=%level2_path.display(), @@ -439,7 +456,11 @@ impl Chunks { for chunk in self.incrementals.values() { bytes.extend(chunk); } - automerge::Automerge::load(&bytes) + + automerge::Automerge::load_with_options( + &bytes, + automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore), + ) } } diff --git a/src/repo.rs b/src/repo.rs index 2df419c3..d22ec9fb 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -557,7 +557,7 @@ pub(crate) struct DocumentInfo { change_observers: Vec>>, /// Counter of local saves since last compact, /// used to make decisions about full or incemental saves. - saves_since_last_compact: usize, + patches_since_last_compact: usize, /// allowable_changes_until_compaction: usize, /// Last heads obtained from the automerge doc. @@ -580,7 +580,7 @@ impl DocumentInfo { handle_count, sync_states: Default::default(), change_observers: Default::default(), - saves_since_last_compact: 0, + patches_since_last_compact: 0, allowable_changes_until_compaction: 10, last_heads, } @@ -600,7 +600,7 @@ impl DocumentInfo { | DocState::Error | DocState::LoadPending { .. } | DocState::Bootstrap { .. 
} => { - assert_eq!(self.saves_since_last_compact, 0); + assert_eq!(self.patches_since_last_compact, 0); DocState::PendingRemoval(vec![]) } DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)), @@ -704,14 +704,18 @@ impl DocumentInfo { let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); - println!("last: {:?}, current: {:?}", self.last_heads, doc.automerge.get_heads()); + println!( + "last: {:?}, current: {:?}", + self.last_heads, + doc.automerge.get_heads() + ); //self.last_heads = doc.automerge.get_heads(); changes.len() }; let has_patches = count > 0; println!("Has patches: {:?}", has_patches); - self.saves_since_last_compact = self - .saves_since_last_compact + self.patches_since_last_compact = self + .patches_since_last_compact .checked_add(count) .unwrap_or(0); has_patches @@ -735,14 +739,14 @@ impl DocumentInfo { return; } let should_compact = - self.saves_since_last_compact > self.allowable_changes_until_compaction; + self.patches_since_last_compact > self.allowable_changes_until_compaction; let (storage_fut, new_heads) = if should_compact { println!("We decided to Compact the document"); let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) }; - self.saves_since_last_compact = 0; + self.patches_since_last_compact = 0; println!("Since compact is zero"); (storage.compact(document_id.clone(), to_save), new_heads) } else { @@ -754,8 +758,11 @@ impl DocumentInfo { doc.automerge.get_heads(), ) }; - self.saves_since_last_compact.checked_add(1).unwrap_or(0); - println!("Saves since last compact {}", self.saves_since_last_compact); + self.patches_since_last_compact.checked_add(1).unwrap_or(0); + println!( + "Saves since last compact {}", + self.patches_since_last_compact + ); (storage.append(document_id.clone(), to_save), new_heads) }; match self.state { From 6c221cdd882738f6a9d34c0993ff62978b958b8b Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Fri, 15 Sep 2023 14:56:20 -0700 Subject: [PATCH 15/16] Reduce logs noise --- src/fs_store.rs | 26 ++++++++++--------------- src/network_connect.rs | 2 +- src/repo.rs | 15 ++------------- tests/document_changed.rs | 40 ++++++++++++++++++++++----------------- tests/fs_storage/main.rs | 1 + 5 files changed, 37 insertions(+), 47 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index ce161ae5..4a453d14 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -106,7 +106,7 @@ impl FsStore { .map_err(|e| Error(ErrorKind::ErrReadingLevel1Path(entry.path(), e)))? 
.is_file() { - tracing::warn!( + tracing::trace!( non_dir_path=%entry.path().display(), "unexpected non-directory at level1 of database" ); @@ -123,14 +123,14 @@ impl FsStore { .metadata() .map_err(|e| Error(ErrorKind::ErrReadingLevel2Path(entry.path(), e)))?; if !metadata.is_dir() { - tracing::warn!( + tracing::trace!( non_file_path=%entry.path().display(), "unexpected non-directory at level2 of database" ); continue; } let Some(doc_paths) = DocIdPaths::parse(entry.path()) else { - tracing::warn!( + tracing::trace!( non_doc_path=%entry.path().display(), "unexpected non-document path at level2 of database" ); @@ -157,7 +157,6 @@ impl FsStore { // Load all the data we have into a doc match Chunks::load(&self.root, id) { Ok(Some(chunks)) => { - println!("hmm..."); let doc = chunks .to_doc() .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; @@ -165,23 +164,20 @@ impl FsStore { // Write the snapshot let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads()); let chunk = doc.save(); - println!("Going to write: {:#?}", output_chunk_name); write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?; // Remove all the old data for incremental in chunks.incrementals.keys() { let path = paths.chunk_path(&self.root, incremental); - println!("Removing {:?}", path); std::fs::remove_file(&path) .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; } let just_wrote = paths.chunk_path(&self.root, &output_chunk_name); for snapshot in chunks.snapshots.keys() { let path = paths.chunk_path(&self.root, snapshot); - println!("Removing Snap {:?}", path); if path == just_wrote { - tracing::error!("Somehow trying to delete the same path we just wrote to. Not today Satan"); + tracing::trace!("Somehow trying to delete the same path we just wrote to. Not today Satan"); continue; } @@ -190,7 +186,6 @@ impl FsStore { } } Ok(None) => { - println!("No existing files,and compaction requested first"); let output_chunk_name = SavedChunkName { hash: uuid::Uuid::new_v4().as_bytes().to_vec(), chunk_type: ChunkType::Snapshot, @@ -199,7 +194,6 @@ impl FsStore { write_chunk(&self.root, &paths, full_doc, output_chunk_name)?; } Err(e) => { - println!("Error loading chunks for {:?} {}", self.root, id); tracing::error!(e=%e, "Error loading chunks"); } } @@ -233,8 +227,8 @@ fn write_chunk( // with a name based on the hash of the heads of the document let output_path = paths.chunk_path(root, &name); - tracing::warn!("Renaming: {:?}", temp_save); - tracing::warn!("To: {:?}", output_path); + tracing::trace!("Renaming: {:?}", temp_save); + tracing::trace!("To: {:?}", output_path); std::fs::rename(&temp_save_path, &output_path) .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?; @@ -372,7 +366,7 @@ impl Chunks { fn load(root: &Path, doc_id: &DocumentId) -> Result, Error> { let doc_id_hash = DocIdPaths::from(doc_id); let level2_path = doc_id_hash.level2_path(root); - tracing::warn!( + tracing::trace!( root=%root.display(), doc_id=?doc_id, doc_path=%level2_path.display(), @@ -408,12 +402,12 @@ impl Chunks { .map_err(|e| Error(ErrorKind::ErrReadingChunkFileMetadata(path.clone(), e)))? 
.is_file() { - tracing::warn!(bad_file=%path.display(), "unexpected non-file in level2 path"); + tracing::trace!(bad_file=%path.display(), "unexpected non-file in level2 path"); continue; } let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse) else { - tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path"); + tracing::trace!(bad_file=%path.display(), "unexpected non-chunk file in level2 path"); continue; }; tracing::debug!(chunk_path=%path.display(), "reading chunk file"); @@ -423,7 +417,7 @@ impl Chunks { match e.kind() { std::io::ErrorKind::NotFound => { // Could be a concurrent process compacting, not an error - tracing::warn!( + tracing::trace!( missing_chunk_path=%path.display(), "chunk file disappeared while reading chunks", ); diff --git a/src/network_connect.rs b/src/network_connect.rs index 4bf15be6..61006ea8 100644 --- a/src/network_connect.rs +++ b/src/network_connect.rs @@ -35,7 +35,7 @@ impl RepoHandle { Ok(repo_msg) } Ok(m) => { - tracing::warn!(?m, repo_id=?repo_id, "Received non-repo message"); + tracing::trace!(?m, repo_id=?repo_id, "Received non-repo message"); Err(NetworkError::Error) } Err(e) => { diff --git a/src/repo.rs b/src/repo.rs index d22ec9fb..3b9acae5 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -704,7 +704,7 @@ impl DocumentInfo { let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); - println!( + tracing::trace!( "last: {:?}, current: {:?}", self.last_heads, doc.automerge.get_heads() @@ -713,7 +713,6 @@ impl DocumentInfo { changes.len() }; let has_patches = count > 0; - println!("Has patches: {:?}", has_patches); self.patches_since_last_compact = self .patches_since_last_compact .checked_add(count) @@ -733,24 +732,19 @@ impl DocumentInfo { storage: &dyn Storage, wake_sender: &Sender, ) { - println!("We decided to save the document"); if !self.state.should_save() { - println!("No"); return; } let should_compact = self.patches_since_last_compact > self.allowable_changes_until_compaction; let (storage_fut, new_heads) = if should_compact { - println!("We decided to Compact the document"); let (to_save, new_heads) = { let doc = self.document.read(); (doc.automerge.save(), doc.automerge.get_heads()) }; self.patches_since_last_compact = 0; - println!("Since compact is zero"); (storage.compact(document_id.clone(), to_save), new_heads) } else { - println!("We decided to incremental the document"); let (to_save, new_heads) = { let doc = self.document.read(); ( @@ -759,10 +753,6 @@ impl DocumentInfo { ) }; self.patches_since_last_compact.checked_add(1).unwrap_or(0); - println!( - "Saves since last compact {}", - self.patches_since_last_compact - ); (storage.append(document_id.clone(), to_save), new_heads) }; match self.state { @@ -1261,7 +1251,6 @@ impl Repo { self.sinks_to_poll.insert(to_repo_id); } if is_first_edit { - println!("First edit"); // Send a sync message to all other repos we are connected with. 
for repo_id in self.remote_repos.keys() { if let Some(message) = info.generate_first_sync_message(repo_id.clone()) @@ -1355,7 +1344,7 @@ impl Repo { let state = info.document.read(); state.automerge.get_heads() }; - println!("Change observer: {:?} {:?}", current_heads, change_hash); + tracing::trace!("Change observer: {:?} {:?}", current_heads, change_hash); if current_heads == change_hash { info.change_observers.push(observer); } else { diff --git a/tests/document_changed.rs b/tests/document_changed.rs index 796e8c76..7eb74a55 100644 --- a/tests/document_changed.rs +++ b/tests/document_changed.rs @@ -40,12 +40,16 @@ fn test_document_changed_over_sync() { ); peers.insert(repo_handle_2.get_repo_id().clone(), network_1); peers.insert(repo_handle_1.get_repo_id().clone(), network_2); - + // Edit the document. document_handle_1.with_doc_mut(|doc| { let mut tx = doc.transaction(); - tx.put(automerge::ROOT, "repo_id", format!("{}", repo_handle_1.get_repo_id().clone())) - .expect("Failed to change the document."); + tx.put( + automerge::ROOT, + "repo_id", + format!("{}", repo_handle_1.get_repo_id().clone()), + ) + .expect("Failed to change the document."); tx.commit(); }); @@ -63,18 +67,20 @@ fn test_document_changed_over_sync() { doc_handle.with_doc_mut(|doc| { println!("Heads when 2 makes edit: {:?}", doc.get_heads()); let id = doc - .get(automerge::ROOT, "repo_id") - .expect("Failed to read the document.") - .unwrap(); - println!("Id when two makes edit: {:?}", id); - {let mut tx = doc.transaction(); - tx.put( - automerge::ROOT, - "repo_id", - format!("{}", repo_handle_2.get_repo_id()), - ) - .expect("Failed to change the document."); - tx.commit();} + .get(automerge::ROOT, "repo_id") + .expect("Failed to read the document.") + .unwrap(); + println!("Id when two makes edit: {:?}", id); + { + let mut tx = doc.transaction(); + tx.put( + automerge::ROOT, + "repo_id", + format!("{}", repo_handle_2.get_repo_id()), + ) + .expect("Failed to change the document."); + tx.commit(); + } println!("Heads after 2 makes edit: {:?}", doc.get_heads()); }); }); @@ -84,13 +90,13 @@ fn test_document_changed_over_sync() { let repo_id = repo_handle_1.get_repo_id().clone(); rt.spawn(async move { loop { - // Await changes until the edit comes through over sync. + // Await changes until the edit comes through over sync. let equals = document_handle_1.with_doc(|doc| { let val = doc .get(automerge::ROOT, "repo_id") .expect("Failed to read the document.") .unwrap(); - println!("Val: {:?}", val); + println!("Val: {:?}", val); val.0.to_str().unwrap() == format!("{}", expected_repo_id) }); if equals { diff --git a/tests/fs_storage/main.rs b/tests/fs_storage/main.rs index 82613918..74a6d78e 100644 --- a/tests/fs_storage/main.rs +++ b/tests/fs_storage/main.rs @@ -1,6 +1,7 @@ use automerge::transaction::Transactable; use automerge_repo::fs_store; use itertools::Itertools; +use uuid::Uuid; /// Asserts that the &[u8] in `data` is some permutation of the chunks of Vec<&[u8> in `expected` macro_rules! assert_permutation_of { From ee9504da48b7372073b9a19ef1eb7a9b74724ed3 Mon Sep 17 00:00:00 2001 From: Issac Kelly Date: Fri, 15 Sep 2023 18:40:16 -0700 Subject: [PATCH 16/16] fs_store and InMemory/repo.rs had different visions of what compact should do. I believe this rectifies them. 
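With this change the caller hands compact both the serialized document and its current heads, so the store can write the snapshot it was given and name it after those heads instead of re-loading and re-saving every chunk on disk. As a rough, standalone sketch of the naming idea only (the exact bytes that SavedChunkName::new_snapshot feeds into the hash are an assumption here, not taken from this patch), a snapshot file name derived from a set of heads could look like:

// Sketch only: derive a snapshot file name from the heads passed to compact.
// Concatenating the raw 32-byte head hashes before hashing is an assumption.
use ring::digest::{digest, SHA256};

fn snapshot_file_name(heads: &[[u8; 32]]) -> String {
    // Concatenate the raw head hashes and hash the result once.
    let mut bytes = Vec::with_capacity(heads.len() * 32);
    for head in heads {
        bytes.extend_from_slice(head);
    }
    let hash = digest(&SHA256, &bytes);
    format!("{}.snapshot", hex::encode(hash.as_ref()))
}

fn main() {
    let heads = [[0u8; 32], [1u8; 32]];
    println!("{}", snapshot_file_name(&heads));
}

The temp-file-plus-rename dance in write_chunk then makes the snapshot appear atomically under that name; the rename is only atomic when source and destination sit on the same filesystem, which is why patch 01 moved temp file creation under the store root.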
--- src/fs_store.rs | 39 +++++++-------------------------- src/interfaces.rs | 2 ++ src/repo.rs | 5 ++++- src/tokio/fs_storage.rs | 8 ++++++- test_utils/src/storage_utils.rs | 4 ++++ tests/document_changed.rs | 1 - tests/fs_storage/main.rs | 5 +++-- 7 files changed, 28 insertions(+), 36 deletions(-) diff --git a/src/fs_store.rs b/src/fs_store.rs index 4a453d14..5bff5359 100644 --- a/src/fs_store.rs +++ b/src/fs_store.rs @@ -151,20 +151,20 @@ impl FsStore { Ok(()) } - pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> { + pub fn compact( + &self, + id: &DocumentId, + full_doc: &[u8], + new_heads: Vec, + ) -> Result<(), Error> { let paths = DocIdPaths::from(id); // Load all the data we have into a doc match Chunks::load(&self.root, id) { Ok(Some(chunks)) => { - let doc = chunks - .to_doc() - .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?; - // Write the snapshot - let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads()); - let chunk = doc.save(); - write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?; + let output_chunk_name = SavedChunkName::new_snapshot(new_heads); + write_chunk(&self.root, &paths, full_doc, output_chunk_name.clone())?; // Remove all the old data for incremental in chunks.incrementals.keys() { @@ -172,15 +172,9 @@ impl FsStore { std::fs::remove_file(&path) .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; } - let just_wrote = paths.chunk_path(&self.root, &output_chunk_name); for snapshot in chunks.snapshots.keys() { let path = paths.chunk_path(&self.root, snapshot); - if path == just_wrote { - tracing::trace!("Somehow trying to delete the same path we just wrote to. Not today Satan"); - continue; - } - std::fs::remove_file(&path) .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?; } @@ -441,21 +435,6 @@ impl Chunks { incrementals, })) } - - fn to_doc(&self) -> Result { - let mut bytes = Vec::new(); - for chunk in self.snapshots.values() { - bytes.extend(chunk); - } - for chunk in self.incrementals.values() { - bytes.extend(chunk); - } - - automerge::Automerge::load_with_options( - &bytes, - automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore), - ) - } } mod error { @@ -499,8 +478,6 @@ mod error { ErrReadingChunkFile(PathBuf, std::io::Error), #[error("error creating level 2 path {0}: {1}")] CreateLevel2Path(PathBuf, std::io::Error), - #[error("error loading doc to compact: {0}")] - LoadDocToCompact(automerge::AutomergeError), #[error("error creating temp file: {0}")] CreateTempFile(std::io::Error), #[error("error writing temp file {0}: {1}")] diff --git a/src/interfaces.rs b/src/interfaces.rs index 8fc7b9a2..3c72d1a9 100644 --- a/src/interfaces.rs +++ b/src/interfaces.rs @@ -1,3 +1,4 @@ +use automerge::ChangeHash; use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -113,5 +114,6 @@ pub trait Storage: Send { &self, _id: DocumentId, _full_doc: Vec, + _new_heads: Vec, ) -> BoxFuture<'static, Result<(), StorageError>>; } diff --git a/src/repo.rs b/src/repo.rs index 3b9acae5..52fe588f 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -743,7 +743,10 @@ impl DocumentInfo { (doc.automerge.save(), doc.automerge.get_heads()) }; self.patches_since_last_compact = 0; - (storage.compact(document_id.clone(), to_save), new_heads) + ( + storage.compact(document_id.clone(), to_save, new_heads.clone()), + new_heads, + ) } else { let (to_save, new_heads) = { let doc = self.document.read(); diff --git a/src/tokio/fs_storage.rs 
b/src/tokio/fs_storage.rs index 38afd348..c5e20288 100644 --- a/src/tokio/fs_storage.rs +++ b/src/tokio/fs_storage.rs @@ -78,11 +78,17 @@ impl Storage for FsStorage { &self, id: crate::DocumentId, full_doc: Vec, + new_heads: Vec, ) -> BoxFuture<'static, Result<(), StorageError>> { let inner = Arc::clone(&self.inner); let inner_id = id.clone(); self.handle - .spawn_blocking(move || inner.lock().unwrap().compact(&inner_id, &full_doc)) + .spawn_blocking(move || { + inner + .lock() + .unwrap() + .compact(&inner_id, &full_doc, new_heads) + }) .map(handle_joinerror) .map_err(move |e| { tracing::error!(err=?e, doc=?id, "error compacting chunk to filesystem"); diff --git a/test_utils/src/storage_utils.rs b/test_utils/src/storage_utils.rs index fd0365cc..c43707c2 100644 --- a/test_utils/src/storage_utils.rs +++ b/test_utils/src/storage_utils.rs @@ -1,3 +1,4 @@ +use automerge::ChangeHash; use automerge_repo::{DocumentId, Storage, StorageError}; use futures::future::{BoxFuture, TryFutureExt}; use futures::FutureExt; @@ -30,6 +31,7 @@ impl Storage for SimpleStorage { &self, _id: DocumentId, _chunk: Vec, + _new_heads: Vec, ) -> BoxFuture<'static, Result<(), StorageError>> { futures::future::ready(Ok(())).boxed() } @@ -76,6 +78,7 @@ impl Storage for InMemoryStorage { &self, id: DocumentId, full_doc: Vec, + _new_heads: Vec, ) -> BoxFuture<'static, Result<(), StorageError>> { let mut documents = self.documents.lock(); documents.insert(id, full_doc); @@ -202,6 +205,7 @@ impl Storage for AsyncInMemoryStorage { &self, id: DocumentId, full_doc: Vec, + _new_heads: Vec, ) -> BoxFuture<'static, Result<(), StorageError>> { let (tx, rx) = oneshot(); self.chan diff --git a/tests/document_changed.rs b/tests/document_changed.rs index 7eb74a55..eae1f14a 100644 --- a/tests/document_changed.rs +++ b/tests/document_changed.rs @@ -87,7 +87,6 @@ fn test_document_changed_over_sync() { // Spawn a task that awaits the document change. let (done_sync_sender, mut done_sync_receiver) = channel(1); - let repo_id = repo_handle_1.get_repo_id().clone(); rt.spawn(async move { loop { // Await changes until the edit comes through over sync. diff --git a/tests/fs_storage/main.rs b/tests/fs_storage/main.rs index 74a6d78e..e39ad4b3 100644 --- a/tests/fs_storage/main.rs +++ b/tests/fs_storage/main.rs @@ -1,7 +1,6 @@ use automerge::transaction::Transactable; use automerge_repo::fs_store; use itertools::Itertools; -use uuid::Uuid; /// Asserts that the &[u8] in `data` is some permutation of the chunks of Vec<&[u8> in `expected` macro_rules! assert_permutation_of { @@ -45,7 +44,9 @@ fn fs_store_crud() { assert_permutation_of!(result, vec![change1.bytes(), change2.bytes()]); // now compact - store.compact(&doc_id, &[]).unwrap(); + store + .compact(&doc_id, &doc.save(), doc.get_heads()) + .unwrap(); let result = store.get(&doc_id).unwrap().unwrap(); let expected = doc.save(); assert_eq!(result, expected);
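A closing note on where the series lands: note_changes counts the changes since last_heads and adds them to patches_since_last_compact, and save_document compacts once that counter exceeds allowable_changes_until_compaction (10), otherwise it appends an incremental save. A small standalone sketch of that counter policy, using a hypothetical SaveTracker in place of the fields that actually live on DocumentInfo in repo.rs:

// Standalone sketch of the save-vs-compact policy the series converges on.
// SaveTracker is a hypothetical stand-in for fields on DocumentInfo.
struct SaveTracker {
    patches_since_last_compact: usize,
    allowable_changes_until_compaction: usize,
}

impl SaveTracker {
    // Record how many new changes arrived since the last noted heads and
    // report whether there is anything worth saving at all.
    fn note_changes(&mut self, new_changes: usize) -> bool {
        self.patches_since_last_compact = self
            .patches_since_last_compact
            .checked_add(new_changes)
            .unwrap_or(0);
        new_changes > 0
    }

    // Choose between a full save (compaction) and an incremental append,
    // resetting the counter whenever a compaction is chosen.
    fn should_compact(&mut self) -> bool {
        if self.patches_since_last_compact > self.allowable_changes_until_compaction {
            self.patches_since_last_compact = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut tracker = SaveTracker {
        patches_since_last_compact: 0,
        allowable_changes_until_compaction: 10,
    };
    for save in 1..=12 {
        if tracker.note_changes(1) {
            println!("save {}: compact = {}", save, tracker.should_compact());
        }
    }
}

The checked_add(..).unwrap_or(0) mirrors the overflow handling in the patches, which wraps the counter back to zero rather than panicking.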