Skip to content

[ENH] Wire up proto defs for sysdb fork endpoint #4299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (s *Coordinator) UpdateCollection(ctx context.Context, collection *model.Up
return s.catalog.UpdateCollection(ctx, collection, collection.Ts)
}

func (s *Coordinator) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
return s.catalog.ForkCollection(ctx, forkCollection)
}

func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.CreateSegment) error {
if err := verifyCreateSegment(segment); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions go/pkg/sysdb/coordinator/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type UpdateCollection struct {
Ts types.Timestamp
}

type ForkCollection struct {
SourceCollectionID types.UniqueID
TargetCollectionName *string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why is this *string instead of string? Can it ever be NULL?

}

type FlushCollectionCompaction struct {
ID types.UniqueID
TenantID string
Expand Down
19 changes: 19 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,25 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
return result, nil
}

func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
log.Info("Forking collection", zap.String("sourceCollectionId", forkCollection.SourceCollectionID.String()), zap.String("targetCollectionName", *forkCollection.TargetCollectionName))

var source_collection *model.Collection
var source_segments []*model.Segment

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
var err error
source_collection, source_segments, err = tc.GetCollectionWithSegments(ctx, forkCollection.SourceCollectionID)
return err
})
if err != nil {
return nil, nil, err
}

// TODO: Implement forking logic
return source_collection, source_segments, nil
}

func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
var result *model.Segment

Expand Down
59 changes: 40 additions & 19 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"encoding/json"
"fmt"

"github.com/chroma-core/chroma/go/pkg/grpcutils"

Expand Down Expand Up @@ -250,31 +249,15 @@ func (s *Server) GetCollectionWithSegments(ctx context.Context, req *coordinator
}
return res, grpcutils.BuildInternalGrpcError(err.Error())
}

res.Collection = convertCollectionToProto(collection)

segmentpbList := make([]*coordinatorpb.Segment, 0, len(segments))
scopeToSegmentMap := map[coordinatorpb.SegmentScope]*coordinatorpb.Segment{}
for _, segment := range segments {
segmentpb := convertSegmentToProto(segment)
scopeToSegmentMap[segmentpb.GetScope()] = segmentpb
segmentpbList = append(segmentpbList, segmentpb)
}

if len(segmentpbList) != 3 {
log.Error("GetCollectionWithSegments failed. Unexpected number of collection segments", zap.String("collection_id", collectionID))
return res, grpcutils.BuildInternalGrpcError(fmt.Sprintf("Unexpected number of segments for collection %s: %d", collectionID, len(segmentpbList)))
}

scopes := []coordinatorpb.SegmentScope{coordinatorpb.SegmentScope_METADATA, coordinatorpb.SegmentScope_RECORD, coordinatorpb.SegmentScope_VECTOR}

for _, scope := range scopes {
if _, exists := scopeToSegmentMap[scope]; !exists {
log.Error("GetCollectionWithSegments failed. Collection segment scope not found", zap.String("collection_id", collectionID), zap.String("missing_scope", scope.String()))
return res, grpcutils.BuildInternalGrpcError(fmt.Sprintf("Missing segment scope for collection %s: %s", collectionID, scope.String()))
}
}

Comment on lines -263 to -276
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this is removed because the rust frontend performs the same check after receiving the response

res.Segments = segmentpbList

return res, nil
}

Expand Down Expand Up @@ -360,6 +343,44 @@ func (s *Server) UpdateCollection(ctx context.Context, req *coordinatorpb.Update
return res, nil
}

func (s *Server) ForkCollection(ctx context.Context, req *coordinatorpb.ForkCollectionRequest) (*coordinatorpb.ForkCollectionResponse, error) {
collectionID := req.SourceCollectionId

res := &coordinatorpb.ForkCollectionResponse{}

parsedCollectionID, err := types.ToUniqueID(&collectionID)
if err != nil {
log.Error("ForkCollection failed. Failed to parse source collection id", zap.Error(err), zap.String("collection_id", collectionID))
return res, grpcutils.BuildInternalGrpcError(err.Error())
}

forkCollection := &model.ForkCollection{
SourceCollectionID: parsedCollectionID,
TargetCollectionName: &req.TargetCollectionName,
}
collection, segments, err := s.coordinator.ForkCollection(ctx, forkCollection)
if err != nil {
log.Error("ForkCollection failed. ", zap.Error(err), zap.String("collection_id", collectionID))
if err == common.ErrCollectionNotFound {
return res, grpcutils.BuildNotFoundGrpcError(err.Error())
}
if err == common.ErrCollectionUniqueConstraintViolation {
return res, grpcutils.BuildAlreadyExistsGrpcError(err.Error())
}
return res, grpcutils.BuildInternalGrpcError(err.Error())
}
res.Collection = convertCollectionToProto(collection)

segmentpbList := make([]*coordinatorpb.Segment, 0, len(segments))
for _, segment := range segments {
segmentpb := convertSegmentToProto(segment)
segmentpbList = append(segmentpbList, segmentpb)
}
res.Segments = segmentpbList

return res, nil
}

func (s *Server) ListCollectionVersions(ctx context.Context, req *coordinatorpb.ListCollectionVersionsRequest) (*coordinatorpb.ListCollectionVersionsResponse, error) {
collectionID, err := types.ToUniqueID(&req.CollectionId)
if err != nil {
Expand Down
43 changes: 22 additions & 21 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ message UpdateCollectionResponse {
reserved "status";
}

message ForkCollectionRequest {
string source_collection_id = 1;
string target_collection_id = 2;
string target_collection_name = 3;
}

message ForkCollectionResponse {
Collection collection = 1;
repeated Segment segments = 2;
}

message ResetStateResponse {
reserved 1;
reserved "status";
Expand Down Expand Up @@ -333,34 +344,13 @@ message CollectionSegmentInfo {
// GC's {collection,version} selection policy.
}

// Tuple of collection ID, tenant ID, and version.
message CollectionVersionTuple {
string collection_id = 1;
string tenant_id = 2;
int64 version = 3;
}

message VersionListForCollection {
string tenant_id = 1;
string database_id = 2;
string collection_id = 3;
repeated int64 versions = 4;
}

// Contains information about the lineage of a collection.
message CollectionLineageInfo {
// ID of the collection.
string collection_id = 1;
// ID of the tenant.
string tenant_id = 2;
// Whether the collection is a root collection.
bool is_root_collection = 3;
// An ordered list of descendant collections.
// The first element is the root {collection, version}, and the last
// element is the direct parent of the current collection.
repeated CollectionVersionTuple parent_collections = 4;
}

// Request to list versions of a collection.
message ListCollectionVersionsRequest {
string collection_id = 1;
Expand All @@ -382,6 +372,16 @@ message ListCollectionVersionsResponse {
bool list_is_truncated = 2;
}

message CollectionLineageFile {
repeated CollectionVersionDependency dependencies = 1;
}

message CollectionVersionDependency {
string source_collection_id = 1; // The forked collection
uint64 source_collection_version = 2; // The forked collection version
string target_collection_id = 3; // The forking collection
}

// Request to restore a collection.
message RestoreCollectionRequest {
string collection_id = 1;
Expand Down Expand Up @@ -473,6 +473,7 @@ service SysDB {
rpc GetCollectionWithSegments(GetCollectionWithSegmentsRequest) returns (GetCollectionWithSegmentsResponse) {}
rpc CheckCollections(CheckCollectionsRequest) returns (CheckCollectionsResponse) {}
rpc UpdateCollection(UpdateCollectionRequest) returns (UpdateCollectionResponse) {}
rpc ForkCollection(ForkCollectionRequest) returns (ForkCollectionResponse) {}
rpc ResetState(google.protobuf.Empty) returns (ResetStateResponse) {}
rpc GetLastCompactionTimeForTenant(GetLastCompactionTimeForTenantRequest) returns (GetLastCompactionTimeForTenantResponse) {}
rpc SetLastCompactionTimeForTenant(SetLastCompactionTimeForTenantRequest) returns (google.protobuf.Empty) {}
Expand Down
Loading