@@ -53,15 +53,16 @@ type trigger interface {
5353}
5454
5555type compactionSignal struct {
56- id UniqueID
57- isForce bool
58- collectionID UniqueID
59- partitionID UniqueID
60- channel string
61- segmentIDs []UniqueID
62- pos * msgpb.MsgPosition
63- resultCh chan error
64- waitResult bool
56+ id UniqueID
57+ isForce bool
58+ collectionID UniqueID
59+ partitionID UniqueID
60+ channel string
61+ segmentIDs []UniqueID
62+ pos * msgpb.MsgPosition
63+ resultCh chan error
64+ waitResult bool
65+ doStrictExpiryCompaction bool
6566}
6667
6768func NewCompactionSignal () * compactionSignal {
@@ -133,6 +134,8 @@ type compactionTrigger struct {
133134 // A sloppy hack, so we can test with different segment row count without worrying that
134135 // they are re-calculated in every compaction.
135136 testingOnly bool
137+ // no need to use mutex for this map, as all operations towards it should be executed serially
138+ lastStrictExpiryCompactionTsMap map [CompactionGroupLabel ]time.Time
136139}
137140
138141func newCompactionTrigger (
@@ -143,16 +146,17 @@ func newCompactionTrigger(
143146 indexVersionManager IndexEngineVersionManager ,
144147) * compactionTrigger {
145148 return & compactionTrigger {
146- meta : meta ,
147- allocator : allocator ,
148- signals : make (chan * compactionSignal , 100 ),
149- manualSignals : make (chan * compactionSignal , 100 ),
150- compactionHandler : compactionHandler ,
151- indexEngineVersionManager : indexVersionManager ,
152- estimateDiskSegmentPolicy : calBySchemaPolicyWithDiskIndex ,
153- estimateNonDiskSegmentPolicy : calBySchemaPolicy ,
154- handler : handler ,
155- closeCh : lifetime .NewSafeChan (),
149+ meta : meta ,
150+ allocator : allocator ,
151+ signals : make (chan * compactionSignal , 100 ),
152+ manualSignals : make (chan * compactionSignal , 100 ),
153+ compactionHandler : compactionHandler ,
154+ indexEngineVersionManager : indexVersionManager ,
155+ estimateDiskSegmentPolicy : calBySchemaPolicyWithDiskIndex ,
156+ estimateNonDiskSegmentPolicy : calBySchemaPolicy ,
157+ handler : handler ,
158+ closeCh : lifetime .NewSafeChan (),
159+ lastStrictExpiryCompactionTsMap : make (map [CompactionGroupLabel ]time.Time , 0 ),
156160 }
157161}
158162
@@ -321,6 +325,24 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error)
321325 return t .allocator .AllocID (ctx )
322326}
323327
328+ func (t * compactionTrigger ) shouldDoStrictExpiryCompaction (group * chanPartSegments ) bool {
329+ if paramtable .Get ().DataCoordCfg .CompactionForceExpiryInterval .GetAsInt () > 0 {
330+ cate := CompactionGroupLabel {group .collectionID , group .partitionID , group .channelName }
331+ lastExpiryCompactionTime , ok := t .lastStrictExpiryCompactionTsMap [cate ]
332+ if ! ok || time .Since (lastExpiryCompactionTime ) >= paramtable .Get ().DataCoordCfg .CompactionForceExpiryInterval .GetAsDuration (time .Hour ) {
333+ return true
334+ }
335+ }
336+ return false
337+ }
338+
339+ func (t * compactionTrigger ) mayUpdateStrictExpiryCompactionTs (signal * compactionSignal , plansSubmitted bool ) {
340+ if paramtable .Get ().DataCoordCfg .CompactionForceExpiryInterval .GetAsInt () > 0 && signal .doStrictExpiryCompaction && plansSubmitted {
341+ cate := CompactionGroupLabel {signal .collectionID , signal .partitionID , signal .channel }
342+ t .lastStrictExpiryCompactionTsMap [cate ] = time .Now ()
343+ }
344+ }
345+
324346// handleSignal is the internal logic to convert compactionSignal into compaction tasks.
325347func (t * compactionTrigger ) handleSignal (signal * compactionSignal ) error {
326348 log := log .With (zap .Int64 ("compactionID" , signal .id ),
@@ -375,10 +397,13 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
375397 }
376398
377399 expectedSize := getExpectedSegmentSize (t .meta , coll )
400+ signal .doStrictExpiryCompaction = t .shouldDoStrictExpiryCompaction (& group )
378401 plans := t .generatePlans (group .segments , signal , ct , expectedSize )
402+ plansSubmitted := true
379403 for _ , plan := range plans {
380404 if ! signal .isForce && t .compactionHandler .isFull () {
381405 log .Warn ("compaction plan skipped due to handler full" )
406+ plansSubmitted = false
382407 break
383408 }
384409 totalRows , inputSegmentIDs := plan .A , plan .B
@@ -422,13 +447,15 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
422447 zap .Int64 ("planID" , task .GetPlanID ()),
423448 zap .Int64s ("inputSegments" , inputSegmentIDs ),
424449 zap .Error (err ))
450+ plansSubmitted = false
425451 continue
426452 }
427453
428454 log .Info ("time cost of generating global compaction" ,
429455 zap .Int64 ("time cost" , time .Since (start ).Milliseconds ()),
430456 zap .Int64s ("segmentIDs" , inputSegmentIDs ))
431457 }
458+ t .mayUpdateStrictExpiryCompactionTs (signal , plansSubmitted )
432459 }
433460 return nil
434461}
@@ -449,7 +476,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
449476 for _ , segment := range segments {
450477 segment := segment .ShadowClone ()
451478 // TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
452- if signal .isForce || t .ShouldDoSingleCompaction (segment , compactTime ) {
479+ if signal .isForce || t .ShouldDoSingleCompaction (segment , compactTime , signal ) {
453480 prioritizedCandidates = append (prioritizedCandidates , segment )
454481 } else if t .isSmallSegment (segment , expectedSize ) {
455482 smallCandidates = append (smallCandidates , segment )
@@ -587,25 +614,19 @@ func (t *compactionTrigger) getCandidates(signal *compactionSignal) ([]chanPartS
587614 if len (signal .segmentIDs ) > 0 && len (segments ) != len (signal .segmentIDs ) {
588615 return nil , merr .WrapErrServiceInternal ("not all segment ids provided could be compacted" )
589616 }
590-
591- type category struct {
592- collectionID int64
593- partitionID int64
594- channelName string
595- }
596- groups := lo .GroupBy (segments , func (segment * SegmentInfo ) category {
597- return category {
598- collectionID : segment .CollectionID ,
599- partitionID : segment .PartitionID ,
600- channelName : segment .InsertChannel ,
617+ groups := lo .GroupBy (segments , func (segment * SegmentInfo ) CompactionGroupLabel {
618+ return CompactionGroupLabel {
619+ CollectionID : segment .CollectionID ,
620+ PartitionID : segment .PartitionID ,
621+ Channel : segment .InsertChannel ,
601622 }
602623 })
603624
604- return lo .MapToSlice (groups , func (c category , segments []* SegmentInfo ) chanPartSegments {
625+ return lo .MapToSlice (groups , func (c CompactionGroupLabel , segments []* SegmentInfo ) chanPartSegments {
605626 return chanPartSegments {
606- collectionID : c .collectionID ,
607- partitionID : c .partitionID ,
608- channelName : c .channelName ,
627+ collectionID : c .CollectionID ,
628+ partitionID : c .PartitionID ,
629+ channelName : c .Channel ,
609630 segments : segments ,
610631 }
611632 }), nil
@@ -659,7 +680,20 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
659680 return is
660681}
661682
662- func (t * compactionTrigger ) ShouldDoSingleCompaction (segment * SegmentInfo , compactTime * compactTime ) bool {
683+ func (t * compactionTrigger ) ShouldStrictCompactExpiry (fromTs uint64 , compactTime * compactTime , signal * compactionSignal , segID int64 ) bool {
684+ if signal != nil && signal .doStrictExpiryCompaction && fromTs <= compactTime .expireTime {
685+ log .Info ("Trigger strict expiry compaction for segment" ,
686+ zap .Int64 ("segmentID" , segID ),
687+ zap .Int64 ("collectionID" , signal .collectionID ),
688+ zap .Int64 ("partition" , signal .partitionID ),
689+ zap .String ("channel" , signal .channel ),
690+ )
691+ return true
692+ }
693+ return false
694+ }
695+
696+ func (t * compactionTrigger ) ShouldDoSingleCompaction (segment * SegmentInfo , compactTime * compactTime , signal * compactionSignal ) bool {
663697 // no longer restricted binlog numbers because this is now related to field numbers
664698
665699 log := log .Ctx (context .TODO ())
@@ -673,6 +707,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
673707 // if expire time is enabled, put segment into compaction candidate
674708 totalExpiredSize := int64 (0 )
675709 totalExpiredRows := 0
710+ var earliestFromTs uint64 = math .MaxUint64
676711 for _ , binlogs := range segment .GetBinlogs () {
677712 for _ , l := range binlogs .GetBinlogs () {
678713 // TODO, we should probably estimate expired log entries by total rows in binlog and the relationship of timeTo, timeFrom and expire time
@@ -685,9 +720,14 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
685720 totalExpiredRows += int (l .GetEntriesNum ())
686721 totalExpiredSize += l .GetMemorySize ()
687722 }
723+ earliestFromTs = min (earliestFromTs , l .TimestampFrom )
688724 }
689725 }
690726
727+ if t .ShouldStrictCompactExpiry (earliestFromTs , compactTime , signal , segment .GetID ()) {
728+ return true
729+ }
730+
691731 if float64 (totalExpiredRows )/ float64 (segment .GetNumOfRows ()) >= Params .DataCoordCfg .SingleCompactionRatioThreshold .GetAsFloat () ||
692732 totalExpiredSize > Params .DataCoordCfg .SingleCompactionExpiredLogMaxSize .GetAsInt64 () {
693733 log .Info ("total expired entities is too much, trigger compaction" , zap .Int64 ("segmentID" , segment .ID ),
0 commit comments