@@ -40,6 +40,21 @@ impl Db {
4040 self . inner . flush_wals ( ) . await ?;
4141 }
4242 self . inner . flush_memtables ( ) . await ?;
43+
44+ // All data is now in L0. Advance replay_after_wal_id past any
45+ // remaining WAL SSTs (e.g. the empty fencing WAL from Db::open)
46+ // so checkpoints/clones don't reference WAL files that may be
47+ // garbage-collected.
48+ {
49+ let mut guard = self . inner . state . write ( ) ;
50+ guard. modify ( |modifier| {
51+ let core = & mut modifier. state . manifest . value . core ;
52+ let target = core. next_wal_sst_id . saturating_sub ( 1 ) ;
53+ if core. replay_after_wal_id < target {
54+ core. replay_after_wal_id = target;
55+ }
56+ } ) ;
57+ }
4358 }
4459
4560 let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
@@ -552,4 +567,322 @@ mod tests {
552567 let filtered_checkpoints = admin. list_checkpoints ( Some ( "non_existent" ) ) . await . unwrap ( ) ;
553568 assert_eq ! ( filtered_checkpoints. len( ) , 0 ) ;
554569 }
570+
571+ #[ tokio:: test]
572+ async fn test_checkpoint_scope_all_closes_wal_gap ( ) {
573+ let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
574+ let wal_object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
575+ let path = Path :: from ( "/tmp/test_kv_store" ) ;
576+ let db = Db :: builder ( path. clone ( ) , object_store. clone ( ) )
577+ . with_wal_object_store ( wal_object_store)
578+ . with_settings ( Settings {
579+ flush_interval : Some ( Duration :: from_millis ( 5000 ) ) ,
580+ ..Settings :: default ( )
581+ } )
582+ . build ( )
583+ . await
584+ . unwrap ( ) ;
585+
586+ db. put ( b"key1" , b"value1" ) . await . unwrap ( ) ;
587+
588+ let checkpoint = db
589+ . create_checkpoint ( CheckpointScope :: All , & CheckpointOptions :: default ( ) )
590+ . await
591+ . unwrap ( ) ;
592+
593+ let manifest_store = ManifestStore :: new ( & path, object_store. clone ( ) ) ;
594+ let manifest = manifest_store
595+ . read_manifest ( checkpoint. manifest_id )
596+ . await
597+ . unwrap ( ) ;
598+
599+ assert_eq ! (
600+ manifest. core. replay_after_wal_id,
601+ manifest. core. next_wal_sst_id - 1 ,
602+ "CheckpointScope::All should close the WAL gap"
603+ ) ;
604+
605+ db. close ( ) . await . unwrap ( ) ;
606+ }
607+
608+ #[ tokio:: test]
609+ async fn test_checkpoint_wal_replay_needed_without_full_flush ( ) {
610+ let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
611+ let wal_object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
612+ let path = Path :: from ( "/tmp/test_kv_store" ) ;
613+ let db = Db :: builder ( path. clone ( ) , object_store. clone ( ) )
614+ . with_wal_object_store ( wal_object_store)
615+ . with_settings ( Settings {
616+ flush_interval : Some ( Duration :: from_millis ( 5000 ) ) ,
617+ ..Settings :: default ( )
618+ } )
619+ . build ( )
620+ . await
621+ . unwrap ( ) ;
622+
623+ db. put ( b"key1" , b"value1" ) . await . unwrap ( ) ;
624+
625+ // Durable checkpoint — no memtable flush to L0
626+ let checkpoint = db
627+ . create_checkpoint ( CheckpointScope :: Durable , & CheckpointOptions :: default ( ) )
628+ . await
629+ . unwrap ( ) ;
630+
631+ let manifest_store = ManifestStore :: new ( & path, object_store. clone ( ) ) ;
632+ let manifest = manifest_store
633+ . read_manifest ( checkpoint. manifest_id )
634+ . await
635+ . unwrap ( ) ;
636+
637+ // WAL SSTs should still be referenced (gap exists, replay needed)
638+ assert ! (
639+ manifest. core. replay_after_wal_id < manifest. core. next_wal_sst_id - 1 ,
640+ "CheckpointScope::Durable should NOT close the WAL gap"
641+ ) ;
642+
643+ db. close ( ) . await . unwrap ( ) ;
644+ }
645+
646+ #[ tokio:: test]
647+ async fn test_clone_replays_wal_then_writes_and_flushes ( ) {
648+ use crate :: clone:: create_clone;
649+ use crate :: rand:: DbRand ;
650+ use fail_parallel:: FailPointRegistry ;
651+
652+ let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
653+ let wal_object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
654+ let parent_path = Path :: from ( "/tmp/test_parent" ) ;
655+ let clone_path = Path :: from ( "/tmp/test_clone" ) ;
656+
657+ // Parent writes data with a Durable checkpoint (WALs NOT flushed to L0)
658+ let parent_db = Db :: builder ( parent_path. clone ( ) , object_store. clone ( ) )
659+ . with_wal_object_store ( wal_object_store. clone ( ) )
660+ . with_settings ( Settings {
661+ flush_interval : Some ( Duration :: from_millis ( 5000 ) ) ,
662+ ..Settings :: default ( )
663+ } )
664+ . build ( )
665+ . await
666+ . unwrap ( ) ;
667+ parent_db. put ( b"key1" , b"value1" ) . await . unwrap ( ) ;
668+ parent_db. put ( b"key2" , b"value2" ) . await . unwrap ( ) ;
669+ let checkpoint = parent_db
670+ . create_checkpoint ( CheckpointScope :: Durable , & CheckpointOptions :: default ( ) )
671+ . await
672+ . unwrap ( ) ;
673+
674+ // Confirm WAL gap exists — clone will need to replay WALs
675+ let manifest_store = ManifestStore :: new ( & parent_path, object_store. clone ( ) ) ;
676+ let manifest = manifest_store
677+ . read_manifest ( checkpoint. manifest_id )
678+ . await
679+ . unwrap ( ) ;
680+ assert ! ( manifest. core. replay_after_wal_id < manifest. core. next_wal_sst_id - 1 ) ;
681+
682+ parent_db. close ( ) . await . unwrap ( ) ;
683+
684+ // Create clone — WAL SSTs are copied and will be replayed on open
685+ create_clone (
686+ clone_path. clone ( ) ,
687+ parent_path,
688+ object_store. clone ( ) ,
689+ wal_object_store. clone ( ) ,
690+ wal_object_store. clone ( ) ,
691+ None ,
692+ Arc :: new ( FailPointRegistry :: new ( ) ) ,
693+ Arc :: new ( DefaultSystemClock :: default ( ) ) ,
694+ Arc :: new ( DbRand :: default ( ) ) ,
695+ )
696+ . await
697+ . unwrap ( ) ;
698+
699+ // Open clone — replays WALs, then write more data and flush
700+ let clone_db = Db :: builder ( clone_path. clone ( ) , object_store. clone ( ) )
701+ . with_wal_object_store ( wal_object_store. clone ( ) )
702+ . build ( )
703+ . await
704+ . unwrap ( ) ;
705+ clone_db. put ( b"key3" , b"value3" ) . await . unwrap ( ) ;
706+ clone_db. put ( b"key4" , b"value4" ) . await . unwrap ( ) ;
707+
708+ let clone_checkpoint = clone_db
709+ . create_checkpoint ( CheckpointScope :: All , & CheckpointOptions :: default ( ) )
710+ . await
711+ . unwrap ( ) ;
712+
713+ // After full flush, clone should have no WAL gap
714+ let clone_manifest_store = ManifestStore :: new ( & clone_path, object_store. clone ( ) ) ;
715+ let clone_manifest = clone_manifest_store
716+ . read_manifest ( clone_checkpoint. manifest_id )
717+ . await
718+ . unwrap ( ) ;
719+ assert_eq ! (
720+ clone_manifest. core. replay_after_wal_id,
721+ clone_manifest. core. next_wal_sst_id - 1 ,
722+ ) ;
723+
724+ // All rows — replayed from parent WAL + written on clone — should be present
725+ assert_eq ! (
726+ clone_db. get( b"key1" ) . await . unwrap( ) . as_deref( ) ,
727+ Some ( b"value1" . as_ref( ) )
728+ ) ;
729+ assert_eq ! (
730+ clone_db. get( b"key2" ) . await . unwrap( ) . as_deref( ) ,
731+ Some ( b"value2" . as_ref( ) )
732+ ) ;
733+ assert_eq ! (
734+ clone_db. get( b"key3" ) . await . unwrap( ) . as_deref( ) ,
735+ Some ( b"value3" . as_ref( ) )
736+ ) ;
737+ assert_eq ! (
738+ clone_db. get( b"key4" ) . await . unwrap( ) . as_deref( ) ,
739+ Some ( b"value4" . as_ref( ) )
740+ ) ;
741+ clone_db. close ( ) . await . unwrap ( ) ;
742+ }
743+
744+ #[ tokio:: test]
745+ async fn test_parent_can_write_after_checkpoint_scope_all ( ) {
746+ let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
747+ let wal_object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
748+ let path = Path :: from ( "/tmp/test_kv_store" ) ;
749+ let db = Db :: builder ( path. clone ( ) , object_store. clone ( ) )
750+ . with_wal_object_store ( wal_object_store. clone ( ) )
751+ . with_settings ( Settings {
752+ flush_interval : Some ( Duration :: from_millis ( 5000 ) ) ,
753+ ..Settings :: default ( )
754+ } )
755+ . build ( )
756+ . await
757+ . unwrap ( ) ;
758+
759+ // Write initial data and flush
760+ db. put ( b"key1" , b"value1" ) . await . unwrap ( ) ;
761+ db. create_checkpoint ( CheckpointScope :: All , & CheckpointOptions :: default ( ) )
762+ . await
763+ . unwrap ( ) ;
764+
765+ // Write more data after the flush
766+ db. put ( b"key2" , b"value2" ) . await . unwrap ( ) ;
767+ db. put ( b"key3" , b"value3" ) . await . unwrap ( ) ;
768+
769+ // All data should be readable
770+ assert_eq ! (
771+ db. get( b"key1" ) . await . unwrap( ) . as_deref( ) ,
772+ Some ( b"value1" . as_ref( ) )
773+ ) ;
774+ assert_eq ! (
775+ db. get( b"key2" ) . await . unwrap( ) . as_deref( ) ,
776+ Some ( b"value2" . as_ref( ) )
777+ ) ;
778+ assert_eq ! (
779+ db. get( b"key3" ) . await . unwrap( ) . as_deref( ) ,
780+ Some ( b"value3" . as_ref( ) )
781+ ) ;
782+
783+ // Second checkpoint should also close the gap
784+ let checkpoint2 = db
785+ . create_checkpoint ( CheckpointScope :: All , & CheckpointOptions :: default ( ) )
786+ . await
787+ . unwrap ( ) ;
788+ let manifest_store = ManifestStore :: new ( & path, object_store. clone ( ) ) ;
789+ let manifest = manifest_store
790+ . read_manifest ( checkpoint2. manifest_id )
791+ . await
792+ . unwrap ( ) ;
793+ assert_eq ! (
794+ manifest. core. replay_after_wal_id,
795+ manifest. core. next_wal_sst_id - 1 ,
796+ ) ;
797+
798+ // Close and reopen — all data should survive
799+ db. close ( ) . await . unwrap ( ) ;
800+ let db = Db :: builder ( path. clone ( ) , object_store. clone ( ) )
801+ . with_wal_object_store ( wal_object_store)
802+ . build ( )
803+ . await
804+ . unwrap ( ) ;
805+ assert_eq ! (
806+ db. get( b"key1" ) . await . unwrap( ) . as_deref( ) ,
807+ Some ( b"value1" . as_ref( ) )
808+ ) ;
809+ assert_eq ! (
810+ db. get( b"key2" ) . await . unwrap( ) . as_deref( ) ,
811+ Some ( b"value2" . as_ref( ) )
812+ ) ;
813+ assert_eq ! (
814+ db. get( b"key3" ) . await . unwrap( ) . as_deref( ) ,
815+ Some ( b"value3" . as_ref( ) )
816+ ) ;
817+ db. close ( ) . await . unwrap ( ) ;
818+ }
819+
820+ #[ tokio:: test]
821+ async fn test_clone_succeeds_after_wal_cleanup ( ) {
822+ use crate :: clone:: create_clone;
823+ use crate :: rand:: DbRand ;
824+ use fail_parallel:: FailPointRegistry ;
825+ use futures:: TryStreamExt ;
826+
827+ let object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
828+ let wal_object_store: Arc < dyn ObjectStore > = Arc :: new ( InMemory :: new ( ) ) ;
829+ let parent_path = Path :: from ( "/tmp/test_parent" ) ;
830+ let clone_path = Path :: from ( "/tmp/test_clone" ) ;
831+
832+ // Write data to the parent and flush everything to L0
833+ let parent_db = Db :: builder ( parent_path. clone ( ) , object_store. clone ( ) )
834+ . with_wal_object_store ( wal_object_store. clone ( ) )
835+ . build ( )
836+ . await
837+ . unwrap ( ) ;
838+ parent_db. put ( b"key1" , b"value1" ) . await . unwrap ( ) ;
839+ parent_db. put ( b"key2" , b"value2" ) . await . unwrap ( ) ;
840+ parent_db
841+ . create_checkpoint ( CheckpointScope :: All , & CheckpointOptions :: default ( ) )
842+ . await
843+ . unwrap ( ) ;
844+ parent_db. close ( ) . await . unwrap ( ) ;
845+
846+ // Simulate external WAL cleanup (silo cleaning local disk / pod death)
847+ let wal_path = crate :: paths:: PathResolver :: new ( parent_path. clone ( ) ) . wal_path ( ) ;
848+ let wal_files: Vec < _ > = wal_object_store
849+ . list ( Some ( & wal_path) )
850+ . try_collect ( )
851+ . await
852+ . unwrap ( ) ;
853+ for file in & wal_files {
854+ wal_object_store. delete ( & file. location ) . await . unwrap ( ) ;
855+ }
856+
857+ // Clone should succeed — no WAL gap to copy
858+ create_clone (
859+ clone_path. clone ( ) ,
860+ parent_path,
861+ object_store. clone ( ) ,
862+ wal_object_store. clone ( ) ,
863+ wal_object_store. clone ( ) ,
864+ None ,
865+ Arc :: new ( FailPointRegistry :: new ( ) ) ,
866+ Arc :: new ( DefaultSystemClock :: default ( ) ) ,
867+ Arc :: new ( DbRand :: default ( ) ) ,
868+ )
869+ . await
870+ . unwrap ( ) ;
871+
872+ // Clone should have all the data
873+ let clone_db = Db :: builder ( clone_path, object_store)
874+ . with_wal_object_store ( wal_object_store)
875+ . build ( )
876+ . await
877+ . unwrap ( ) ;
878+ assert_eq ! (
879+ clone_db. get( b"key1" ) . await . unwrap( ) . as_deref( ) ,
880+ Some ( b"value1" . as_ref( ) )
881+ ) ;
882+ assert_eq ! (
883+ clone_db. get( b"key2" ) . await . unwrap( ) . as_deref( ) ,
884+ Some ( b"value2" . as_ref( ) )
885+ ) ;
886+ clone_db. close ( ) . await . unwrap ( ) ;
887+ }
555888}
0 commit comments