Skip to content

Commit ce84a85

Browse files
committed
untested effectuate transfer
1 parent 09e377c commit ce84a85

File tree

1 file changed

+38
-4
lines changed

1 file changed

+38
-4
lines changed

rust/log-service/src/lib.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -685,16 +685,50 @@ impl LogServer {
685685
std::iter::zip(start..limit, resp.records.into_iter().enumerate())
686686
{
687687
if expect != idx as u64 {
688-
todo!();
688+
return Err(Status::data_loss(format!(
689+
"expected log position {expect} but got {idx}"
690+
)));
689691
}
690692
if (record.log_offset as u64).wrapping_add(1) != expect {
691-
todo!();
693+
return Err(Status::data_loss(format!(
694+
"expected log position {expect} but got {}",
695+
(record.log_offset as u64).wrapping_add(1)
696+
)));
692697
}
693698
records.push(record);
694699
}
695700
}
696-
697-
todo!();
701+
let record_bytes = records
702+
.into_iter()
703+
.map(|record| -> Result<Vec<u8>, Status> {
704+
let mut buf = vec![];
705+
record
706+
.encode(&mut buf)
707+
.map_err(|err| Status::internal(err.to_string()))?;
708+
Ok(buf)
709+
})
710+
.collect::<Result<Vec<_>, Status>>()?;
711+
let prefix = storage_prefix_for_log(collection_id);
712+
let mark_dirty = MarkDirty {
713+
collection_id,
714+
dirty_log: Arc::clone(&self.dirty_log),
715+
};
716+
LogWriter::bootstrap(
717+
&self.config.writer,
718+
&self.storage,
719+
&prefix,
720+
"effectuate log transfer",
721+
mark_dirty,
722+
LogPosition::from_offset(start),
723+
record_bytes,
724+
)
725+
.await
726+
.map_err(|err| {
727+
Status::new(
728+
err.code().into(),
729+
format!("failed to effectuate log transfer: {err:?}"),
730+
)
731+
})
698732
}
699733

700734
async fn forward_push_logs(

0 commit comments

Comments
 (0)