Upgrade to automerge 0.5; fix compaction #58

Merged · 5 commits · Dec 8, 2023
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -27,7 +27,7 @@ axum = ["dep:axum", "dep:tokio", "dep:tokio-util"]
tungstenite = ["dep:tungstenite"]

[dependencies]
automerge = { version = "0.4.1" }
automerge = { version = "0.5.0" }
axum = { version = "0.6.18", features = ["ws"], optional = true }
uuid = { version = "1.2.2"}
crossbeam-channel = { version = "0.5.8" }
@@ -60,4 +60,4 @@ test-log = { version = "0.2.12", features = ["trace"] }
env_logger = "0.10.0"
tracing-subscriber = { version = "0.3.17", features = ["fmt", "env-filter"] }
itertools = "0.11.0"
autosurgeon = "0.7.1"
autosurgeon = "0.8.0"
96 changes: 71 additions & 25 deletions src/fs_store.rs
@@ -180,31 +180,76 @@ impl FsStore {
Ok(())
}

pub fn compact(&self, id: &DocumentId, _full_doc: &[u8]) -> Result<(), Error> {
pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> {
let paths = DocIdPaths::from(id);

// Load all the data we have into a doc
let Some(chunks) = Chunks::load(&self.root, id)? else {
tracing::warn!(doc_id=%id, "attempted to compact non-existent document");
return Ok(());
};
let mut doc = chunks
.to_doc()
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Write the snapshot
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
let chunk = doc.save();
write_chunk(&self.root, &paths, &chunk, output_chunk_name, &self.tmpdir)?;

// Remove all the old data
for incremental in chunks.incrementals.keys() {
let path = paths.chunk_path(&self.root, incremental);
std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
for snapshot in chunks.snapshots.keys() {
let path = paths.chunk_path(&self.root, snapshot);
std::fs::remove_file(&path).map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
match Chunks::load(&self.root, id) {
Ok(Some(chunks)) => {
let doc = chunks
.to_doc()
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Write the snapshot
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
let chunk = doc.save();
write_chunk(
&self.root,
&paths,
&chunk,
output_chunk_name.clone(),
&self.tmpdir,
)?;

// Remove all the old data
for incremental in chunks.incrementals.keys() {
let path = paths.chunk_path(&self.root, incremental);
std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
for snapshot in chunks.snapshots.keys() {
let path = paths.chunk_path(&self.root, snapshot);

if path == just_wrote {
tracing::error!(
?path,
"Somehow trying to delete the same path we just wrote to. Not today \
Satan"
);
continue;
}

std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
}
Ok(None) => {
// The chunks are missing in storage for whatever reason
// Try to recreate the document from full_doc if possible
let doc = automerge::Automerge::load(full_doc)
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
Error(ErrorKind::CreateLevel2Path(
paths.level2_path(&self.root),
e,
))
})?;

// Write the snapshot
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
write_chunk(
&self.root,
&paths,
full_doc,
output_chunk_name,
&self.tmpdir,
)?;
}
Err(e) => {
tracing::error!(doc_id=%id, %e, "error loading chunks");
}
}
Ok(())
}
@@ -233,6 +278,7 @@ fn write_chunk(
// Move the temporary file into a snapshot in the document data directory
// with a name based on the hash of the heads of the document
let output_path = paths.chunk_path(root, &name);
tracing::trace!(?temp_save_path, ?output_path, "renaming chunk file");
std::fs::rename(&temp_save_path, &output_path)
.map_err(|e| Error(ErrorKind::RenameTempFile(temp_save_path, output_path, e)))?;

Expand Down Expand Up @@ -299,13 +345,13 @@ impl DocIdPaths {
}
}

#[derive(Debug, Hash, PartialEq, Eq)]
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
enum ChunkType {
Snapshot,
Incremental,
}

#[derive(Debug, Hash, PartialEq, Eq)]
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
struct SavedChunkName {
hash: Vec<u8>,
chunk_type: ChunkType,
@@ -421,7 +467,7 @@ impl Chunks {
// Could be a concurrent process compacting, not an error
tracing::warn!(
missing_chunk_path=%path.display(),
"chunk file disappeared while reading chunks",
"chunk file disappeared while reading chunks; ignoring",
);
continue;
}
84 changes: 59 additions & 25 deletions src/repo.rs
@@ -570,9 +570,13 @@ pub(crate) struct DocumentInfo {
peer_connections: HashMap<RepoId, PeerConnection>,
/// Used to resolve futures for DocHandle::changed.
change_observers: Vec<RepoFutureResolver<Result<(), RepoError>>>,
/// Counter of patches since last save,
/// Counter of changes since last compact,
/// used to make decisions about full or incremental saves.
patches_since_last_save: usize,
changes_since_last_compact: usize,
/// The number of changes after which a compaction will be performed.
allowable_changes_until_compaction: usize,
/// Last heads obtained from the automerge doc.
last_heads: Vec<ChangeHash>,
}

/// A state machine representing a connection between a remote repo and a particular document
@@ -625,13 +629,19 @@ impl DocumentInfo {
document: Arc<RwLock<SharedDocument>>,
handle_count: Arc<AtomicUsize>,
) -> Self {
let last_heads = {
let doc = document.read();
doc.automerge.get_heads()
};
DocumentInfo {
state,
document,
handle_count,
peer_connections: Default::default(),
change_observers: Default::default(),
patches_since_last_save: 0,
changes_since_last_compact: 0,
allowable_changes_until_compaction: 10,
last_heads,
}
}

@@ -649,7 +659,7 @@ impl DocumentInfo {
| DocState::Error
| DocState::LoadPending { .. }
| DocState::Bootstrap { .. } => {
assert_eq!(self.patches_since_last_save, 0);
assert_eq!(self.changes_since_last_compact, 0);
DocState::PendingRemoval(vec![])
}
DocState::Sync(ref mut storage_fut) => DocState::PendingRemoval(mem::take(storage_fut)),
@@ -747,8 +757,22 @@ impl DocumentInfo {
/// Count patches since last save,
/// returns whether there were any.
fn note_changes(&mut self) -> bool {
// TODO: count patches somehow.
true
// TODO: Can we do this without a read lock?
// I think that if the changes update last_heads and
// we store `last_heads_since_note` we can get a bool out of this.
let count = {
let doc = self.document.read();
let changes = doc.automerge.get_changes(&self.last_heads);
tracing::trace!(
last_heads=?self.last_heads,
current_heads=?doc.automerge.get_heads(),
"checking for changes since last save"
);
changes.len()
};
let has_patches = count > 0;
self.changes_since_last_compact = self.changes_since_last_compact.saturating_add(count);
has_patches
}
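The TODO above hints at skipping the get_changes scan entirely by caching heads. A hypothetical sketch of that idea, using the last_heads_since_note name from the comment and a stand-in hash type (both illustrative, not part of this PR):

// Stand-in for automerge::ChangeHash; 32 bytes like a real change hash.
type Hash = [u8; 32];

struct ChangeNoter {
    last_heads_since_note: Vec<Hash>,
}

impl ChangeNoter {
    /// Compare the current heads to the heads seen at the previous call.
    /// Assuming heads come back in a stable order (automerge returns them
    /// sorted), plain equality detects whether anything changed without
    /// materializing the changes themselves.
    fn note_changes(&mut self, current_heads: Vec<Hash>) -> bool {
        let changed = current_heads != self.last_heads_since_note;
        self.last_heads_since_note = current_heads;
        changed
    }
}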

fn resolve_change_observers(&mut self, result: Result<(), RepoError>) {
@@ -766,19 +790,24 @@ impl DocumentInfo {
if !self.state.should_save() {
return;
}
let should_compact = self.patches_since_last_save > 10;
let storage_fut = if should_compact {
let to_save = {
let mut doc = self.document.write();
doc.automerge.save()
let should_compact =
self.changes_since_last_compact > self.allowable_changes_until_compaction;
let (storage_fut, new_heads) = if should_compact {
let (to_save, new_heads) = {
let doc = self.document.read();
(doc.automerge.save(), doc.automerge.get_heads())
};
storage.compact(document_id.clone(), to_save)
self.changes_since_last_compact = 0;
(storage.compact(document_id.clone(), to_save), new_heads)
} else {
let to_save = {
let mut doc = self.document.write();
doc.automerge.save_incremental()
let (to_save, new_heads) = {
let doc = self.document.read();
(
doc.automerge.save_after(&self.last_heads),
doc.automerge.get_heads(),
)
};
storage.append(document_id.clone(), to_save)
(storage.append(document_id.clone(), to_save), new_heads)
};
match self.state {
DocState::Sync(ref mut futs) => {
@@ -791,7 +820,7 @@
}
let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id));
self.state.poll_pending_save(waker);
self.patches_since_last_save = 0;
self.last_heads = new_heads;
Collaborator:

Can this lead to data loss? We update the heads of the last save before the storage future has resolved, which means the storage future can fail for some reason (or, more likely, get stuck in some kind of deadlock), and then the next time we save we only save since heads that haven't made it to disk.

Collaborator:

I'm not sure whether it can lead to data loss, but in any case it does make sense to store the heads together with the future on the DocState; note_changes could then use the heads of the last (fut, heads) pair. I think it's a good follow-up, because it's a complicated change to make.

Collaborator:
Filed #60
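A rough sketch of the follow-up described in this thread: keep the heads next to each pending storage future, so heads only become the saved baseline once the corresponding write resolves. All names here are hypothetical; #60 tracks the real change:

use std::future::Future;
use std::pin::Pin;

type Hash = [u8; 32]; // stand-in for automerge::ChangeHash
type StorageFut = Pin<Box<dyn Future<Output = ()> + Send>>;

/// An in-flight save together with the heads it persisted.
struct PendingSave {
    fut: StorageFut,
    heads: Vec<Hash>,
}

struct DocSaveState {
    pending: Vec<PendingSave>,
    /// Heads confirmed durable; the baseline for the next incremental save.
    saved_heads: Vec<Hash>,
}

impl DocSaveState {
    /// Called when a pending save's future resolves successfully: only now
    /// do its heads become the baseline, so a stuck or failed write never
    /// advances `saved_heads` past what is actually on disk.
    fn on_save_complete(&mut self, idx: usize) {
        let done = self.pending.remove(idx);
        self.saved_heads = done.heads;
    }
}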

}

/// Apply incoming sync messages,
@@ -1343,15 +1372,16 @@ impl Repo {
// Handle doc changes: sync the document.
let local_repo_id = self.get_repo_id().clone();
if let Some(info) = self.documents.get_mut(&doc_id) {
if !info.note_changes() {
// Stop here if the document wasn't actually changed.
return;
// only run the documents_with_changes workflow if there
// was a change, but always generate potential sync messages
// (below)
if info.note_changes() {
self.documents_with_changes.push(doc_id.clone());
}
let is_first_edit = matches!(info.state, DocState::LocallyCreatedNotEdited);
if is_first_edit {
info.state = DocState::Sync(vec![]);
}
self.documents_with_changes.push(doc_id.clone());
for (to_repo_id, message) in info.generate_sync_messages().into_iter() {
let outgoing = NetworkMessage::Sync {
from_repo_id: local_repo_id.clone(),
@@ -1445,13 +1475,18 @@ impl Repo {
&self.repo_id,
);
}
RepoEvent::AddChangeObserver(doc_id, change_hash, mut observer) => {
RepoEvent::AddChangeObserver(doc_id, last_heads, mut observer) => {
if let Some(info) = self.documents.get_mut(&doc_id) {
let current_heads = {
let state = info.document.read();
state.automerge.get_heads()
};
if current_heads == change_hash {
tracing::trace!(
?current_heads,
?last_heads,
"handling AddChangeObserver event"
);
if current_heads == last_heads {
info.change_observers.push(observer);
} else {
// Resolve now if the document has already changed.
@@ -1555,8 +1590,7 @@ impl Repo {
.expect("Doc should have an info by now.");

let (has_changes, peer_conn_commands) = info.receive_sync_message(per_remote);
if has_changes {
info.note_changes();
if has_changes && info.note_changes() {
self.documents_with_changes.push(document_id.clone());
}

2 changes: 1 addition & 1 deletion test_utils/Cargo.toml
@@ -12,7 +12,7 @@ tokio = { version = "1.27", features = ["full"] }
tokio-util = "0.7.8"
tokio-serde = {version = "0.8.0", features = ["json"]}
serde_json = "1.0.96"
automerge = { version = "^0.3" }
automerge = { version = "0.5.0" }
uuid = { version = "1.2.2"}
crossbeam-channel = { version = "0.5.8" }
parking_lot = { version = "0.12.1" }
25 changes: 17 additions & 8 deletions tests/network/document_changed.rs
@@ -31,14 +31,22 @@ async fn test_document_changed_over_sync() {
// Request the document.
let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap();
doc_handle.with_doc_mut(|doc| {
let mut tx = doc.transaction();
tx.put(
automerge::ROOT,
"repo_id",
format!("{}", repo_handle_2.get_repo_id()),
)
.expect("Failed to change the document.");
tx.commit();
let val = doc
.get(automerge::ROOT, "repo_id")
.expect("Failed to read the document.")
.unwrap();
tracing::debug!(heads=?doc.get_heads(), ?val, "before repo_handle_2 makes edit");
{
let mut tx = doc.transaction();
tx.put(
automerge::ROOT,
"repo_id",
format!("{}", repo_handle_2.get_repo_id()),
)
.expect("Failed to change the document.");
tx.commit();
}
tracing::debug!(heads=?doc.get_heads(), "after repo_handle_2 makes edit");
});
});

@@ -64,6 +72,7 @@ async fn test_document_changed_over_sync() {
.get(automerge::ROOT, "repo_id")
.expect("Failed to read the document.")
.unwrap();
tracing::debug!(?val, "after repo_handle_1 received sync");
assert_eq!(val.0.to_str().unwrap(), "repo1".to_string());
});
