File tree Expand file tree Collapse file tree 1 file changed +38
-4
lines changed Expand file tree Collapse file tree 1 file changed +38
-4
lines changed Original file line number Diff line number Diff line change @@ -684,16 +684,50 @@ impl LogServer {
684
684
std:: iter:: zip ( start..limit, resp. records . into_iter ( ) . enumerate ( ) )
685
685
{
686
686
if expect != idx as u64 {
687
- todo ! ( ) ;
687
+ return Err ( Status :: data_loss ( format ! (
688
+ "expected log position {expect} but got {idx}"
689
+ ) ) ) ;
688
690
}
689
691
if ( record. log_offset as u64 ) . wrapping_add ( 1 ) != expect {
690
- todo ! ( ) ;
692
+ return Err ( Status :: data_loss ( format ! (
693
+ "expected log position {expect} but got {}" ,
694
+ ( record. log_offset as u64 ) . wrapping_add( 1 )
695
+ ) ) ) ;
691
696
}
692
697
records. push ( record) ;
693
698
}
694
699
}
695
-
696
- todo ! ( ) ;
700
+ let record_bytes = records
701
+ . into_iter ( )
702
+ . map ( |record| -> Result < Vec < u8 > , Status > {
703
+ let mut buf = vec ! [ ] ;
704
+ record
705
+ . encode ( & mut buf)
706
+ . map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ?;
707
+ Ok ( buf)
708
+ } )
709
+ . collect :: < Result < Vec < _ > , Status > > ( ) ?;
710
+ let prefix = storage_prefix_for_log ( collection_id) ;
711
+ let mark_dirty = MarkDirty {
712
+ collection_id,
713
+ dirty_log : Arc :: clone ( & self . dirty_log ) ,
714
+ } ;
715
+ LogWriter :: bootstrap (
716
+ & self . config . writer ,
717
+ & self . storage ,
718
+ & prefix,
719
+ "effectuate log transfer" ,
720
+ mark_dirty,
721
+ LogPosition :: from_offset ( start) ,
722
+ record_bytes,
723
+ )
724
+ . await
725
+ . map_err ( |err| {
726
+ Status :: new (
727
+ err. code ( ) . into ( ) ,
728
+ format ! ( "failed to effectuate log transfer: {err:?}" ) ,
729
+ )
730
+ } )
697
731
}
698
732
699
733
async fn forward_push_logs (
You can’t perform that action at this time.
0 commit comments