@@ -555,7 +555,7 @@ func (tc *Catalog) hardDeleteCollection(ctx context.Context, deleteCollection *m
555555 return tc .txImpl .Transaction (ctx , func (txCtx context.Context ) error {
556556 collectionID := deleteCollection .ID
557557
558- collectionEntry , err := tc .metaDomain .CollectionDb (txCtx ).GetCollectionWithoutMetadata (types .FromUniqueID (collectionID ), & deleteCollection .DatabaseName , nil )
558+ collectionEntry , err := tc .metaDomain .CollectionDb (txCtx ).GetCollectionWithoutMetadata (types .FromUniqueID (collectionID ), & deleteCollection .DatabaseName , nil ) // todo: should filter for soft deleted
559559 if err != nil {
560560 return err
561561 }
@@ -564,6 +564,51 @@ func (tc *Catalog) hardDeleteCollection(ctx context.Context, deleteCollection *m
564564 return common .ErrCollectionDeleteNonExistingCollection
565565 }
566566
567+ if ! collectionEntry .IsDeleted {
568+ return common .ErrCollectionWasNotSoftDeleted
569+ }
570+
571+ if collectionEntry .RootCollectionId != nil {
572+ rootCollection , err := tc .metaDomain .CollectionDb (txCtx ).GetCollectionWithoutMetadata (collectionEntry .RootCollectionId , nil , nil )
573+ if err != nil {
574+ return err
575+ }
576+ if rootCollection == nil {
577+ return errors .New ("root collection not found" )
578+ }
579+
580+ if rootCollection .LineageFileName == nil {
581+ return errors .New ("lineage file name is nil on root collection" )
582+ }
583+
584+ lineageFile , err := tc .getLineageFile (txCtx , rootCollection .LineageFileName )
585+ if err != nil {
586+ return err
587+ }
588+ // filter out this collection
589+ updatedDependencies := make ([]* coordinatorpb.CollectionVersionDependency , 0 )
590+ for _ , dependency := range lineageFile .Dependencies {
591+ if dependency .TargetCollectionId != deleteCollection .ID .String () {
592+ updatedDependencies = append (updatedDependencies , dependency )
593+ }
594+ }
595+ lineageFile .Dependencies = updatedDependencies
596+
597+ newLineageFileId , err := uuid .NewV7 ()
598+ if err != nil {
599+ return err
600+ }
601+
602+ log .Info ("new lineage file id" , zap .String ("newLineageFileId" , newLineageFileId .String ()))
603+
604+ newLineageFileFullName , err := tc .s3Store .PutLineageFile (collectionEntry .Tenant , collectionEntry .DatabaseID , rootCollection .ID , fmt .Sprintf ("%s.binpb" , newLineageFileId .String ()), lineageFile )
605+ if err != nil {
606+ return err
607+ }
608+
609+ tc .metaDomain .CollectionDb (txCtx ).UpdateCollectionLineageFilePath (rootCollection .ID , rootCollection .LineageFileName , newLineageFileFullName )
610+ }
611+
567612 // Delete collection and collection metadata.
568613 collectionDeletedCount , err := tc .metaDomain .CollectionDb (txCtx ).DeleteCollectionByID (collectionID .String ())
569614 if err != nil {
@@ -1960,7 +2005,7 @@ func (tc *Catalog) MarkVersionForDeletion(ctx context.Context, req *coordinatorp
19602005
19612006func (tc * Catalog ) updateProtoRemoveVersionEntries (versionFilePb * coordinatorpb.CollectionVersionFile , versions []int64 ) error {
19622007 // Check if version history exists
1963- if versionFilePb .GetVersionHistory () == nil || len ( versionFilePb . GetVersionHistory (). Versions ) == 0 {
2008+ if versionFilePb .GetVersionHistory () == nil {
19642009 log .Error ("version history not found" )
19652010 return errors .New ("version history not found" )
19662011 }
@@ -1996,14 +2041,14 @@ func (tc *Catalog) getNumberOfActiveVersions(versionFilePb *coordinatorpb.Collec
19962041 return len (activeVersions )
19972042}
19982043
1999- func (tc * Catalog ) getOldestVersionTs (versionFilePb * coordinatorpb.CollectionVersionFile ) time.Time {
2044+ func (tc * Catalog ) getOldestVersionTs (versionFilePb * coordinatorpb.CollectionVersionFile ) * time.Time {
20002045 if versionFilePb .GetVersionHistory () == nil || len (versionFilePb .GetVersionHistory ().Versions ) == 0 {
2001- // Returning a zero timestamp that represents an unset value.
2002- return time.Time {}
2046+ return nil
20032047 }
20042048 oldestVersionTs := versionFilePb .GetVersionHistory ().Versions [0 ].CreatedAtSecs
20052049
2006- return time .Unix (oldestVersionTs , 0 )
2050+ ts := time .Unix (oldestVersionTs , 0 )
2051+ return & ts
20072052}
20082053
20092054func (tc * Catalog ) DeleteVersionEntriesForCollection (ctx context.Context , tenantID string , collectionID string , versions []int64 ) error {
@@ -2017,8 +2062,8 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20172062
20182063 // Read the existing version file
20192064 collectionIDPtr := & collectionID
2020- isDeleted := false
2021- collectionEntry , err := tc .metaDomain .CollectionDb (ctx ).GetCollectionWithoutMetadata (collectionIDPtr , nil , & isDeleted )
2065+ // isDeleted := true // todo
2066+ collectionEntry , err := tc .metaDomain .CollectionDb (ctx ).GetCollectionWithoutMetadata (collectionIDPtr , nil , nil ) // todo
20222067 if err != nil {
20232068 return err
20242069 }
@@ -2038,14 +2083,19 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20382083 }
20392084
20402085 numActiveVersions := tc .getNumberOfActiveVersions (versionFilePb )
2041- if numActiveVersions < 1 {
2086+ if numActiveVersions < 1 && ! collectionEntry . IsDeleted {
20422087 // No remaining valid versions after GC.
20432088 return errors .New ("no valid versions after gc" )
20442089 }
20452090
20462091 // Get the creation time of the oldest version.
20472092 oldestVersionTs := tc .getOldestVersionTs (versionFilePb )
2048- if oldestVersionTs .IsZero () {
2093+ if oldestVersionTs == nil {
2094+ if ! collectionEntry .IsDeleted {
2095+ // todo
2096+ return errors .New ("oldest version timestamp is nil after GC" )
2097+ }
2098+ } else if oldestVersionTs .IsZero () {
20492099 // This should never happen.
20502100 log .Error ("oldest version timestamp is zero after GC." , zap .String ("collection_id" , collectionID ))
20512101 // No versions to delete.
@@ -2065,7 +2115,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
20652115 }
20662116
20672117 // Update the version file name in Postgres table as a CAS operation
2068- rowsAffected , err := tc .metaDomain .CollectionDb (ctx ).UpdateVersionRelatedFields (collectionID , existingVersionFileName , newVerFileFullPath , & oldestVersionTs , & numActiveVersions )
2118+ rowsAffected , err := tc .metaDomain .CollectionDb (ctx ).UpdateVersionRelatedFields (collectionID , existingVersionFileName , newVerFileFullPath , oldestVersionTs , & numActiveVersions )
20692119 if err != nil {
20702120 // Delete the newly created version file from S3 since it is not needed
20712121 tc .s3Store .DeleteVersionFile (tenantID , collectionEntry .DatabaseID , collectionID , newVersionFileName )
@@ -2113,6 +2163,20 @@ func (tc *Catalog) BatchGetCollectionVersionFilePaths(ctx context.Context, colle
21132163 return & result , nil
21142164}
21152165
2166+ func (tc * Catalog ) BatchGetCollectionSoftDeleteStatus (ctx context.Context , collectionIds []string ) (* coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse , error ) {
2167+ result := coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse {
2168+ CollectionIdToIsSoftDeleted : make (map [string ]bool ),
2169+ }
2170+
2171+ status , err := tc .metaDomain .CollectionDb (ctx ).BatchGetCollectionSoftDeleteStatus (collectionIds )
2172+ if err != nil {
2173+ return nil , err
2174+ }
2175+ result .CollectionIdToIsSoftDeleted = status
2176+
2177+ return & result , nil
2178+ }
2179+
21162180func (tc * Catalog ) GetVersionFileNamesForCollection (ctx context.Context , tenantID string , collectionID string ) (string , error ) {
21172181 collectionIDPtr := & collectionID
21182182 isDeleted := false
0 commit comments