Skip to content

Commit b883001

Browse files
committed
Start to effectuate log transfer (WIP)
1 parent c9383bd commit b883001

File tree

1 file changed

+69
-4
lines changed

1 file changed

+69
-4
lines changed

rust/log-service/src/lib.rs

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -640,15 +640,78 @@ pub struct LogServer {
640640
}
641641

642642
impl LogServer {
643+
async fn effectuate_log_transfer(
644+
&self,
645+
collection_id: CollectionUuid,
646+
mut proxy: LogServiceClient<chroma_tracing::GrpcTraceService<tonic::transport::Channel>>,
647+
) -> Result<(), Status> {
648+
let scout_request = Request::new(ScoutLogsRequest {
649+
collection_id: collection_id.to_string(),
650+
});
651+
let scout_resp = proxy.clone().scout_logs(scout_request).await?.into_inner();
652+
let start = scout_resp.first_uncompacted_record_offset as u64;
653+
let limit = scout_resp.first_uninserted_record_offset as u64;
654+
const STEP: u64 = 100;
655+
let num_steps = (limit.saturating_sub(start) + STEP - 1) / STEP;
656+
let actual_steps = (0..num_steps)
657+
.map(|x| {
658+
(
659+
start + x * STEP,
660+
std::cmp::min(start + x * STEP + STEP, limit),
661+
)
662+
})
663+
.collect::<Vec<_>>();
664+
let pull_logs_reqs = actual_steps
665+
.iter()
666+
.cloned()
667+
.map(|(start, limit)| PullLogsRequest {
668+
collection_id: collection_id.to_string(),
669+
start_from_offset: start as i64 - 1,
670+
// SAFETY(rescrv): STEP fits a i32.
671+
batch_size: (limit - start) as i32,
672+
end_timestamp: i64::MAX,
673+
});
674+
let mut responses = vec![];
675+
for req in pull_logs_reqs {
676+
let resp = proxy.pull_logs(Request::new(req)).await?.into_inner();
677+
responses.push(resp);
678+
}
679+
let mut records = vec![];
680+
for ((start, limit), resp) in
681+
std::iter::zip(actual_steps.into_iter(), responses.into_iter())
682+
{
683+
for (expect, (idx, record)) in
684+
std::iter::zip(start..limit, resp.records.into_iter().enumerate())
685+
{
686+
if expect != idx as u64 {
687+
todo!();
688+
}
689+
if (record.log_offset as u64).wrapping_add(1) != expect {
690+
todo!();
691+
}
692+
records.push(record);
693+
}
694+
}
695+
696+
todo!();
697+
}
698+
643699
async fn forward_push_logs(
644700
&self,
701+
collection_id: CollectionUuid,
645702
request: Request<PushLogsRequest>,
646703
) -> Result<Response<PushLogsResponse>, Status> {
704+
let request = request.into_inner();
647705
if let Some(proxy) = self.proxy.as_ref() {
648-
let resp = proxy.clone().push_logs(request).await?;
649-
let resp = resp.into_inner();
706+
let resp = proxy
707+
.clone()
708+
.push_logs(Request::new(request.clone()))
709+
.await?
710+
.into_inner();
650711
if resp.log_is_sealed {
651-
todo!();
712+
self.effectuate_log_transfer(collection_id, proxy.clone())
713+
.await?;
714+
self.push_logs(Request::new(request)).await
652715
} else {
653716
Ok(Response::new(resp))
654717
}
@@ -740,7 +803,9 @@ impl LogService for LogServer {
740803
{
741804
Ok(log) => log,
742805
Err(wal3::Error::UninitializedLog) => {
743-
return self.forward_push_logs(Request::new(push_logs)).await;
806+
return self
807+
.forward_push_logs(collection_id, Request::new(push_logs))
808+
.await;
744809
}
745810
Err(err) => {
746811
return Err(Status::unknown(err.to_string()));

0 commit comments

Comments
 (0)