Skip to content

Commit 5e8b3c3

Browse files
committed
merge(durability): crash-safe writes + verify-on-read self-heal (audit f2)
fsync-before-rename + shard-dir fsync; read() re-hashes vs the content address and wires the previously-dead HashMismatch; on mismatch the bad object is quarantined to .corrupt/ so a correct rewrite lands instead of being dedup-skipped. Brings the canonical blob store to kin-vector's crash-safety bar. Independently re-ran: 46/46.
2 parents f2096c6 + 5db36ce commit 5e8b3c3

1 file changed

Lines changed: 280 additions & 9 deletions

File tree

src/lib.rs

Lines changed: 280 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ mod error;
66
pub use error::BlobError;
77

88
use sha2::{Digest, Sha256};
9-
use std::fs;
9+
use std::fs::{self, File};
10+
use std::io::Write;
1011
use std::path::{Path, PathBuf};
1112
use std::sync::atomic::{AtomicU64, Ordering};
12-
use tracing::debug;
13+
use tracing::{debug, warn};
1314

1415
/// Content-addressed 256-bit hash.
1516
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
@@ -91,8 +92,13 @@ impl BlobStore {
9192
/// Write data to the blob store, returning its content hash.
9293
///
9394
/// If a blob with the same hash already exists, this is a no-op (content
94-
/// deduplication). Writes are atomic: data is written to a temporary file
95-
/// in the shard directory, then renamed into place.
95+
/// deduplication). Writes are atomic AND durable: data is written to a
96+
/// unique temporary file in the shard directory, the file's bytes are
97+
/// `fsync`ed, it is renamed into place, and the directory entry is `fsync`ed
98+
/// so the rename itself survives a crash. Without the fsyncs a power loss
99+
/// after `rename` could leave a zero-length or torn object that the content
100+
/// address now vouches for — and because [`write`](Self::write) dedup-skips
101+
/// on existence, that corrupt object would be trusted forever.
96102
pub fn write(&self, data: &[u8]) -> Result<Hash256> {
97103
let _span = tracing::info_span!("kin_blobs.write", bytes = data.len()).entered();
98104
let hash = digest(data);
@@ -108,21 +114,66 @@ impl BlobStore {
108114
let shard_dir = blob_path.parent().expect("blob path always has a parent");
109115
fs::create_dir_all(shard_dir).map_err(|e| BlobError::io(shard_dir, e))?;
110116

111-
// Atomic write: write to a temp file in the shard dir, then rename.
117+
// Atomic-durable write: write to a unique temp file in the shard dir,
118+
// fsync its contents, rename into place, then fsync the directory.
112119
let seq = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
113120
let temp_path = shard_dir.join(format!(".tmp-{}-{}-{}", hash, std::process::id(), seq));
114-
fs::write(&temp_path, data).map_err(|e| BlobError::io(&temp_path, e))?;
115-
fs::rename(&temp_path, &blob_path).map_err(|e| BlobError::io(&blob_path, e))?;
121+
write_file_durably(&temp_path, data).map_err(|e| BlobError::io(&temp_path, e))?;
122+
fs::rename(&temp_path, &blob_path).map_err(|e| {
123+
// Don't leave the fsynced temp file behind on a failed rename.
124+
let _ = fs::remove_file(&temp_path);
125+
BlobError::io(&blob_path, e)
126+
})?;
127+
sync_dir(shard_dir);
116128

117129
debug!(hash = %hash, bytes = data.len(), "wrote blob");
118130
Ok(hash)
119131
}
120132

121-
/// Read a blob by its hash.
133+
/// Read a blob by its hash, verifying that the stored bytes still hash to
134+
/// the requested address.
122135
///
123-
/// Returns an error if the blob does not exist.
136+
/// This is the honest default for a content-addressed store: a torn write or
137+
/// on-disk bit-rot is caught here instead of silently serving corrupt content
138+
/// into the graph. On a mismatch the bad object is quarantined (see
139+
/// [`quarantine`](Self::quarantine)) — moved aside so dedup stops trusting it
140+
/// and a later `write` of the correct bytes can heal the store — and a
141+
/// [`BlobError::HashMismatch`] is returned.
142+
///
143+
/// Returns [`BlobError::NotFound`] if the blob does not exist. Callers on a
144+
/// hot path who have already established integrity may opt out of the re-hash
145+
/// with [`read_unverified`](Self::read_unverified).
124146
pub fn read(&self, hash: &Hash256) -> Result<Vec<u8>> {
125147
let _span = tracing::info_span!("kin_blobs.read", hash = %hash).entered();
148+
let data = self.read_unverified(hash)?;
149+
let actual = digest(&data);
150+
if actual != *hash {
151+
// Content no longer matches its address: the object is corrupt.
152+
// Quarantine it (preserving evidence) so dedup stops trusting it and
153+
// a rewrite can heal the store, then surface the mismatch.
154+
let quarantined = self.quarantine(hash).unwrap_or(None);
155+
warn!(
156+
expected = %hash,
157+
actual = %actual,
158+
quarantined = ?quarantined,
159+
"blob failed hash verification on read; quarantined corrupt object"
160+
);
161+
return Err(BlobError::HashMismatch {
162+
expected: hash.to_string(),
163+
actual: actual.to_string(),
164+
});
165+
}
166+
Ok(data)
167+
}
168+
169+
/// Read a blob by its hash WITHOUT verifying its content hash.
170+
///
171+
/// Faster than [`read`](Self::read) because it skips the re-hash, but it
172+
/// trusts the on-disk bytes blindly. Use only on hot paths where integrity
173+
/// has already been established — never as the general-purpose read.
174+
///
175+
/// Returns [`BlobError::NotFound`] if the blob does not exist.
176+
pub fn read_unverified(&self, hash: &Hash256) -> Result<Vec<u8>> {
126177
let blob_path = self.blob_path(hash);
127178
fs::read(&blob_path).map_err(|e| {
128179
if e.kind() == std::io::ErrorKind::NotFound {
@@ -162,6 +213,44 @@ impl BlobStore {
162213
Ok(())
163214
}
164215

216+
/// Move a corrupt object aside into a `.corrupt/` area under the store root.
217+
///
218+
/// Preserves the bad bytes as evidence (quarantine never deletes data) and,
219+
/// crucially, frees the object's content-addressed path so a subsequent
220+
/// [`write`](Self::write) of the correct content is no longer dedup-skipped —
221+
/// the store self-heals on the next write. This is invoked automatically by
222+
/// [`read`](Self::read) on a [`BlobError::HashMismatch`].
223+
///
224+
/// Returns the path the object was moved to, or `Ok(None)` when there was
225+
/// nothing at the object's path to quarantine (e.g. a concurrent reader
226+
/// already moved it). The quarantine file is named after the object's
227+
/// *expected* address so an operator can trace which blob it was meant to be.
228+
pub fn quarantine(&self, hash: &Hash256) -> Result<Option<PathBuf>> {
229+
let blob_path = self.blob_path(hash);
230+
if !blob_path.exists() {
231+
return Ok(None);
232+
}
233+
let corrupt_dir = self.root.join(".corrupt");
234+
fs::create_dir_all(&corrupt_dir).map_err(|e| BlobError::io(&corrupt_dir, e))?;
235+
let seq = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
236+
let dest = corrupt_dir.join(format!("{}-{}-{}", hash, std::process::id(), seq));
237+
match fs::rename(&blob_path, &dest) {
238+
Ok(()) => {
239+
// Make the quarantine move durable so a crash can't resurrect the
240+
// corrupt object back into the dedup path.
241+
sync_dir(&corrupt_dir);
242+
if let Some(parent) = blob_path.parent() {
243+
sync_dir(parent);
244+
}
245+
debug!(hash = %hash, dest = %dest.display(), "quarantined corrupt blob");
246+
Ok(Some(dest))
247+
}
248+
// Lost a race with another reader that already quarantined it.
249+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
250+
Err(e) => Err(BlobError::io(&dest, e)),
251+
}
252+
}
253+
165254
/// Return the root directory of the blob store.
166255
pub fn root(&self) -> &Path {
167256
&self.root
@@ -176,6 +265,26 @@ impl BlobStore {
176265
}
177266
}
178267

268+
/// Write `data` to `path` and `fsync` the file so its bytes are durable on disk
269+
/// before the caller renames it into its final content-addressed location.
270+
fn write_file_durably(path: &Path, data: &[u8]) -> std::io::Result<()> {
271+
let mut file = File::create(path)?;
272+
file.write_all(data)?;
273+
file.sync_all()?;
274+
Ok(())
275+
}
276+
277+
/// Best-effort `fsync` of a directory so a `rename`/`create` within it is
278+
/// durable. Directory fsync is not guaranteed on every platform (and is a no-op
279+
/// on some); failures are intentionally swallowed because the object file itself
280+
/// was already fsynced — the worst case is a crash window where the rename is
281+
/// not yet durable, recoverable by re-writing the (idempotent) blob.
282+
fn sync_dir(dir: &Path) {
283+
if let Ok(handle) = File::open(dir) {
284+
let _ = handle.sync_all();
285+
}
286+
}
287+
179288
#[cfg(test)]
180289
mod tests {
181290
use super::*;
@@ -589,4 +698,166 @@ mod tests {
589698
let retrieved = store.read(&hash).unwrap();
590699
assert_eq!(retrieved, data);
591700
}
701+
702+
// -----------------------------------------------------------------------
703+
// Crash / corruption durability tests
704+
// -----------------------------------------------------------------------
705+
706+
/// Reconstruct a blob's on-disk path the same way `blob_path` does, so tests
707+
/// can corrupt the backing file directly to simulate a torn write / bit-rot.
708+
fn on_disk_path(store: &BlobStore, hash: &Hash256) -> PathBuf {
709+
let hex = hash.to_string();
710+
store.root().join(&hex[..2]).join(&hex[2..])
711+
}
712+
713+
fn corrupt_dir_entries(store: &BlobStore) -> Vec<PathBuf> {
714+
let corrupt = store.root().join(".corrupt");
715+
if !corrupt.exists() {
716+
return Vec::new();
717+
}
718+
std::fs::read_dir(&corrupt)
719+
.unwrap()
720+
.map(|e| e.unwrap().path())
721+
.collect()
722+
}
723+
724+
/// A write must leave no temporary file behind: the temp is fsynced then
725+
/// renamed atomically into place, so the shard dir holds only the final blob.
726+
#[test]
727+
fn write_leaves_no_temp_file() {
728+
let (_dir, store) = make_store();
729+
let hash = store.write(b"atomic durable write").unwrap();
730+
let shard_dir = on_disk_path(&store, &hash).parent().unwrap().to_path_buf();
731+
let leftovers: Vec<_> = std::fs::read_dir(&shard_dir)
732+
.unwrap()
733+
.map(|e| e.unwrap().file_name().to_string_lossy().into_owned())
734+
.filter(|name| name.starts_with(".tmp-"))
735+
.collect();
736+
assert!(leftovers.is_empty(), "stray temp files remain: {leftovers:?}");
737+
}
738+
739+
/// A torn write (file truncated mid-content) must be caught on read as a
740+
/// hash mismatch rather than silently returning short content.
741+
#[test]
742+
fn torn_write_detected_on_read() {
743+
let (_dir, store) = make_store();
744+
let data: Vec<u8> = (0..2048).map(|i| (i % 256) as u8).collect();
745+
let hash = store.write(&data).unwrap();
746+
747+
// Simulate a torn write: truncate the backing file to half its length.
748+
let path = on_disk_path(&store, &hash);
749+
let partial = &data[..data.len() / 2];
750+
std::fs::write(&path, partial).unwrap();
751+
752+
let err = store.read(&hash).unwrap_err();
753+
assert!(
754+
matches!(err, BlobError::HashMismatch { .. }),
755+
"expected HashMismatch, got {err:?}"
756+
);
757+
}
758+
759+
/// A single flipped byte (silent bit-rot) must be detected on read.
760+
#[test]
761+
fn bit_flip_detected_on_read() {
762+
let (_dir, store) = make_store();
763+
let data = b"the quick brown fox jumps over the lazy dog".to_vec();
764+
let hash = store.write(&data).unwrap();
765+
766+
let path = on_disk_path(&store, &hash);
767+
let mut bytes = std::fs::read(&path).unwrap();
768+
bytes[0] ^= 0xFF; // flip the first byte
769+
std::fs::write(&path, &bytes).unwrap();
770+
771+
let err = store.read(&hash).unwrap_err();
772+
assert!(
773+
matches!(err, BlobError::HashMismatch { .. }),
774+
"expected HashMismatch, got {err:?}"
775+
);
776+
}
777+
778+
/// Detecting corruption on read must quarantine the bad object (preserving
779+
/// its exact bytes as evidence) and free its content-addressed path.
780+
#[test]
781+
fn read_quarantines_corrupt_object() {
782+
let (_dir, store) = make_store();
783+
let data = b"quarantine me when corrupt".to_vec();
784+
let hash = store.write(&data).unwrap();
785+
786+
let path = on_disk_path(&store, &hash);
787+
let corrupt_bytes = b"this is not the original content at all".to_vec();
788+
std::fs::write(&path, &corrupt_bytes).unwrap();
789+
790+
let _ = store.read(&hash).unwrap_err();
791+
792+
// The content-addressed path is now free (no longer dedup-trusted)...
793+
assert!(!path.exists(), "corrupt object should be moved out of its path");
794+
// ...and the bad bytes are preserved as evidence under `.corrupt/`.
795+
let entries = corrupt_dir_entries(&store);
796+
assert_eq!(entries.len(), 1, "exactly one quarantined object expected");
797+
let preserved = std::fs::read(&entries[0]).unwrap();
798+
assert_eq!(preserved, corrupt_bytes, "quarantine must preserve evidence");
799+
}
800+
801+
/// After a corrupt object is quarantined on read, re-writing the correct
802+
/// content must land (not be dedup-skipped) and heal the store.
803+
#[test]
804+
fn rewrite_after_quarantine_heals() {
805+
let (_dir, store) = make_store();
806+
let data = b"heal me after corruption".to_vec();
807+
let hash = store.write(&data).unwrap();
808+
809+
// Corrupt the backing file, then read to trigger quarantine.
810+
let path = on_disk_path(&store, &hash);
811+
std::fs::write(&path, b"corrupted").unwrap();
812+
let _ = store.read(&hash).unwrap_err();
813+
assert!(!path.exists());
814+
815+
// Re-writing the original content must NOT be dedup-skipped, and the
816+
// store is healed: the read now succeeds and verifies.
817+
let healed_hash = store.write(&data).unwrap();
818+
assert_eq!(healed_hash, hash);
819+
let retrieved = store.read(&hash).unwrap();
820+
assert_eq!(retrieved, data);
821+
}
822+
823+
/// `read_unverified` must return the raw on-disk bytes even when corrupt,
824+
/// while `read` rejects them — the documented opt-out contract.
825+
#[test]
826+
fn read_unverified_bypasses_verification() {
827+
let (_dir, store) = make_store();
828+
let data = b"trust me blindly".to_vec();
829+
let hash = store.write(&data).unwrap();
830+
831+
let path = on_disk_path(&store, &hash);
832+
let corrupt = b"corrupt but returned by read_unverified".to_vec();
833+
std::fs::write(&path, &corrupt).unwrap();
834+
835+
// Unverified read returns the corrupt bytes as-is, no error.
836+
let raw = store.read_unverified(&hash).unwrap();
837+
assert_eq!(raw, corrupt);
838+
839+
// Verified read rejects them (and quarantines).
840+
let err = store.read(&hash).unwrap_err();
841+
assert!(matches!(err, BlobError::HashMismatch { .. }));
842+
}
843+
844+
/// Quarantining a hash with nothing on disk is a no-op, not an error.
845+
#[test]
846+
fn quarantine_missing_is_noop() {
847+
let (_dir, store) = make_store();
848+
let fake = Hash256([0x11; 32]);
849+
assert_eq!(store.quarantine(&fake).unwrap(), None);
850+
}
851+
852+
/// A valid (uncorrupted) blob round-trips through the verifying read.
853+
#[test]
854+
fn verified_read_accepts_valid_blob() {
855+
let (_dir, store) = make_store();
856+
let data: Vec<u8> = (0..50_000).map(|i| (i % 256) as u8).collect();
857+
let hash = store.write(&data).unwrap();
858+
let retrieved = store.read(&hash).unwrap();
859+
assert_eq!(retrieved, data);
860+
// No spurious quarantine of healthy data.
861+
assert!(corrupt_dir_entries(&store).is_empty());
862+
}
592863
}

0 commit comments

Comments
 (0)