Skip to content

Commit

Permalink
Load documents when receiving new network events
Browse files Browse the repository at this point in the history
Problem: when a peer responds to sync messages for a document which it
has not currently loaded it doesn't attempt to load the document from
disk. This means that attempting to request a document whilst
simultanously syncing it with another machine could result in the
dochandle resolving to an empty document.

Solution: ensure that the document is loaded if not already loaded when
the sync message is received.
  • Loading branch information
alexjg committed Feb 24, 2025
1 parent 6564d5e commit e8c4ba7
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,18 @@ impl Repo {
};
let document = Arc::new(RwLock::new(shared_document));
let handle_count = Arc::new(AtomicUsize::new(0));
DocumentInfo::new(state, document, handle_count)
let mut info = DocumentInfo::new(state, document, handle_count);

let storage_fut = self.storage.get(document_id.clone());
info.state.add_boostrap_storage_fut(storage_fut);
info.poll_storage_operation(
document_id.clone(),
&self.wake_sender,
&self.repo_sender,
&self.repo_id,
);

info
});

if !info.state.should_sync() {
Expand Down
6 changes: 6 additions & 0 deletions test_utils/src/storage_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ impl InMemoryStorage {
pub fn contains_document(&self, doc_id: DocumentId) -> bool {
self.documents.lock().contains_key(&doc_id)
}

pub fn fork(&self) -> Self {
Self {
documents: Arc::new(Mutex::new(self.documents.lock().clone())),
}
}
}

impl Storage for InMemoryStorage {
Expand Down
65 changes: 65 additions & 0 deletions tests/network/document_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,71 @@ async fn request_doc_which_is_not_shared_does_not_announce() {
assert!(doc_handle.is_none());
}

#[test(tokio::test)]
async fn request_doc_which_is_in_storage_loads_from_storage() {
// First create a storage which contains some changes for a document
let storage = InMemoryStorage::default();
let repo_1 = Repo::new(Some("repo1".to_string()), Box::new(storage.clone()));

let repo_handle_1 = repo_1.run();
// Create a document for repo_1
let doc_handle_on_1 = repo_handle_1.new_document();
doc_handle_on_1
.with_doc_mut(|d| {
d.transact::<_, _, automerge::AutomergeError>(|tx| {
tx.put(automerge::ROOT, "foo", "bar")?;
Ok(())
})
})
.unwrap();
let document_id = doc_handle_on_1.document_id();

// Wait a minute for storage to catch up
tokio::time::sleep(Duration::from_millis(100)).await;

// Now, fork the storage for use later
let storage_without_latest = storage.fork();

// Make some more changes on repo1
doc_handle_on_1
.with_doc_mut(|d| {
d.transact::<_, _, automerge::AutomergeError>(|tx| {
tx.put(automerge::ROOT, "baz", "qux")?;
Ok(())
})
})
.unwrap();

let heads_on_repo_1 = doc_handle_on_1.with_doc(|d| d.get_heads());

// Stop repo1
repo_handle_1.stop().unwrap();

// Now start two more repos, one which has the latest changes and one which has the out of date changes

let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(storage.clone()));
let repo_3 = Repo::new(Some("repo3".to_string()), Box::new(storage_without_latest));

let repo_handle_2 = repo_2.run();
let repo_handle_3 = repo_3.run();

connect_repos(&repo_handle_2, &repo_handle_3);

// Open the document on repo3 so it announces
repo_handle_3
.request_document(document_id.clone())
.await
.unwrap();

// wait a bit
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Request the document
let doc_handle = repo_handle_2.request_document(document_id).await.unwrap();

assert_eq!(doc_handle.with_doc(|d| d.get_heads()), heads_on_repo_1);
}

fn create_doc_with_contents(handle: &RepoHandle, key: &str, value: &str) -> DocumentId {
let document_handle = handle.new_document();
document_handle.with_doc_mut(|doc| {
Expand Down

0 comments on commit e8c4ba7

Please sign in to comment.