
[ENH] Implement log forking #4326


Merged (4 commits, Apr 22, 2025)
69 changes: 69 additions & 0 deletions go/pkg/log/repository/log.go
@@ -126,6 +126,75 @@ func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, of
return
}

func (r *LogRepository) ForkRecords(ctx context.Context, sourceCollectionID string, targetCollectionID string) (compactionOffset uint64, enumerationOffset uint64, err error) {
var tx pgx.Tx
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
trace_log.Error("Error in begin transaction for forking logs in log service", zap.Error(err), zap.String("sourceCollectionID", sourceCollectionID))
return
}
queriesWithTx := r.queries.WithTx(tx)
defer func() {
if err != nil {
tx.Rollback(ctx)
} else {
err = tx.Commit(ctx)
}
}()

sourceBounds, err := queriesWithTx.GetBoundsForCollection(ctx, sourceCollectionID)
Review comment (Contributor): handle the case when no row exists in the collection table. This can happen right after you create a collection, before adding any data.

Reply (Author): Should be handled by the query.

if err != nil {
trace_log.Error("Error in getting compaction and enumeration offset for source collection", zap.Error(err), zap.String("collectionId", sourceCollectionID))
return
}
err = queriesWithTx.ForkCollectionRecord(ctx, log.ForkCollectionRecordParams{
CollectionID: sourceCollectionID,
CollectionID_2: targetCollectionID,
})
if err != nil {
trace_log.Error("Error forking log record", zap.String("sourceCollectionID", sourceCollectionID))
return
}
targetBounds, err := queriesWithTx.GetMinimumMaximumOffsetForCollection(ctx, targetCollectionID)
if err != nil {
trace_log.Error("Error in deriving compaction and enumeration offset for target collection", zap.Error(err), zap.String("collectionId", targetCollectionID))
return
}

if targetBounds.MinOffset == 0 {
// Either the source collection is not compacted yet or no log is forked
compactionOffset = uint64(sourceBounds.RecordCompactionOffsetPosition)
} else {
// Some logs are forked, the min offset is guaranteed to be larger than source compaction offset
compactionOffset = uint64(targetBounds.MinOffset - 1)
}
err = queriesWithTx.UpdateCollectionCompactionOffsetPosition(ctx, log.UpdateCollectionCompactionOffsetPositionParams{
ID: targetCollectionID,
RecordCompactionOffsetPosition: int64(compactionOffset),
})
if err != nil {
trace_log.Error("Error in updating compaction offset for target collection", zap.Error(err), zap.String("collectionId", targetCollectionID))
return
}

if targetBounds.MaxOffset == 0 {
// Either the source collection is empty or no log is forked
enumerationOffset = uint64(sourceBounds.RecordEnumerationOffsetPosition)
} else {
// Some logs are forked. The max offset is the enumeration offset
enumerationOffset = uint64(targetBounds.MaxOffset)
}
err = queriesWithTx.UpdateCollectionEnumerationOffsetPosition(ctx, log.UpdateCollectionEnumerationOffsetPositionParams{
ID: targetCollectionID,
RecordEnumerationOffsetPosition: int64(enumerationOffset),
})
if err != nil {
trace_log.Error("Error in updating enumeration offset for target collection", zap.Error(err), zap.String("collectionId", targetCollectionID))
return
}
return
}
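The branching over the target bounds is the subtle part of this change, so here is the same rule distilled into a standalone function. This is an illustrative sketch, not part of the diff; the zero-means-no-rows convention follows the code comments above.

// deriveForkOffsets mirrors the branching in ForkRecords.
// srcCompaction and srcEnumeration are the source collection's stored offsets;
// minOff and maxOff are the minimum and maximum offsets among the rows just
// copied (zero when nothing was copied).
func deriveForkOffsets(srcCompaction, srcEnumeration, minOff, maxOff int64) (compaction, enumeration uint64) {
	if minOff == 0 {
		// No rows forked (or source not compacted yet): inherit the source offset.
		compaction = uint64(srcCompaction)
	} else {
		// Rows were forked; the smallest forked offset is guaranteed to be
		// greater than the source compaction offset.
		compaction = uint64(minOff - 1)
	}
	if maxOff == 0 {
		// No rows forked: inherit the source enumeration offset.
		enumeration = uint64(srcEnumeration)
	} else {
		// The largest forked offset becomes the enumeration offset.
		enumeration = uint64(maxOff)
	}
	return
}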

func (r *LogRepository) GetAllCollectionInfoToCompact(ctx context.Context, minCompactionSize uint64) (collectionToCompact []log.GetAllCollectionsToCompactRow, err error) {
collectionToCompact, err = r.queries.GetAllCollectionsToCompact(ctx, int64(minCompactionSize))
if collectionToCompact == nil {
26 changes: 25 additions & 1 deletion go/pkg/log/server/server.go
@@ -56,7 +56,7 @@ func (s *logServer) ScoutLogs(ctx context.Context, req *logservicepb.ScoutLogsRe
return
}
// +1 to convert from the (] bound to a [) bound.
res = &logservicepb.ScoutLogsResponse {
res = &logservicepb.ScoutLogsResponse{
FirstUninsertedRecordOffset: int64(limit + 1),
}
return
@@ -90,6 +90,30 @@ func (s *logServer) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequ
return
}

func (s *logServer) ForkLogs(ctx context.Context, req *logservicepb.ForkLogsRequest) (res *logservicepb.ForkLogsResponse, err error) {
var sourceCollectionID types.UniqueID
var targetCollectionID types.UniqueID
sourceCollectionID, err = types.ToUniqueID(&req.SourceCollectionId)
if err != nil {
return
}
targetCollectionID, err = types.ToUniqueID(&req.TargetCollectionId)
if err != nil {
return
}

compactionOffset, enumerationOffset, err := s.lr.ForkRecords(ctx, sourceCollectionID.String(), targetCollectionID.String())
if err != nil {
return
}

res = &logservicepb.ForkLogsResponse{
CompactionOffset: compactionOffset,
EnumerationOffset: enumerationOffset,
}
return
}

func (s *logServer) GetAllCollectionInfoToCompact(ctx context.Context, req *logservicepb.GetAllCollectionInfoToCompactRequest) (res *logservicepb.GetAllCollectionInfoToCompactResponse, err error) {
var collectionToCompact []log.GetAllCollectionsToCompactRow
collectionToCompact, err = s.lr.GetAllCollectionInfoToCompact(ctx, req.MinCompactionSize)
21 changes: 18 additions & 3 deletions go/pkg/log/store/db/queries.sql.go

(Generated file; diff not rendered by default.)

10 changes: 9 additions & 1 deletion go/pkg/log/store/queries/queries.sql
@@ -46,7 +46,9 @@ FROM record_log r
WHERE r.collection_id = $1;

-- name: GetBoundsForCollection :one
- SELECT record_compaction_offset_position, record_enumeration_offset_position
+ SELECT
+     COALESCE(record_compaction_offset_position, 0) AS record_compaction_offset_position,
+     COALESCE(record_enumeration_offset_position, 0) AS record_enumeration_offset_position
FROM collection
WHERE id = $1;

@@ -58,3 +60,9 @@ SELECT id FROM collection;

-- name: GetLastCompactedOffset :one
SELECT record_compaction_offset_position FROM collection c WHERE c.id = $1;

-- name: ForkCollectionRecord :exec
INSERT INTO record_log ("offset", collection_id, timestamp, record)
SELECT record_log.offset, $2, record_log.timestamp, record_log.record
FROM record_log
WHERE record_log.collection_id = $1;
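The INSERT ... SELECT copies every log row of the source collection under the target collection id in a single statement (the "offset" column is quoted because OFFSET is a reserved word in PostgreSQL). As a worked example with hypothetical data, tying the query to the offset derivation in ForkRecords:

// Hypothetical walk-through (values invented for illustration).
// Collection A holds log rows at offsets 801..1200, with
// record_compaction_offset_position = 800 and
// record_enumeration_offset_position = 1200 in the collection table.
// ForkCollectionRecord with $1 = A and $2 = B copies offsets 801..1200 to B.
// GetMinimumMaximumOffsetForCollection(B) then reports MinOffset = 801 and
// MaxOffset = 1200, so ForkRecords derives:
//   compactionOffset  = 801 - 1 = 800
//   enumerationOffset = 1200
// These match the 800/1200 offsets asserted in the sysdb test below.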
24 changes: 13 additions & 11 deletions go/pkg/sysdb/coordinator/coordinator_test.go
@@ -1416,11 +1416,9 @@ func (suite *APIsTestSuite) TestForkCollection() {
}

sourceFlushCollectionCompaction := &model.FlushCollectionCompaction{
- ID: sourceCreateCollection.ID,
- TenantID: sourceCreateCollection.TenantID,
- // TODO: Inherit log position after log fork is implemented
- // LogPosition: 1000,
- LogPosition: 0,
+ ID: sourceCreateCollection.ID,
+ TenantID: sourceCreateCollection.TenantID,
+ LogPosition: 1000,
CurrentCollectionVersion: 0,
FlushSegmentCompactions: []*model.FlushSegmentCompaction{
sourceFlushMetadataSegment,
@@ -1437,9 +1435,11 @@

// Fork source collection
forkCollection := &model.ForkCollection{
- SourceCollectionID: sourceCreateCollection.ID,
- TargetCollectionID: types.NewUniqueID(),
- TargetCollectionName: "test_fork_collection_fork_1",
+ SourceCollectionID: sourceCreateCollection.ID,
+ SourceCollectionLogCompactionOffset: 800,
+ SourceCollectionLogEnumerationOffset: 1200,
+ TargetCollectionID: types.NewUniqueID(),
+ TargetCollectionName: "test_fork_collection_fork_1",
}

collection, collection_segments, err := suite.coordinator.ForkCollection(ctx, forkCollection)
@@ -1469,9 +1469,11 @@

// Attempt to fork a collection with the same name (should fail)
forkCollectionWithSameName := &model.ForkCollection{
- SourceCollectionID: sourceCreateCollection.ID,
- TargetCollectionID: types.NewUniqueID(),
- TargetCollectionName: "test_fork_collection_source",
+ SourceCollectionID: sourceCreateCollection.ID,
+ SourceCollectionLogCompactionOffset: 800,
+ SourceCollectionLogEnumerationOffset: 1200,
+ TargetCollectionID: types.NewUniqueID(),
+ TargetCollectionName: "test_fork_collection_source",
}
_, _, err = suite.coordinator.ForkCollection(ctx, forkCollectionWithSameName)
suite.Error(err)
22 changes: 10 additions & 12 deletions go/pkg/sysdb/coordinator/table_catalog.go
@@ -902,18 +902,16 @@ func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.For

// Create the new collection with source collection information
createCollection := &model.CreateCollection{
- ID: forkCollection.TargetCollectionID,
- Name: forkCollection.TargetCollectionName,
- ConfigurationJsonStr: sourceCollection.ConfigurationJsonStr,
- Dimension: sourceCollection.Dimension,
- Metadata: sourceCollection.Metadata,
- GetOrCreate: false,
- TenantID: sourceCollection.TenantID,
- DatabaseName: sourceCollection.DatabaseName,
- Ts: ts.Unix(),
- // TODO: Inherit log position after log fork is implemented
- // LogPosition: sourceCollection.LogPosition,
- LogPosition: 0,
+ ID: forkCollection.TargetCollectionID,
+ Name: forkCollection.TargetCollectionName,
+ ConfigurationJsonStr: sourceCollection.ConfigurationJsonStr,
+ Dimension: sourceCollection.Dimension,
+ Metadata: sourceCollection.Metadata,
+ GetOrCreate: false,
+ TenantID: sourceCollection.TenantID,
+ DatabaseName: sourceCollection.DatabaseName,
+ Ts: ts.Unix(),
+ LogPosition: sourceCollection.LogPosition,
RootCollectionId: rootCollectionIDStr,
TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction,
SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction,
11 changes: 11 additions & 0 deletions idl/chromadb/proto/logservice.proto
@@ -42,6 +42,16 @@ message PullLogsResponse {
repeated LogRecord records = 1;
}

message ForkLogsRequest {
string source_collection_id = 1;
string target_collection_id = 2;
}

message ForkLogsResponse {
uint64 compaction_offset = 1;
uint64 enumeration_offset = 2;
}
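For orientation, here is a caller-side sketch of the new RPC using the Go stubs that protoc generates from this file. The logClient variable and the seedOffsets helper are hypothetical; only the message and field names come from the proto above.

// Fork the source collection's log, then use the returned offsets to seed
// the forked collection's log position in sysdb (the Rust frontend change
// below does exactly this via GrpcLog::fork_logs).
resp, err := logClient.ForkLogs(ctx, &logservicepb.ForkLogsRequest{
	SourceCollectionId: sourceID.String(),
	TargetCollectionId: targetID.String(),
})
if err != nil {
	return err
}
seedOffsets(resp.CompactionOffset, resp.EnumerationOffset) // hypothetical helper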

message CollectionInfo {
string collection_id = 1;
// The log offset of the first log entry of the collection that needs to be compacted
@@ -81,6 +91,7 @@ service LogService {
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
rpc PullLogs(PullLogsRequest) returns (PullLogsResponse) {}
rpc ForkLogs(ForkLogsRequest) returns (ForkLogsResponse) {}
rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {}
rpc UpdateCollectionLogOffset(UpdateCollectionLogOffsetRequest) returns (UpdateCollectionLogOffsetResponse) {}
rpc PurgeDirtyForCollection(PurgeDirtyForCollectionRequest) returns (PurgeDirtyForCollectionResponse) {}
9 changes: 6 additions & 3 deletions rust/frontend/src/impls/service_based_frontend.rs
@@ -555,13 +555,16 @@ impl ServiceBasedFrontend {
}: ForkCollectionRequest,
) -> Result<ForkCollectionResponse, ForkCollectionError> {
let target_collection_id = CollectionUuid::new();
let log_offsets = self
.log_client
.fork_logs(source_collection_id, target_collection_id)
.await?;
let collection_and_segments = self
.sysdb_client
.fork_collection(
source_collection_id,
- // TODO: Update this when wiring up log fork
- 0,
- 0,
+ log_offsets.compaction_offset,
+ log_offsets.enumeration_offset,
target_collection_id,
target_collection_name,
)
8 changes: 8 additions & 0 deletions rust/log-service/src/lib.rs
@@ -18,6 +18,7 @@ use chroma_types::chroma_proto::{
PushLogsRequest, PushLogsResponse, ScoutLogsRequest, ScoutLogsResponse,
UpdateCollectionLogOffsetRequest, UpdateCollectionLogOffsetResponse,
};
use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse};
use chroma_types::CollectionUuid;
use figment::providers::{Env, Format, Yaml};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -763,6 +764,13 @@ impl LogService for LogServer {
.await
}

async fn fork_logs(
&self,
_request: Request<ForkLogsRequest>,
) -> Result<Response<ForkLogsResponse>, Status> {
unimplemented!("Log forking is unimplemented for WAL3 for now")
}

#[tracing::instrument(info, skip(self, request), err(Display))]
async fn get_all_collection_info_to_compact(
&self,
44 changes: 43 additions & 1 deletion rust/log/src/grpc_log.rs
@@ -6,7 +6,9 @@ use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::chroma_proto::log_service_client::LogServiceClient;
use chroma_types::chroma_proto::{self};
use chroma_types::{CollectionUuid, LogRecord, OperationRecord, RecordConversionError};
use chroma_types::{
CollectionUuid, ForkLogsResponse, LogRecord, OperationRecord, RecordConversionError,
};
use std::fmt::Debug;
use std::time::Duration;
use thiserror::Error;
@@ -57,6 +59,23 @@ impl ChromaError for GrpcPushLogsError {
}
}

#[derive(Error, Debug)]
pub enum GrpcForkLogsError {
#[error("Please backoff exponentially and retry")]
Backoff,
#[error("Failed to push logs")]
FailedToForkLogs(#[from] tonic::Status),
}

impl ChromaError for GrpcForkLogsError {
fn code(&self) -> ErrorCodes {
match self {
GrpcForkLogsError::Backoff => ErrorCodes::Unavailable,
GrpcForkLogsError::FailedToForkLogs(_) => ErrorCodes::Internal,
}
}
}

#[derive(Error, Debug)]
pub enum GrpcGetCollectionsWithNewDataError {
#[error("Failed to fetch")]
@@ -306,6 +325,29 @@ impl GrpcLog {
Ok(())
}

pub(super) async fn fork_logs(
&mut self,
source_collection_id: CollectionUuid,
target_collection_id: CollectionUuid,
) -> Result<ForkLogsResponse, GrpcForkLogsError> {
let response = self
.client_for(source_collection_id)
.fork_logs(chroma_proto::ForkLogsRequest {
source_collection_id: source_collection_id.to_string(),
target_collection_id: target_collection_id.to_string(),
})
.await
.map_err(|err| match err.code() {
tonic::Code::Unavailable => GrpcForkLogsError::Backoff,
_ => err.into(),
})?
.into_inner();
Ok(ForkLogsResponse {
compaction_offset: response.compaction_offset,
enumeration_offset: response.enumeration_offset,
})
}

pub(crate) async fn get_collections_with_new_data(
&mut self,
min_compaction_size: u64,