Skip to content

Commit 12ca8e1

Browse files
committed
[ENH] Add RPC endpoints for sealing and migrating logs.
This updates the IDL for the log service to have two new methods: SealLog and MigrateLog. They each take a collection ID and trigger the two phase transitions for the go-to-rust migration that's coming.
1 parent 713978e commit 12ca8e1

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

go/pkg/log/server/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,15 @@ func (s *logServer) InspectDirtyLog(ctx context.Context, req *logservicepb.Inspe
162162
return
163163
}
164164

165+
func (s *logServer) SealLog(ctx context.Context, req *logservicepb.SealLogRequest) (res *logservicepb.SealLogResponse, err error) {
166+
// no-op for now
167+
return
168+
}
169+
170+
func (s *logServer) MigrateLog(ctx context.Context, req *logservicepb.MigrateLogRequest) (res *logservicepb.MigrateLogResponse, err error) {
171+
// no-op for now
172+
return
173+
}
165174

166175
func NewLogServer(lr *repository.LogRepository) logservicepb.LogServiceServer {
167176
return &logServer{

idl/chromadb/proto/logservice.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,22 @@ message InspectDirtyLogResponse {
9696
repeated string markers = 1;
9797
}
9898

99+
message SealLogRequest {
100+
string collection_id = 1;
101+
}
102+
103+
message SealLogResponse {
104+
// Empty
105+
}
106+
107+
message MigrateLogRequest {
108+
string collection_id = 1;
109+
}
110+
111+
message MigrateLogResponse {
112+
// Empty
113+
}
114+
99115
service LogService {
100116
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
101117
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
@@ -104,5 +120,11 @@ service LogService {
104120
rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {}
105121
rpc UpdateCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
106122
rpc PurgeDirtyForCollection(PurgeDirtyForCollectionRequest) returns (PurgeDirtyForCollectionResponse) {}
123+
// RPC endpoints to expose for operator debuggability.
124+
// This endpoint must route to the rust log service.
107125
rpc InspectDirtyLog(InspectDirtyLogRequest) returns (InspectDirtyLogResponse) {}
126+
// This endpoint must route to the go log service.
127+
rpc SealLog(SealLogRequest) returns (SealLogResponse) {}
128+
// This endpoint must route to the rust log service.
129+
rpc MigrateLog(MigrateLogRequest) returns (MigrateLogResponse) {}
108130
}

rust/log-service/src/lib.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ use chroma_storage::Storage;
1616
use chroma_types::chroma_proto::{
1717
log_service_server::LogService, CollectionInfo, GetAllCollectionInfoToCompactRequest,
1818
GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse,
19-
LogRecord, OperationRecord, PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest,
20-
PurgeDirtyForCollectionResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest,
21-
ScoutLogsResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
19+
LogRecord, MigrateLogRequest, MigrateLogResponse, OperationRecord, PullLogsRequest,
20+
PullLogsResponse, PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse,
21+
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse, SealLogRequest,
22+
SealLogResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
2223
};
2324
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
2425
use chroma_types::CollectionUuid;
@@ -1230,6 +1231,24 @@ impl LogService for LogServer {
12301231
}
12311232
Ok(Response::new(InspectDirtyLogResponse { markers }))
12321233
}
1234+
1235+
async fn seal_log(
1236+
&self,
1237+
_request: Request<SealLogRequest>,
1238+
) -> Result<Response<SealLogResponse>, Status> {
1239+
Err(Status::failed_precondition(
1240+
"rust log service doesn't do sealing",
1241+
))
1242+
}
1243+
1244+
async fn migrate_log(
1245+
&self,
1246+
_request: Request<MigrateLogRequest>,
1247+
) -> Result<Response<MigrateLogResponse>, Status> {
1248+
Err(Status::failed_precondition(
1249+
"rust log service doesn't do migrating yet",
1250+
))
1251+
}
12331252
}
12341253

12351254
fn parquet_to_records(parquet: Arc<Vec<u8>>) -> Result<Vec<(LogPosition, Vec<u8>)>, Status> {

0 commit comments

Comments
 (0)