Skip to content

Commit 3da2abd

Browse files
committed
[ENH] Add RPC endpoints for sealing and migrating logs.
This updates the IDL for the log service to have two new methods: SealLog and MigrateLog. They each take a collection ID and trigger the two phase transitions for the go-to-rust migration that's coming.
1 parent 46930ce commit 3da2abd

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

go/pkg/log/server/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,15 @@ func (s *logServer) InspectDirtyLog(ctx context.Context, req *logservicepb.Inspe
164164
return
165165
}
166166

167+
func (s *logServer) SealLog(ctx context.Context, req *logservicepb.SealLogRequest) (res *logservicepb.SealLogResponse, err error) {
168+
// no-op for now
169+
return
170+
}
171+
172+
func (s *logServer) MigrateLog(ctx context.Context, req *logservicepb.MigrateLogRequest) (res *logservicepb.MigrateLogResponse, err error) {
173+
// no-op for now
174+
return
175+
}
167176

168177
func NewLogServer(lr *repository.LogRepository) logservicepb.LogServiceServer {
169178
return &logServer{

idl/chromadb/proto/logservice.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,22 @@ message InspectDirtyLogResponse {
9898
repeated string markers = 1;
9999
}
100100

101+
message SealLogRequest {
102+
string collection_id = 1;
103+
}
104+
105+
message SealLogResponse {
106+
// Empty
107+
}
108+
109+
message MigrateLogRequest {
110+
string collection_id = 1;
111+
}
112+
113+
message MigrateLogResponse {
114+
// Empty
115+
}
116+
101117
service LogService {
102118
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
103119
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
@@ -106,5 +122,11 @@ service LogService {
106122
rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {}
107123
rpc UpdateCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
108124
rpc PurgeDirtyForCollection(PurgeDirtyForCollectionRequest) returns (PurgeDirtyForCollectionResponse) {}
125+
// RPC endpoints to expose for operator debuggability.
126+
// This endpoint must route to the rust log service.
109127
rpc InspectDirtyLog(InspectDirtyLogRequest) returns (InspectDirtyLogResponse) {}
128+
// This endpoint must route to the go log service.
129+
rpc SealLog(SealLogRequest) returns (SealLogResponse) {}
130+
// This endpoint must route to the rust log service.
131+
rpc MigrateLog(MigrateLogRequest) returns (MigrateLogResponse) {}
110132
}

rust/log-service/src/lib.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ use chroma_storage::Storage;
1515
use chroma_types::chroma_proto::{
1616
log_service_server::LogService, CollectionInfo, GetAllCollectionInfoToCompactRequest,
1717
GetAllCollectionInfoToCompactResponse, InspectDirtyLogRequest, InspectDirtyLogResponse,
18-
LogRecord, OperationRecord, PullLogsRequest, PullLogsResponse, PurgeDirtyForCollectionRequest,
19-
PurgeDirtyForCollectionResponse, PushLogsRequest, PushLogsResponse, ScoutLogsRequest,
20-
ScoutLogsResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
18+
LogRecord, MigrateLogRequest, MigrateLogResponse, OperationRecord, PullLogsRequest,
19+
PullLogsResponse, PurgeDirtyForCollectionRequest, PurgeDirtyForCollectionResponse,
20+
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse, SealLogRequest,
21+
SealLogResponse, UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
2122
};
2223
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
2324
use chroma_types::CollectionUuid;
@@ -1204,6 +1205,24 @@ impl LogService for LogServer {
12041205
}
12051206
Ok(Response::new(InspectDirtyLogResponse { markers }))
12061207
}
1208+
1209+
async fn seal_log(
1210+
&self,
1211+
_request: Request<SealLogRequest>,
1212+
) -> Result<Response<SealLogResponse>, Status> {
1213+
Err(Status::failed_precondition(
1214+
"rust log service doesn't do sealing",
1215+
))
1216+
}
1217+
1218+
async fn migrate_log(
1219+
&self,
1220+
_request: Request<MigrateLogRequest>,
1221+
) -> Result<Response<MigrateLogResponse>, Status> {
1222+
Err(Status::failed_precondition(
1223+
"rust log service doesn't do migrating yet",
1224+
))
1225+
}
12071226
}
12081227

12091228
fn parquet_to_records(parquet: Arc<Vec<u8>>) -> Result<Vec<(LogPosition, Vec<u8>)>, Status> {

0 commit comments

Comments
 (0)