diff --git a/src/fs_store.rs b/src/fs_store.rs
index d611f791..5bff5359 100644
--- a/src/fs_store.rs
+++ b/src/fs_store.rs
@@ -51,7 +51,7 @@ use error::ErrorKind;
 /// 2. Load the data into an automerge document
 /// 3. `automerge::Automerge::save` the document to a temporary file
 /// 4. Rename the temporary file to a file in the data directory named
-///    `SHA356(automerge::Automerge::get_heads)`.snapshot`
+///    `SHA256(automerge::Automerge::get_heads)`.snapshot`
 /// 5. Delete all the files we loaded in step 1.
 ///
 /// The fact that we name the file after the heads of the document means that
@@ -86,7 +86,7 @@ impl FsStore {
     pub fn get(&self, id: &DocumentId) -> Result<Option<Vec<u8>>, Error> {
         let chunks = Chunks::load(&self.root, id)?;
         let Some(chunks) = chunks else {
-            return Ok(None)
+            return Ok(None);
         };
         let mut result = Vec::new();
         result.extend(chunks.snapshots.into_values().flatten());
@@ -106,7 +106,7 @@ impl FsStore {
                .map_err(|e| Error(ErrorKind::ErrReadingLevel1Path(entry.path(), e)))?
                .is_file()
            {
-                tracing::warn!(
+                tracing::trace!(
                    non_dir_path=%entry.path().display(),
                    "unexpected non-directory at level1 of database"
                );
@@ -122,15 +122,15 @@ impl FsStore {
                let metadata = entry
                    .metadata()
                    .map_err(|e| Error(ErrorKind::ErrReadingLevel2Path(entry.path(), e)))?;
-                if metadata.is_dir() {
-                    tracing::warn!(
+                if !metadata.is_dir() {
+                    tracing::trace!(
                        non_file_path=%entry.path().display(),
-                        "unexpected directory at level2 of database"
+                        "unexpected non-directory at level2 of database"
                    );
                    continue;
                }
-                let Some(doc_paths) = DocIdPaths::parse(&level1, entry.path()) else {
-                    tracing::warn!(
+                let Some(doc_paths) = DocIdPaths::parse(entry.path()) else {
+                    tracing::trace!(
                        non_doc_path=%entry.path().display(),
                        "unexpected non-document path at level2 of database"
                    );
@@ -144,12 +144,6 @@ impl FsStore {
     pub fn append(&self, id: &DocumentId, changes: &[u8]) -> Result<(), Error> {
         let paths = DocIdPaths::from(id);
 
-        std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
-            Error(ErrorKind::CreateLevel2Path(
-                paths.level2_path(&self.root),
-                e,
-            ))
-        })?;
 
         let chunk_name = SavedChunkName::new_incremental(changes);
         write_chunk(&self.root, &paths, changes, chunk_name)?;
@@ -157,31 +151,45 @@ impl FsStore {
         Ok(())
     }
 
-    pub fn compact(&self, id: &DocumentId, _full_doc: &[u8]) -> Result<(), Error> {
+    pub fn compact(
+        &self,
+        id: &DocumentId,
+        full_doc: &[u8],
+        new_heads: Vec<ChangeHash>,
+    ) -> Result<(), Error> {
         let paths = DocIdPaths::from(id);
 
         // Load all the data we have into a doc
-        let Some(chunks) = Chunks::load(&self.root, id)? else {
-            tracing::warn!(doc_id=%id, "attempted to compact non-existent document");
-            return Ok(())
-        };
-        let mut doc = chunks
-            .to_doc()
-            .map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;
-
-        // Write the snapshot
-        let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
-        let chunk = doc.save();
-        write_chunk(&self.root, &paths, &chunk, output_chunk_name)?;
-
-        // Remove all the old data
-        for incremental in chunks.incrementals.keys() {
-            let path = paths.chunk_path(&self.root, incremental);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
-        }
-        for snapshot in chunks.snapshots.keys() {
-            let path = paths.chunk_path(&self.root, snapshot);
-            std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+        match Chunks::load(&self.root, id) {
+            Ok(Some(chunks)) => {
+                // Write the snapshot
+                let output_chunk_name = SavedChunkName::new_snapshot(new_heads);
+                write_chunk(&self.root, &paths, full_doc, output_chunk_name.clone())?;
+
+                // Remove all the old data
+                for incremental in chunks.incrementals.keys() {
+                    let path = paths.chunk_path(&self.root, incremental);
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+                for snapshot in chunks.snapshots.keys() {
+                    let path = paths.chunk_path(&self.root, snapshot);
+
+                    std::fs::remove_file(&path)
+                        .map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
+                }
+            }
+            Ok(None) => {
+                let output_chunk_name = SavedChunkName {
+                    hash: uuid::Uuid::new_v4().as_bytes().to_vec(),
+                    chunk_type: ChunkType::Snapshot,
+                };
+                // Write the snapshot
+                write_chunk(&self.root, &paths, full_doc, output_chunk_name)?;
+            }
+            Err(e) => {
+                tracing::error!(e=%e, "Error loading chunks");
+            }
         }
         Ok(())
     }
@@ -195,7 +203,7 @@ fn write_chunk(
 ) -> Result<(), Error> {
     // Write to a temp file and then rename to avoid partial writes
     let mut temp_save =
-        tempfile::NamedTempFile::new().map_err(|e| Error(ErrorKind::CreateTempFile(e)))?;
+        tempfile::NamedTempFile::new_in(root).map_err(|e| Error(ErrorKind::CreateTempFile(e)))?;
     let temp_save_path = temp_save.path().to_owned();
     temp_save
         .as_file_mut()
@@ -206,9 +214,16 @@ fn write_chunk(
         .sync_all()
         .map_err(|e| Error(ErrorKind::WriteTempFile(temp_save_path.clone(), e)))?;
 
+    std::fs::create_dir_all(paths.level2_path(root))
+        .map_err(|e| Error(ErrorKind::CreateLevel2Path(paths.level2_path(root), e)))?;
+
     // Move the temporary file into a snapshot in the document data directory
     // with a name based on the hash of the heads of the document
     let output_path = paths.chunk_path(root, &name);
+
+    tracing::trace!("Renaming: {:?}", temp_save);
+    tracing::trace!("To: {:?}", output_path);
+
     std::fs::rename(&temp_save_path, &output_path)
         .map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;
 
@@ -236,16 +251,18 @@ impl<'a> From<&'a DocumentId> for DocIdPaths {
 }
 
 impl DocIdPaths {
-    fn parse<P1: AsRef<Path>, P2: AsRef<Path>>(level1: P1, level2: P2) -> Option<Self> {
-        let level1 = level1.as_ref().to_str()?;
+    fn parse(level2: PathBuf) -> Option<Self> {
+        let level1 = level2.parent()?.file_name()?.to_str()?;
+        let level2 = level2.file_name()?.to_str()?;
+
         let prefix = hex::decode(level1).ok()?;
         let prefix = <[u8; 2]>::try_from(prefix).ok()?;
-        let level2 = level2.as_ref().to_str()?;
         let doc_id_bytes = hex::decode(level2).ok()?;
         let doc_id_str = String::from_utf8(doc_id_bytes).ok()?;
         let doc_id = DocumentId::from(doc_id_str.as_str());
         let result = Self::from(&doc_id);
+
         if result.prefix != prefix {
             None
         } else {
@@ -274,13 +291,13 @@ impl DocIdPaths {
     }
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 enum ChunkType {
     Snapshot,
     Incremental,
 }
 
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 struct SavedChunkName {
     hash: Vec<u8>,
     chunk_type: ChunkType,
@@ -343,7 +360,7 @@ impl Chunks {
     fn load(root: &Path, doc_id: &DocumentId) -> Result<Option<Self>, Error> {
         let doc_id_hash = DocIdPaths::from(doc_id);
         let level2_path = doc_id_hash.level2_path(root);
-        tracing::debug!(
+        tracing::trace!(
            root=%root.display(),
            doc_id=?doc_id,
            doc_path=%level2_path.display(),
@@ -379,11 +396,12 @@ impl Chunks {
                .map_err(|e| Error(ErrorKind::ErrReadingChunkFileMetadata(path.clone(), e)))?
                .is_file()
            {
-                tracing::warn!(bad_file=%path.display(), "unexpected non-file in level2 path");
+                tracing::trace!(bad_file=%path.display(), "unexpected non-file in level2 path");
                continue;
            }
-            let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse) else {
-                tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
+            let Some(chunk_name) = entry.file_name().to_str().and_then(SavedChunkName::parse)
+            else {
+                tracing::trace!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
                continue;
            };
            tracing::debug!(chunk_path=%path.display(), "reading chunk file");
@@ -393,7 +411,7 @@ impl Chunks {
                    match e.kind() {
                        std::io::ErrorKind::NotFound => {
                            // Could be a concurrent process compacting, not an error
-                            tracing::warn!(
+                            tracing::trace!(
                                missing_chunk_path=%path.display(),
                                "chunk file disappeared while reading chunks",
                            );
@@ -417,17 +435,6 @@ impl Chunks {
            incrementals,
        }))
    }
-
-    fn to_doc(&self) -> Result<automerge::Automerge, automerge::AutomergeError> {
-        let mut bytes = Vec::new();
-        for chunk in self.snapshots.values() {
-            bytes.extend(chunk);
-        }
-        for chunk in self.incrementals.values() {
-            bytes.extend(chunk);
-        }
-        automerge::Automerge::load(&bytes)
-    }
 }
 
 mod error {
@@ -471,8 +478,6 @@ mod error {
        ErrReadingChunkFile(PathBuf, std::io::Error),
        #[error("error creating level 2 path {0}: {1}")]
        CreateLevel2Path(PathBuf, std::io::Error),
-        #[error("error loading doc to compact: {0}")]
-        LoadDocToCompact(automerge::AutomergeError),
        #[error("error creating temp file: {0}")]
        CreateTempFile(std::io::Error),
        #[error("error writing temp file {0}: {1}")]
diff --git a/src/interfaces.rs b/src/interfaces.rs
index 8fc7b9a2..3c72d1a9 100644
--- a/src/interfaces.rs
+++ b/src/interfaces.rs
@@ -1,3 +1,4 @@
+use automerge::ChangeHash;
 use futures::future::BoxFuture;
 use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
@@ -113,5 +114,6 @@ pub trait Storage: Send {
        &self,
        _id: DocumentId,
        _full_doc: Vec<u8>,
+        _new_heads: Vec<ChangeHash>,
    ) -> BoxFuture<'static, Result<(), StorageError>>;
 }
diff --git a/src/network_connect.rs b/src/network_connect.rs
index 4bf15be6..61006ea8 100644
--- a/src/network_connect.rs
+++ b/src/network_connect.rs
@@ -35,7 +35,7 @@ impl RepoHandle {
                    Ok(repo_msg)
                }
                Ok(m) => {
-                    tracing::warn!(?m, repo_id=?repo_id, "Received non-repo message");
+                    tracing::trace!(?m, repo_id=?repo_id, "Received non-repo message");
                    Err(NetworkError::Error)
                }
                Err(e) => {
diff --git a/src/repo.rs b/src/repo.rs
index 5ebf5a53..52fe588f 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -555,9 +555,11 @@ pub(crate) struct DocumentInfo {
    sync_states: HashMap<RepoId, SyncState>,
    /// Used to resolve futures for DocHandle::changed.
    change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
-    /// Counter of patches since last save,
+    /// Counter of local saves since last compact,
    /// used to make decisions about full or incemental saves.
-    patches_since_last_save: usize,
+    patches_since_last_compact: usize,
+    /// Number of changes allowed to accumulate before a compaction (full save) is performed.
+    allowable_changes_until_compaction: usize,
    /// Last heads obtained from the automerge doc.
    last_heads: Vec<ChangeHash>,
 }
@@ -578,7 +580,8 @@ impl DocumentInfo {
            handle_count,
            sync_states: Default::default(),
            change_observers: Default::default(),
-            patches_since_last_save: 0,
+            patches_since_last_compact: 0,
+            allowable_changes_until_compaction: 10,
            last_heads,
        }
    }
@@ -597,7 +600,7 @@ impl DocumentInfo {
            | DocState::Error
            | DocState::LoadPending { .. }
            | DocState::Bootstrap { .. } => {
-                assert_eq!(self.patches_since_last_save, 0);
+                assert_eq!(self.patches_since_last_compact, 0);
                DocState::PendingRemoval(vec![])
            }
            DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
@@ -695,13 +698,25 @@ impl DocumentInfo {
    /// Count patches since last save,
    /// returns whether there were any.
    fn note_changes(&mut self) -> bool {
+        // TODO, Can we do this without a read lock?
+        // I think that if the changes update last_heads and
+        // we store `last_heads_since_note` we can get a bool out of this.
        let count = {
            let doc = self.document.read();
            let changes = doc.automerge.get_changes(&self.last_heads);
+            tracing::trace!(
+                "last: {:?}, current: {:?}",
+                self.last_heads,
+                doc.automerge.get_heads()
+            );
+            //self.last_heads = doc.automerge.get_heads();
            changes.len()
        };
        let has_patches = count > 0;
-        self.patches_since_last_save = self.patches_since_last_save.checked_add(count).unwrap_or(0);
+        self.patches_since_last_compact = self
+            .patches_since_last_compact
+            .checked_add(count)
+            .unwrap_or(0);
        has_patches
    }
 
@@ -720,13 +735,18 @@ impl DocumentInfo {
        if !self.state.should_save() {
            return;
        }
-        let should_compact = self.patches_since_last_save > 10;
+        let should_compact =
+            self.patches_since_last_compact > self.allowable_changes_until_compaction;
        let (storage_fut, new_heads) = if should_compact {
            let (to_save, new_heads) = {
                let doc = self.document.read();
                (doc.automerge.save(), doc.automerge.get_heads())
            };
-            (storage.compact(document_id.clone(), to_save), new_heads)
+            self.patches_since_last_compact = 0;
+            (
+                storage.compact(document_id.clone(), to_save, new_heads.clone()),
+                new_heads,
+            )
        } else {
            let (to_save, new_heads) = {
                let doc = self.document.read();
@@ -735,6 +755,7 @@ impl DocumentInfo {
                    doc.automerge.get_heads(),
                )
            };
+            self.patches_since_last_compact = self.patches_since_last_compact.checked_add(1).unwrap_or(0);
            (storage.append(document_id.clone(), to_save), new_heads)
        };
        match self.state {
@@ -748,7 +769,6 @@ impl DocumentInfo {
        }
        let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id));
        self.state.poll_pending_save(waker);
-        self.patches_since_last_save = 0;
        self.last_heads = new_heads;
    }
 
@@ -1213,15 +1233,13 @@ impl Repo {
        // Handle doc changes: sync the document.
        let local_repo_id = self.get_repo_id().clone();
        if let Some(info) = self.documents.get_mut(&doc_id) {
-            if !info.note_changes() {
-                // Stop here if the document wasn't actually changed.
-                return;
+            if info.note_changes() {
+                self.documents_with_changes.push(doc_id.clone());
            }
            let is_first_edit = matches!(info.state, DocState::LocallyCreatedNotEdited);
            if is_first_edit {
                info.state = DocState::Sync(vec![]);
            }
-            self.documents_with_changes.push(doc_id.clone());
            for (to_repo_id, message) in info.generate_sync_messages().into_iter() {
                let outgoing = NetworkMessage::Sync {
                    from_repo_id: local_repo_id.clone(),
@@ -1329,6 +1347,7 @@ impl Repo {
                        let state = info.document.read();
                        state.automerge.get_heads()
                    };
+                    tracing::trace!("Change observer: {:?} {:?}", current_heads, change_hash);
                    if current_heads == change_hash {
                        info.change_observers.push(observer);
                    } else {
@@ -1454,8 +1473,9 @@ impl Repo {
                    .expect("Doc should have an info by now.");
 
                if info.receive_sync_message(per_remote) {
-                    info.note_changes();
-                    self.documents_with_changes.push(document_id.clone());
+                    if info.note_changes() {
+                        self.documents_with_changes.push(document_id.clone());
+                    }
                }
 
                // Note: since receiving and generating sync messages is done
diff --git a/src/tokio/fs_storage.rs b/src/tokio/fs_storage.rs
index 38afd348..c5e20288 100644
--- a/src/tokio/fs_storage.rs
+++ b/src/tokio/fs_storage.rs
@@ -78,11 +78,17 @@ impl Storage for FsStorage {
        &self,
        id: crate::DocumentId,
        full_doc: Vec<u8>,
+        new_heads: Vec<ChangeHash>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        let inner = Arc::clone(&self.inner);
        let inner_id = id.clone();
        self.handle
-            .spawn_blocking(move || inner.lock().unwrap().compact(&inner_id, &full_doc))
+            .spawn_blocking(move || {
+                inner
+                    .lock()
+                    .unwrap()
+                    .compact(&inner_id, &full_doc, new_heads)
+            })
            .map(handle_joinerror)
            .map_err(move |e| {
                tracing::error!(err=?e, doc=?id, "error compacting chunk to filesystem");
diff --git a/test_utils/Cargo.toml b/test_utils/Cargo.toml
index 4065309a..319cdb51 100644
--- a/test_utils/Cargo.toml
+++ b/test_utils/Cargo.toml
@@ -12,7 +12,7 @@ tokio = { version = "1.27", features = ["full"] }
 tokio-util = "0.7.8"
 tokio-serde = {version = "0.8.0", features = ["json"]}
 serde_json = "1.0.96"
-automerge = { version = "^0.3" }
+automerge = { version = "0.5.0" }
 uuid = { version = "1.2.2"}
 crossbeam-channel = { version = "0.5.8" }
 parking_lot = { version = "0.12.1" }
diff --git a/test_utils/src/storage_utils.rs b/test_utils/src/storage_utils.rs
index fb4d1bb3..c43707c2 100644
--- a/test_utils/src/storage_utils.rs
+++ b/test_utils/src/storage_utils.rs
@@ -1,3 +1,4 @@
+use automerge::ChangeHash;
 use automerge_repo::{DocumentId, Storage, StorageError};
 use futures::future::{BoxFuture, TryFutureExt};
 use futures::FutureExt;
@@ -30,6 +31,7 @@ impl Storage for SimpleStorage {
        &self,
        _id: DocumentId,
        _chunk: Vec<u8>,
+        _new_heads: Vec<ChangeHash>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        futures::future::ready(Ok(())).boxed()
    }
@@ -76,6 +78,7 @@ impl Storage for InMemoryStorage {
        &self,
        id: DocumentId,
        full_doc: Vec<u8>,
+        _new_heads: Vec<ChangeHash>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        let mut documents = self.documents.lock();
        documents.insert(id, full_doc);
@@ -136,8 +139,8 @@ impl AsyncInMemoryStorage {
                    StorageRequest::Compact(doc_id, data, sender) => {
                        let _entry = documents
                            .entry(doc_id)
-                            .and_modify(|entry| *entry = data)
-                            .or_insert_with(Default::default);
+                            .and_modify(|entry| *entry = data.clone())
+                            .or_insert_with(|| data);
                        let (tx, rx) = oneshot();
                        results.push_back(tx);
                        tokio::spawn(async move {
@@ -202,6 +205,7 @@ impl Storage for AsyncInMemoryStorage {
        &self,
        id: DocumentId,
        full_doc: Vec<u8>,
+        _new_heads: Vec<ChangeHash>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        let (tx, rx) = oneshot();
        self.chan
diff --git a/tests/document_changed.rs b/tests/document_changed.rs
index 6fb50c9d..eae1f14a 100644
--- a/tests/document_changed.rs
+++ b/tests/document_changed.rs
@@ -41,6 +41,18 @@ fn test_document_changed_over_sync() {
    peers.insert(repo_handle_2.get_repo_id().clone(), network_1);
    peers.insert(repo_handle_1.get_repo_id().clone(), network_2);
 
+    // Edit the document.
+    document_handle_1.with_doc_mut(|doc| {
+        let mut tx = doc.transaction();
+        tx.put(
+            automerge::ROOT,
+            "repo_id",
+            format!("{}", repo_handle_1.get_repo_id().clone()),
+        )
+        .expect("Failed to change the document.");
+        tx.commit();
+    });
+
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
@@ -53,42 +65,44 @@ fn test_document_changed_over_sync() {
        // Request the document.
        let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap();
        doc_handle.with_doc_mut(|doc| {
-            let mut tx = doc.transaction();
-            tx.put(
-                automerge::ROOT,
-                "repo_id",
-                format!("{}", repo_handle_2.get_repo_id()),
-            )
-            .expect("Failed to change the document.");
-            tx.commit();
+            println!("Heads when 2 makes edit: {:?}", doc.get_heads());
+            let id = doc
+                .get(automerge::ROOT, "repo_id")
+                .expect("Failed to read the document.")
+                .unwrap();
+            println!("Id when two makes edit: {:?}", id);
+            {
+                let mut tx = doc.transaction();
+                tx.put(
+                    automerge::ROOT,
+                    "repo_id",
+                    format!("{}", repo_handle_2.get_repo_id()),
+                )
+                .expect("Failed to change the document.");
+                tx.commit();
+            }
+            println!("Heads after 2 makes edit: {:?}", doc.get_heads());
        });
    });
 
    // Spawn a task that awaits the document change.
    let (done_sync_sender, mut done_sync_receiver) = channel(1);
-    let repo_id = repo_handle_1.get_repo_id().clone();
    rt.spawn(async move {
-        // Edit the document.
-        document_handle_1.with_doc_mut(|doc| {
-            let mut tx = doc.transaction();
-            tx.put(automerge::ROOT, "repo_id", format!("{}", repo_id))
-                .expect("Failed to change the document.");
-            tx.commit();
-        });
        loop {
            // Await changes until the edit comes through over sync.
-            document_handle_1.changed().await.unwrap();
            let equals = document_handle_1.with_doc(|doc| {
                let val = doc
                    .get(automerge::ROOT, "repo_id")
                    .expect("Failed to read the document.")
                    .unwrap();
+                println!("Val: {:?}", val);
                val.0.to_str().unwrap() == format!("{}", expected_repo_id)
            });
            if equals {
                done_sync_sender.send(()).await.unwrap();
                break;
            }
+            document_handle_1.changed().await.unwrap();
        }
    });
diff --git a/tests/fs_storage/main.rs b/tests/fs_storage/main.rs
index 82613918..e39ad4b3 100644
--- a/tests/fs_storage/main.rs
+++ b/tests/fs_storage/main.rs
@@ -44,7 +44,9 @@ fn fs_store_crud() {
    assert_permutation_of!(result, vec![change1.bytes(), change2.bytes()]);
 
    // now compact
-    store.compact(&doc_id, &[]).unwrap();
+    store
+        .compact(&doc_id, &doc.save(), doc.get_heads())
+        .unwrap();
    let result = store.get(&doc_id).unwrap().unwrap();
    let expected = doc.save();
    assert_eq!(result, expected);
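
For downstream implementors, the visible API change in this diff is the extra `new_heads: Vec<ChangeHash>` argument on `Storage::compact`. Below is a minimal sketch of how an existing backend might adapt; the `ExampleStorage` type is hypothetical, and the shapes of `get`, `list_all`, and `append` are assumed from the surrounding crate and may differ slightly — only the `compact` signature reflects what the diff itself changes.

```rust
use automerge::ChangeHash;
use automerge_repo::{DocumentId, Storage, StorageError};
use futures::future::BoxFuture;
use futures::FutureExt;
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical in-memory backend, named here only for illustration.
#[derive(Default)]
struct ExampleStorage {
    docs: Mutex<HashMap<DocumentId, Vec<u8>>>,
}

impl Storage for ExampleStorage {
    fn get(&self, id: DocumentId) -> BoxFuture<'static, Result<Option<Vec<u8>>, StorageError>> {
        let doc = self.docs.lock().unwrap().get(&id).cloned();
        futures::future::ready(Ok(doc)).boxed()
    }

    fn list_all(&self) -> BoxFuture<'static, Result<Vec<DocumentId>, StorageError>> {
        let ids = self.docs.lock().unwrap().keys().cloned().collect();
        futures::future::ready(Ok(ids)).boxed()
    }

    fn append(
        &self,
        id: DocumentId,
        changes: Vec<u8>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        // Incremental chunks are concatenated onto whatever is already stored.
        self.docs.lock().unwrap().entry(id).or_default().extend(changes);
        futures::future::ready(Ok(())).boxed()
    }

    fn compact(
        &self,
        id: DocumentId,
        full_doc: Vec<u8>,
        _new_heads: Vec<ChangeHash>, // parameter added by this change
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        // A plain blob store can ignore the heads and simply replace the stored bytes,
        // as `InMemoryStorage` above does; `FsStore::compact` uses them to name the snapshot file.
        self.docs.lock().unwrap().insert(id, full_doc);
        futures::future::ready(Ok(())).boxed()
    }
}
```

A backend that already persists a single opaque blob per document therefore only needs to accept and ignore the new argument, while head-aware stores such as `FsStore` can use it to name snapshots without reloading the document.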