Skip to content

Commit abf5b1e

Browse files
committed
[ENH] Add RPC endpoints for sealing and migrating logs.
This updates the IDL for the log service to have two new methods: SealLog and MigrateLog. They each take a collection ID and trigger the two phase transitions for the go-to-rust migration that's coming.
1 parent bf0e7e1 commit abf5b1e

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

go/pkg/log/server/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,15 @@ func (s *logServer) InspectDirtyLog(ctx context.Context, req *logservicepb.Inspe
164164
return
165165
}
166166

167+
func (s *logServer) SealLog(ctx context.Context, req *logservicepb.SealLogRequest) (res *logservicepb.SealLogResponse, err error) {
168+
// no-op for now
169+
return
170+
}
171+
172+
func (s *logServer) MigrateLog(ctx context.Context, req *logservicepb.MigrateLogRequest) (res *logservicepb.MigrateLogResponse, err error) {
173+
// no-op for now
174+
return
175+
}
167176

168177
func NewLogServer(lr *repository.LogRepository) logservicepb.LogServiceServer {
169178
return &logServer{

idl/chromadb/proto/logservice.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ message InspectDirtyLogResponse {
9898
repeated string markers = 1;
9999
}
100100

101+
message SealLogRequest {
102+
string collection_id = 1;
103+
}
104+
105+
message SealLogResponse {
106+
// Empty
107+
}
108+
109+
message MigrateLogRequest {
110+
string collection_id = 1;
111+
}
112+
113+
message MigrateLogResponse {
114+
// Empty
115+
}
116+
101117
service LogService {
102118
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
103119
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
@@ -106,5 +122,11 @@ service LogService {
106122
rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {}
107123
rpc UpdateCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
108124
rpc PurgeDirtyForCollection(PurgeDirtyForCollectionRequest) returns (PurgeDirtyForCollectionResponse) {}
125+
// RPC endpoints to expose for operator debuggability.
126+
// This endpoint must route to the rust log service.
109127
rpc InspectDirtyLog(InspectDirtyLogRequest) returns (InspectDirtyLogResponse) {}
128+
// This endpoint must route to the go log service.
129+
rpc SealLog(SealLogRequest) returns (SealLogResponse) {}
130+
// This endpoint must route to the rust log service.
131+
rpc MigrateLog(MigrateLogRequest) returns (MigrateLogResponse) {}
110132
}

rust/log-service/src/lib.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ use chroma_storage::Storage;
1616
use chroma_types::chroma_proto::{
1717
log_service_server::LogService, CollectionInfo, GetAllCollectionInfoToCompactRequest,
1818
GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse,
19-
LogRecord, OperationRecord, PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest,
20-
PurgeDirtyForCollectionResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest,
21-
ScoutLogsResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
19+
LogRecord, MigrateLogRequest, MigrateLogResponse, OperationRecord, PullLogsRequest,
20+
PullLogsResponse, PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse,
21+
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse, SealLogRequest,
22+
SealLogResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
2223
};
2324
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
2425
use chroma_types::CollectionUuid;
@@ -1236,6 +1237,24 @@ impl LogService for LogServer {
12361237
}
12371238
Ok(Response::new(InspectDirtyLogResponse { markers }))
12381239
}
1240+
1241+
async fn seal_log(
1242+
&self,
1243+
_request: Request<SealLogRequest>,
1244+
) -> Result<Response<SealLogResponse>, Status> {
1245+
Err(Status::failed_precondition(
1246+
"rust log service doesn't do sealing",
1247+
))
1248+
}
1249+
1250+
async fn migrate_log(
1251+
&self,
1252+
_request: Request<MigrateLogRequest>,
1253+
) -> Result<Response<MigrateLogResponse>, Status> {
1254+
Err(Status::failed_precondition(
1255+
"rust log service doesn't do migrating yet",
1256+
))
1257+
}
12391258
}
12401259

12411260
fn parquet_to_records(parquet: Arc<Vec<u8>>) -> Result<Vec<(LogPosition, Vec<u8>)>, Status> {

0 commit comments

Comments
 (0)