Skip to content

Commit 67eba39

Browse files
author
Sicheng Pan
committed
[ENH] Implement log forking
1 parent c9b045b commit 67eba39

File tree

11 files changed

+210
-22
lines changed

11 files changed

+210
-22
lines changed

Diff for: go/pkg/log/repository/log.go

+41
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,47 @@ func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, of
126126
return
127127
}
128128

129+
func (r *LogRepository) ForkRecords(ctx context.Context, sourceCollectionID string, targetCollectionID string) (err error) {
130+
var tx pgx.Tx
131+
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
132+
if err != nil {
133+
trace_log.Error("Error in begin transaction for forking logs in log service", zap.Error(err), zap.String("sourceCollectionID", sourceCollectionID))
134+
return
135+
}
136+
queriesWithTx := r.queries.WithTx(tx)
137+
defer func() {
138+
if err != nil {
139+
tx.Rollback(ctx)
140+
} else {
141+
err = tx.Commit(ctx)
142+
}
143+
}()
144+
145+
err = queriesWithTx.LockCollection(ctx, sourceCollectionID)
146+
if err != nil {
147+
trace_log.Error("Error locking collection for fork", zap.String("sourceCollectionID", sourceCollectionID))
148+
return
149+
}
150+
err = queriesWithTx.ForkCollectionOffset(ctx, log.ForkCollectionOffsetParams{
151+
ID: sourceCollectionID,
152+
ID_2: targetCollectionID,
153+
})
154+
if err != nil {
155+
trace_log.Error("Error forking log offset", zap.String("sourceCollectionID", sourceCollectionID))
156+
return
157+
}
158+
err = queriesWithTx.ForkCollectionRecord(ctx, log.ForkCollectionRecordParams{
159+
CollectionID: sourceCollectionID,
160+
CollectionID_2: targetCollectionID,
161+
})
162+
if err != nil {
163+
trace_log.Error("Error forking log record", zap.String("sourceCollectionID", sourceCollectionID))
164+
return
165+
}
166+
167+
return
168+
}
169+
129170
func (r *LogRepository) GetAllCollectionInfoToCompact(ctx context.Context, minCompactionSize uint64) (collectionToCompact []log.GetAllCollectionsToCompactRow, err error) {
130171
collectionToCompact, err = r.queries.GetAllCollectionsToCompact(ctx, int64(minCompactionSize))
131172
if collectionToCompact == nil {

Diff for: go/pkg/log/server/server.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (s *logServer) ScoutLogs(ctx context.Context, req *logservicepb.ScoutLogsRe
5656
return
5757
}
5858
// +1 to convert from the (] bound to a [) bound.
59-
res = &logservicepb.ScoutLogsResponse {
59+
res = &logservicepb.ScoutLogsResponse{
6060
FirstUninsertedRecordOffset: int64(limit + 1),
6161
}
6262
return
@@ -90,6 +90,27 @@ func (s *logServer) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequ
9090
return
9191
}
9292

93+
func (s *logServer) ForkLogs(ctx context.Context, req *logservicepb.ForkLogsRequest) (res *logservicepb.ForkLogsResponse, err error) {
94+
var sourceCollectionID types.UniqueID
95+
var targetCollectionID types.UniqueID
96+
sourceCollectionID, err = types.ToUniqueID(&req.SourceCollectionId)
97+
if err != nil {
98+
return
99+
}
100+
targetCollectionID, err = types.ToUniqueID(&req.TargetCollectionId)
101+
if err != nil {
102+
return
103+
}
104+
105+
err = s.lr.ForkRecords(ctx, sourceCollectionID.String(), targetCollectionID.String())
106+
if err != nil {
107+
return
108+
}
109+
110+
res = &logservicepb.ForkLogsResponse{}
111+
return
112+
}
113+
93114
func (s *logServer) GetAllCollectionInfoToCompact(ctx context.Context, req *logservicepb.GetAllCollectionInfoToCompactRequest) (res *logservicepb.GetAllCollectionInfoToCompactResponse, err error) {
94115
var collectionToCompact []log.GetAllCollectionsToCompactRow
95116
collectionToCompact, err = s.lr.GetAllCollectionInfoToCompact(ctx, req.MinCompactionSize)

Diff for: go/pkg/log/store/db/queries.sql.go

+44-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: go/pkg/log/store/queries/queries.sql

+15
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,18 @@ SELECT id FROM collection;
5858

5959
-- name: GetLastCompactedOffset :one
6060
SELECT record_compaction_offset_position FROM collection c WHERE c.id = $1;
61+
62+
-- name: LockCollection :exec
-- Row-lock a collection so concurrent writers cannot interleave with a fork
-- copy; the lock is held until the enclosing transaction ends.
SELECT * FROM collection WHERE id = $1 FOR UPDATE;

-- name: ForkCollectionOffset :exec
-- Copy the offset bookkeeping row of collection $1 into a new collection row
-- with id $2. Inserts nothing if collection $1 has no row yet.
INSERT INTO collection (id, record_compaction_offset_position, record_enumeration_offset_position)
SELECT $2, collection.record_compaction_offset_position, collection.record_enumeration_offset_position
FROM collection
WHERE collection.id = $1;

-- name: ForkCollectionRecord :exec
-- Copy every log record of collection $1 into collection $2, preserving the
-- original offsets, timestamps, and payloads.
INSERT INTO record_log ("offset", collection_id, timestamp, record)
SELECT record_log.offset, $2, record_log.timestamp, record_log.record
FROM record_log
WHERE record_log.collection_id = $1;

Diff for: go/pkg/sysdb/coordinator/coordinator_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -1416,11 +1416,9 @@ func (suite *APIsTestSuite) TestForkCollection() {
14161416
}
14171417

14181418
sourceFlushCollectionCompaction := &model.FlushCollectionCompaction{
1419-
ID: sourceCreateCollection.ID,
1420-
TenantID: sourceCreateCollection.TenantID,
1421-
// TODO: Inherit log position after log fork is implemented
1422-
// LogPosition: 1000,
1423-
LogPosition: 0,
1419+
ID: sourceCreateCollection.ID,
1420+
TenantID: sourceCreateCollection.TenantID,
1421+
LogPosition: 1000,
14241422
CurrentCollectionVersion: 0,
14251423
FlushSegmentCompactions: []*model.FlushSegmentCompaction{
14261424
sourceFlushMetadataSegment,

Diff for: go/pkg/sysdb/coordinator/table_catalog.go

+10-12
Original file line numberDiff line numberDiff line change
@@ -868,18 +868,16 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For
868868
}
869869

870870
createCollection := &model.CreateCollection{
871-
ID: forkCollection.TargetCollectionID,
872-
Name: forkCollection.TargetCollectionName,
873-
ConfigurationJsonStr: sourceCollection.ConfigurationJsonStr,
874-
Dimension: sourceCollection.Dimension,
875-
Metadata: sourceCollection.Metadata,
876-
GetOrCreate: false,
877-
TenantID: sourceCollection.TenantID,
878-
DatabaseName: sourceCollection.DatabaseName,
879-
Ts: ts.Unix(),
880-
// TODO: Inherit log position after log fork is implemented
881-
// LogPosition: sourceCollection.LogPosition,
882-
LogPosition: 0,
871+
ID: forkCollection.TargetCollectionID,
872+
Name: forkCollection.TargetCollectionName,
873+
ConfigurationJsonStr: sourceCollection.ConfigurationJsonStr,
874+
Dimension: sourceCollection.Dimension,
875+
Metadata: sourceCollection.Metadata,
876+
GetOrCreate: false,
877+
TenantID: sourceCollection.TenantID,
878+
DatabaseName: sourceCollection.DatabaseName,
879+
Ts: ts.Unix(),
880+
LogPosition: sourceCollection.LogPosition,
883881
RootCollectionId: rootCollectionIDStr,
884882
TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction,
885883
SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction,

Diff for: idl/chromadb/proto/logservice.proto

+10
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ message PullLogsResponse {
4242
repeated LogRecord records = 1;
4343
}
4444

45+
// Request to copy all logs of one collection into another.
message ForkLogsRequest {
  // The collection whose logs are copied from.
  string source_collection_id = 1;
  // The collection that receives the copied logs.
  string target_collection_id = 2;
}

// Response for ForkLogs.
message ForkLogsResponse {
  // Empty
}
53+
4554
message CollectionInfo {
4655
string collection_id = 1;
4756
// The log offset of the first log entry of the collection that needs to be compacted
@@ -81,6 +90,7 @@ service LogService {
8190
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
8291
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
8392
rpc PullLogs(PullLogsRequest) returns (PullLogsResponse) {}
93+
rpc ForkLogs(ForkLogsRequest) returns (ForkLogsResponse) {}
8494
rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {}
8595
rpc UpdateCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
8696
rpc PurgeDirtyForCollection(PurgeDirtyForCollectionRequest) returns (PurgeDirtyForCollectionResponse) {}

Diff for: rust/frontend/src/impls/service_based_frontend.rs

+3
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,9 @@ impl ServiceBasedFrontend {
552552
}: ForkCollectionRequest,
553553
) -> Result<ForkCollectionResponse, ForkCollectionError> {
554554
let target_collection_id = CollectionUuid::new();
555+
self.log_client
556+
.fork_logs(source_collection_id, target_collection_id)
557+
.await?;
555558
let collection_and_segments = self
556559
.sysdb_client
557560
.fork_collection(

Diff for: rust/log-service/src/lib.rs

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use chroma_types::chroma_proto::{
1818
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse,
1919
UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
2020
};
21+
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
2122
use chroma_types::CollectionUuid;
2223
use figment::providers::{Env, Format, Yaml};
2324
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -763,6 +764,13 @@ impl LogService for LogServer {
763764
.await
764765
}
765766

767+
async fn fork_logs(
768+
&self,
769+
_request: Request<ForkLogsRequest>,
770+
) -> Result<Response<ForkLogsResponse>, Status> {
771+
unimplemented!("Log forking is unimplemented for WAL3 for now")
772+
}
773+
766774
#[tracing::instrument(info, skip(self, request), err(Display))]
767775
async fn get_all_collection_info_to_compact(
768776
&self,

Diff for: rust/log/src/grpc_log.rs

+35
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,23 @@ impl ChromaError for GrpcPushLogsError {
5757
}
5858
}
5959

60+
#[derive(Error, Debug)]
61+
pub enum GrpcForkLogsError {
62+
#[error("Please backoff exponentially and retry")]
63+
Backoff,
64+
#[error("Failed to push logs")]
65+
FailedToForkLogs(#[from] tonic::Status),
66+
}
67+
68+
impl ChromaError for GrpcForkLogsError {
69+
fn code(&self) -> ErrorCodes {
70+
match self {
71+
GrpcForkLogsError::Backoff => ErrorCodes::Unavailable,
72+
GrpcForkLogsError::FailedToForkLogs(_) => ErrorCodes::Internal,
73+
}
74+
}
75+
}
76+
6077
#[derive(Error, Debug)]
6178
pub enum GrpcGetCollectionsWithNewDataError {
6279
#[error("Failed to fetch")]
@@ -306,6 +323,24 @@ impl GrpcLog {
306323
Ok(())
307324
}
308325

326+
pub(super) async fn fork_logs(
327+
&mut self,
328+
source_collection_id: CollectionUuid,
329+
target_collection_id: CollectionUuid,
330+
) -> Result<(), GrpcForkLogsError> {
331+
self.client_for(source_collection_id)
332+
.fork_logs(chroma_proto::ForkLogsRequest {
333+
source_collection_id: source_collection_id.to_string(),
334+
target_collection_id: target_collection_id.to_string(),
335+
})
336+
.await
337+
.map(|_| ())
338+
.map_err(|err| match err.code() {
339+
tonic::Code::Unavailable => GrpcForkLogsError::Backoff,
340+
_ => err.into(),
341+
})
342+
}
343+
309344
pub(crate) async fn get_collections_with_new_data(
310345
&mut self,
311346
min_compaction_size: u64,

Diff for: rust/log/src/log.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use crate::in_memory_log::InMemoryLog;
33
use crate::sqlite_log::SqliteLog;
44
use crate::types::CollectionInfo;
55
use chroma_error::ChromaError;
6-
use chroma_types::{CollectionUuid, LogRecord, OperationRecord, ResetError, ResetResponse};
6+
use chroma_types::{
7+
CollectionUuid, ForkCollectionError, LogRecord, OperationRecord, ResetError, ResetResponse,
8+
};
79
use std::fmt::Debug;
810

911
#[derive(Clone, Debug)]
@@ -92,6 +94,22 @@ impl Log {
9294
}
9395
}
9496

97+
#[tracing::instrument(skip(self))]
98+
pub async fn fork_logs(
99+
&mut self,
100+
source_collection_id: CollectionUuid,
101+
target_collection_id: CollectionUuid,
102+
) -> Result<(), ForkCollectionError> {
103+
match self {
104+
Log::Sqlite(_) => Err(ForkCollectionError::Local),
105+
Log::Grpc(log) => log
106+
.fork_logs(source_collection_id, target_collection_id)
107+
.await
108+
.map_err(|err| err.boxed().into()),
109+
Log::InMemory(_) => Err(ForkCollectionError::Local),
110+
}
111+
}
112+
95113
#[tracing::instrument(skip(self))]
96114
pub async fn get_collections_with_new_data(
97115
&mut self,

0 commit comments

Comments
 (0)