Fix fsstore compaction #77

Merged
merged 2 commits on Feb 21, 2025
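This PR fixes `FsStore::compact` so that changes present only in the `full_doc` argument are no longer dropped. Previously, whenever any chunks existed on disk, compaction rebuilt the document from those chunks alone and only fell back to `full_doc` when no chunks were found. `compact` now loads `full_doc` first and merges the on-disk chunks into it. The PR also treats "about to delete the snapshot we just wrote" as an expected case rather than an error, adds `tracing` instrumentation to the append/compact/save paths, and includes a regression test.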
72 changes: 56 additions & 16 deletions src/fs_store.rs
@@ -164,7 +164,9 @@ impl FsStore {
Ok(result.into_iter().collect())
}

#[tracing::instrument(skip(self, changes))]
pub fn append(&self, id: &DocumentId, changes: &[u8]) -> Result<(), Error> {
tracing::debug!("writing incremental change");
let paths = DocIdPaths::from(id);
std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
Error(ErrorKind::CreateLevel2Path(
@@ -179,14 +181,19 @@ impl FsStore {
Ok(())
}

#[tracing::instrument(skip(self, full_doc))]
pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> {
tracing::debug!("compacting document");
let paths = DocIdPaths::from(id);

let mut doc = automerge::Automerge::load(full_doc)
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Load all the data we have into a doc
match Chunks::load(&self.root, id) {
Ok(Some(chunks)) => {
let doc = chunks
.to_doc()
chunks
.add_to_doc(&mut doc)
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Write the snapshot
@@ -211,11 +218,8 @@ impl FsStore {
let path = paths.chunk_path(&self.root, snapshot);

if path == just_wrote {
tracing::error!(
?path,
"Somehow trying to delete the same path we just wrote to. Not today \
Satan"
);
// This can happen if for some reason `compact` is called when the only thing
// on disk is a snapshot containing the changes we are being asked to compact
continue;
}

@@ -224,11 +228,6 @@ impl FsStore {
}
}
Ok(None) => {
// The chunks are missing in storage for whatever reason
// Try to recreate the document from full_doc if possible
let doc = automerge::Automerge::load(full_doc)
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

std::fs::create_dir_all(paths.level2_path(&self.root)).map_err(|e| {
Error(ErrorKind::CreateLevel2Path(
paths.level2_path(&self.root),
@@ -415,7 +414,7 @@ impl Chunks {
fn load(root: &Path, doc_id: &DocumentId) -> Result<Option<Self>, Error> {
let doc_id_hash = DocIdPaths::from(doc_id);
let level2_path = doc_id_hash.level2_path(root);
tracing::debug!(
tracing::trace!(
root=%root.display(),
doc_id=?doc_id,
doc_path=%level2_path.display(),
Expand Down Expand Up @@ -459,7 +458,7 @@ impl Chunks {
tracing::warn!(bad_file=%path.display(), "unexpected non-chunk file in level2 path");
continue;
};
tracing::debug!(chunk_path=%path.display(), "reading chunk file");
tracing::trace!(chunk_path=%path.display(), "reading chunk file");
let contents = match std::fs::read(&path) {
Ok(c) => c,
Err(e) => {
@@ -491,15 +490,16 @@ impl Chunks {
}))
}

fn to_doc(&self) -> Result<automerge::Automerge, automerge::AutomergeError> {
fn add_to_doc(&self, doc: &mut automerge::Automerge) -> Result<(), automerge::AutomergeError> {
let mut bytes = Vec::new();
for chunk in self.snapshots.values() {
bytes.extend(chunk);
}
for chunk in self.incrementals.values() {
bytes.extend(chunk);
}
automerge::Automerge::load(&bytes)
doc.load_incremental(&bytes)?;
Ok(())
}
}

@@ -556,3 +556,43 @@ mod error {
DeleteChunk(PathBuf, std::io::Error),
}
}

#[cfg(test)]
mod tests {
use automerge::{transaction::Transactable, AutoCommit, ReadDoc};
use tempfile::tempdir;

use crate::DocumentId;

use super::FsStore;

#[test]
fn compact_adds_new_changes_to_fs() {
let mut doc = AutoCommit::new();
doc.put(automerge::ROOT, "foo", "bar").unwrap();

let data_dir = tempdir().unwrap();

let doc_id = DocumentId::random();
let fs = FsStore::open(&data_dir).unwrap();
let change = doc
.get_changes(&[])
.into_iter()
.flat_map(|c| c.raw_bytes().to_vec())
.collect::<Vec<_>>();

fs.append(&doc_id, &change).unwrap();

doc.put(automerge::ROOT, "foo", "baz").unwrap();
let compacted = doc.save();

fs.compact(&doc_id, &compacted).unwrap();

let reloaded_raw = fs.get(&doc_id).unwrap().unwrap();
let reloaded = AutoCommit::load(&reloaded_raw).unwrap();
assert_eq!(
reloaded.get(&automerge::ROOT, "foo").unwrap().unwrap().0,
"baz".into()
);
}
}
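
A minimal sketch of the behavioral change in `compact` (error mapping, path handling, and snapshot writing elided; `concatenated_chunks` is a stand-in for the snapshot and incremental bytes that `Chunks` gathers):

// Before this PR: when any chunks existed on disk, they were the only
// input to compaction, so changes present only in `full_doc` were
// silently dropped.
let doc = automerge::Automerge::load(&concatenated_chunks)?;

// After this PR: start from the caller's copy of the document, then
// merge the on-disk chunk bytes into it. `load_incremental` applies
// only the changes the document does not already contain.
let mut doc = automerge::Automerge::load(full_doc)?;
doc.load_incremental(&concatenated_chunks)?;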
11 changes: 9 additions & 2 deletions src/repo.rs
@@ -532,7 +532,8 @@ impl DocState {
let pinned = Pin::new(&mut storage_fut);
match pinned.poll(&mut Context::from_waker(&waker)) {
Poll::Ready(Ok(_)) => None,
Poll::Ready(Err(_)) => {
Poll::Ready(Err(e)) => {
tracing::error!(err=?e, "error in save operation");
// TODO: propagate error to doc handle.
// `with_doc_mut` could return a future for this.
None
@@ -555,7 +556,8 @@ impl DocState {
let pinned = Pin::new(&mut storage_fut);
let res = match pinned.poll(&mut Context::from_waker(&waker)) {
Poll::Ready(Ok(_)) => None,
Poll::Ready(Err(_)) => {
Poll::Ready(Err(e)) => {
tracing::error!(err=?e, "error in storage operation");
// TODO: propagate error to doc handle.
// `with_doc_mut` could return a future for this.
None
@@ -880,6 +882,9 @@ impl DocumentInfo {
changes.len()
};
let has_patches = count > 0;
if has_patches {
tracing::debug!("doc has changed");
}
self.changes_since_last_compact = self.changes_since_last_compact.saturating_add(count);
has_patches
}
@@ -902,13 +907,15 @@ impl DocumentInfo {
let should_compact =
self.changes_since_last_compact > self.allowable_changes_until_compaction;
let (storage_fut, new_heads) = if should_compact {
tracing::trace!(%document_id, "compacting document");
let (to_save, new_heads) = {
let doc = self.document.read();
(doc.automerge.save(), doc.automerge.get_heads())
};
self.changes_since_last_compact = 0;
(storage.compact(document_id.clone(), to_save), new_heads)
} else {
tracing::trace!(%document_id, "writing incremental chunk");
let (to_save, new_heads) = {
let doc = self.document.read();
(
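
For orientation, the choice between these two `FsStore` entry points is made in `DocumentInfo::save_document`; reassembled from the hunks above into one readable unit (the incremental branch is truncated in this diff view, so its body is hedged as a comment):

let should_compact =
    self.changes_since_last_compact > self.allowable_changes_until_compaction;
let (storage_fut, new_heads) = if should_compact {
    tracing::trace!(%document_id, "compacting document");
    // Snapshot the whole document and reset the change counter.
    let (to_save, new_heads) = {
        let doc = self.document.read();
        (doc.automerge.save(), doc.automerge.get_heads())
    };
    self.changes_since_last_compact = 0;
    (storage.compact(document_id.clone(), to_save), new_heads)
} else {
    tracing::trace!(%document_id, "writing incremental chunk");
    // Truncated in the diff: builds an incremental chunk from the changes
    // made since the last save and hands it to the storage adapter.
    todo!()
};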