Skip to content

Commit bb3b8b7

Browse files
committed
collapse code paths to not have two means by which logs end up on RLS
1 parent f8c89e5 commit bb3b8b7

File tree

1 file changed

+17
-62
lines changed

1 file changed

+17
-62
lines changed

rust/log-service/src/lib.rs

+17-62
Original file line numberDiff line numberDiff line change
@@ -640,15 +640,21 @@ pub struct LogServer {
640640
}
641641

642642
impl LogServer {
643-
fn should_initialize_log(&self, collection: CollectionUuid) -> bool {
644-
todo!();
645-
}
646-
647643
async fn forward_push_logs(
648644
&self,
649645
request: Request<PushLogsRequest>,
650646
) -> Result<Response<PushLogsResponse>, Status> {
651-
todo!();
647+
if let Some(proxy) = self.proxy.as_ref() {
648+
let resp = proxy.clone().push_logs(request).await?;
649+
let resp = resp.into_inner();
650+
if resp.log_is_sealed {
651+
todo!();
652+
} else {
653+
Ok(Response::new(resp))
654+
}
655+
} else {
656+
Err(Status::failed_precondition("proxy not initialized"))
657+
}
652658
}
653659

654660
async fn forward_scout_logs(
@@ -734,19 +740,6 @@ impl LogService for LogServer {
734740
{
735741
Ok(log) => log,
736742
Err(wal3::Error::UninitializedLog) => {
737-
if self.should_initialize_log(collection_id) {
738-
if let Err(err) = LogWriter::initialize(
739-
&self.config.writer,
740-
&self.storage,
741-
&prefix,
742-
"push_logs initializer",
743-
)
744-
.await
745-
{
746-
return Err(Status::unknown(err.to_string()));
747-
}
748-
return Box::pin(self.push_logs(Request::new(push_logs))).await;
749-
}
750743
return self.forward_push_logs(Request::new(push_logs)).await;
751744
}
752745
Err(err) => {
@@ -800,15 +793,7 @@ impl LogService for LogServer {
800793
let limit_position = match log_reader.maximum_log_position().await {
801794
Ok(limit_position) => limit_position,
802795
Err(wal3::Error::UninitializedLog) => {
803-
// NOTE(rescrv): In this case, what we have is a guarantee that we know the
804-
// log is ours. This only comes from the by-tenant or by-collection alt-log
805-
// shunts. The log is uninitialized, so we know it's not because
806-
// has_manifest().
807-
if self.should_initialize_log(collection_id) {
808-
LogPosition::from_offset(1)
809-
} else {
810-
return self.forward_scout_logs(Request::new(scout_logs)).await;
811-
}
796+
return self.forward_scout_logs(Request::new(scout_logs)).await;
812797
}
813798
Err(err) => {
814799
if err.code() == chroma_error::ErrorCodes::FailedPrecondition {
@@ -861,13 +846,7 @@ impl LogService for LogServer {
861846
{
862847
Ok(fragments) => fragments,
863848
Err(wal3::Error::UninitializedLog) => {
864-
// NOTE(rescrv): Same as with ScoutLogs.
865-
if self.should_initialize_log(collection_id) {
866-
tracing::info!("Uninitialized log for collection {}", collection_id);
867-
return Ok(Response::new(PullLogsResponse { records: vec![] }));
868-
} else {
869-
return self.forward_pull_logs(Request::new(pull_logs)).await;
870-
}
849+
return self.forward_pull_logs(Request::new(pull_logs)).await;
871850
}
872851
Err(err) => {
873852
return Err(Status::new(err.code().into(), err.to_string()));
@@ -953,25 +932,7 @@ impl LogService for LogServer {
953932
if let Err(err) = log_reader.maximum_log_position().await {
954933
match err {
955934
wal3::Error::UninitializedLog => {
956-
// NOTE(rescrv): Same as with ScoutLogs.
957-
if self.should_initialize_log(source_collection_id) {
958-
LogWriter::initialize(
959-
&self.config.writer,
960-
&storage,
961-
&source_prefix,
962-
"fork logs initializer",
963-
)
964-
.await
965-
.map_err(|err| {
966-
Status::new(
967-
err.code().into(),
968-
format!("Failed to initialize log for fork: {err:?}"),
969-
)
970-
})?;
971-
return Box::pin(self.fork_logs(Request::new(request))).await;
972-
} else {
973-
return self.forward_fork_logs(Request::new(request)).await;
974-
}
935+
return self.forward_fork_logs(Request::new(request)).await;
975936
}
976937
_ => {
977938
return Err(Status::new(
@@ -1205,15 +1166,9 @@ impl LogService for LogServer {
12051166

12061167
let res = log_reader.maximum_log_position().await;
12071168
if let Err(wal3::Error::UninitializedLog) = res {
1208-
if self.should_initialize_log(collection_id) {
1209-
return Err(Status::failed_precondition(
1210-
"uninitialized log has its cursor updated",
1211-
));
1212-
} else {
1213-
return self
1214-
.forward_update_collection_log_offset(Request::new(request))
1215-
.await;
1216-
}
1169+
return self
1170+
.forward_update_collection_log_offset(Request::new(request))
1171+
.await;
12171172
}
12181173
res.map_err(|err| Status::unknown(err.to_string()))?;
12191174

0 commit comments

Comments
 (0)