Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions internal/datacoord/compaction_policy_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v3/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v3/schemapb"
Expand Down Expand Up @@ -420,7 +419,7 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
{"no partition stats and not enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{}, false},
{"no partition stats and enough new data", []*datapb.PartitionStatsInfo{}, emptyPartitionStatsVersion, []*SegmentInfo{
{
size: *atomic.NewInt64(1024 * 1024 * 1024 * 10),
SegmentInfo: &datapb.SegmentInfo{Stats: &datapb.Statistics{InsertBinlogSize: 1024 * 1024 * 1024 * 10}},
},
}, true},
{
Expand All @@ -437,7 +436,7 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
100,
[]*SegmentInfo{
{
size: *atomic.NewInt64(1024 * 1024 * 1024 * 10),
SegmentInfo: &datapb.SegmentInfo{Stats: &datapb.Statistics{InsertBinlogSize: 1024 * 1024 * 1024 * 10}},
},
},
false,
Expand All @@ -456,7 +455,7 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
100,
[]*SegmentInfo{
{
size: *atomic.NewInt64(1024),
SegmentInfo: &datapb.SegmentInfo{Stats: &datapb.Statistics{InsertBinlogSize: 1024}},
},
},
true,
Expand All @@ -476,8 +475,7 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
100,
[]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{ID: 9999},
size: *atomic.NewInt64(1024 * 1024 * 1024 * 10),
SegmentInfo: &datapb.SegmentInfo{ID: 9999, Stats: &datapb.Statistics{InsertBinlogSize: 1024 * 1024 * 1024 * 10}},
},
},
true,
Expand All @@ -497,8 +495,7 @@ func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
100,
[]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{ID: 9999},
size: *atomic.NewInt64(1024),
SegmentInfo: &datapb.SegmentInfo{ID: 9999, Stats: &datapb.Statistics{InsertBinlogSize: 1024}},
},
},
false,
Expand Down
93 changes: 57 additions & 36 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,16 +635,10 @@ func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool {
}

func hasTooManyDeletions(segment *SegmentInfo) bool {
deltaLogCount := 0
totalDeletedRows := 0
totalDeleteLogSize := int64(0)
for _, deltaLogs := range segment.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
totalDeletedRows += int(l.GetEntriesNum())
totalDeleteLogSize += l.GetMemorySize()
}
deltaLogCount += len(deltaLogs.GetBinlogs())
}
stats := segment.EnsureStats()
deltaLogCount := int(stats.GetDeltaBinlogCount())
totalDeletedRows := int(stats.GetDeleteNumRows())
totalDeleteLogSize := stats.GetDeltaBinlogSize()

// Too many deltalog files, accumulates IO count.
if deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
Expand Down Expand Up @@ -740,36 +734,63 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
// no longer restricted binlog numbers because this is now related to field numbers
log := log.Ctx(context.TODO())

// if expire time is enabled, put segment into compaction candidate
totalExpiredSize := int64(0)
totalExpiredRows := 0
var earliestFromTs uint64 = math.MaxUint64
for _, binlogs := range segment.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
// TODO: we should probably estimate expired log entries from the total rows in the binlog and the relationship between timeTo, timeFrom and the expire time
// For import segments, row timestamps predate the commit; use commit_timestamp
// as the effective "data age" to prevent premature TTL-triggered compaction.
if tsoutil.EffectiveTimestamp(l.TimestampTo, segment.GetCommitTimestamp()) < compactTime.expireTime {
log.RatedDebug(10, "mark binlog as expired",
zap.Int64("segmentID", segment.ID),
zap.Int64("binlogID", l.GetLogID()),
zap.Uint64("binlogTimestampTo", l.TimestampTo),
zap.Uint64("compactExpireTime", compactTime.expireTime))
totalExpiredRows += int(l.GetEntriesNum())
totalExpiredSize += l.GetMemorySize()
}
earliestFromTs = min(earliestFromTs, tsoutil.EffectiveTimestamp(l.TimestampFrom, segment.GetCommitTimestamp()))
}
}
stats := segment.EnsureStats()
commitTs := segment.GetCommitTimestamp()

// Strict-tolerance path: exact min via Stats.TimestampFrom. For import
// segments commit_timestamp overrides every row's effective timestamp.
earliestFromTs := tsoutil.EffectiveTimestamp(stats.GetTimestampFrom(), commitTs)
if t.ShouldCompactExpiry(earliestFromTs, compactTime, segment) {
return true
}

if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
log.Info("total expired entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
zap.Int("expiredRows", totalExpiredRows), zap.Int64("expiredLogSize", totalExpiredSize),
zap.Bool("createdByCompaction", segment.CreatedByCompaction), zap.Int64s("compactionFrom", segment.CompactionFrom))
// Ratio + size path: derive an expired-row fraction from the quantile
// distribution (20%-bucket granularity). Approximate; the strict-
// tolerance check above covers the precise edges.
//
// We deliberately UNDER-estimate. Q[i] < expireTime guarantees
// percentiles[i] of rows are expired; that fraction times
// InsertBinlogSize is the byte estimate under a uniform-per-row-size
// assumption that does NOT hold when expired binlogs are smaller than
// the segment-wide average. To prevent over-triggering on segments
// whose precise expired-byte sum sits exactly at threshold, we shift
// the fraction down one 20% bucket.
ratio := Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat()
expiredFraction := 0.0
if commitTs > 0 {
if commitTs < compactTime.expireTime {
expiredFraction = 1.0
}
} else {
percentiles := []float64{0.2, 0.4, 0.6, 0.8, 1.0}
qualifyingIdx := -1
// Walk at most len(percentiles) entries — Statistics.timestamp_quantiles
// is fixed-length-5 by contract, but cap defensively so an unexpected
// longer array can't index out of percentiles.
quantiles := stats.GetTimestampQuantiles()
if len(quantiles) > len(percentiles) {
quantiles = quantiles[:len(percentiles)]
}
for i, q := range quantiles {
if q > 0 && uint64(q) < compactTime.expireTime {
qualifyingIdx = i
continue
}
break
}
if qualifyingIdx >= 1 {
expiredFraction = percentiles[qualifyingIdx-1]
}
}
expiredApproxSize := int64(expiredFraction * float64(stats.GetInsertBinlogSize()))
if expiredFraction >= ratio ||
expiredApproxSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
log.Info("expired entities exceed ratio/size threshold, trigger compaction",
zap.Int64("segmentID", segment.ID),
zap.Float64("expiredFraction", expiredFraction),
zap.Int64("approxExpiredSize", expiredApproxSize),
zap.Bool("createdByCompaction", segment.CreatedByCompaction),
zap.Int64s("compactionFrom", segment.CompactionFrom))
return true
}

Expand Down
18 changes: 12 additions & 6 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -1781,9 +1780,16 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 300})
assert.False(t, couldDo)

// didn't reach single compaction size 10 * 1024 * 1024
// Opportunistic quantile-based ratio/size check has 20% bucket
// granularity (vs. the legacy per-binlog computation). At
// expireTime=600 all five quantiles are 500 < 600, so qualifyingIdx=4;
// the under-estimate rule drops one bucket → expiredFraction = 0.8.
// approxExpiredSize = 0.8 × InsertBinlogSize comfortably exceeds the
// 10 MiB threshold and compaction triggers. (Legacy precise per-
// binlog sum would have been 100 × 100 KiB = 10 MiB, exactly at
// threshold; the approximate path now sits above it.)
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600})
assert.False(t, couldDo)
assert.True(t, couldDo)

// expire time < Timestamp False
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 1200})
Expand Down Expand Up @@ -2696,11 +2702,11 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
func (s *CompactionTriggerSuite) TestSqueezeSmallSegments() {
expectedSize := int64(70000)
smallsegments := []*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{ID: 3}, size: *atomic.NewInt64(69999)},
{SegmentInfo: &datapb.SegmentInfo{ID: 1}, size: *atomic.NewInt64(100)},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, Stats: &datapb.Statistics{InsertBinlogSize: 69999}}},
{SegmentInfo: &datapb.SegmentInfo{ID: 1, Stats: &datapb.Statistics{InsertBinlogSize: 100}}},
}

largeSegment := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 2}, size: *atomic.NewInt64(expectedSize)}
largeSegment := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 2, Stats: &datapb.Statistics{InsertBinlogSize: expectedSize}}}
buckets := [][]*SegmentInfo{{largeSegment}}
s.Require().Equal(1, len(buckets))
s.Require().Equal(1, len(buckets[0]))
Expand Down
24 changes: 12 additions & 12 deletions internal/datacoord/compaction_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,25 @@ import (
// The schema is used to calculate a minimum ID count for V3 manifest segments where
// binlog metadata may be empty but stats output still requires IDs.
func PreAllocateBinlogIDs(allocator allocator.Allocator, segmentInfos []*SegmentInfo, schema *schemapb.CollectionSchema) (*datapb.IDRange, error) {
binlogNum := 0
binlogNum := int64(0)
for _, s := range segmentInfos {
for _, l := range s.GetBinlogs() {
binlogNum += len(l.GetBinlogs())
}
for _, l := range s.GetDeltalogs() {
binlogNum += len(l.GetBinlogs())
}
// Insert / delta counts come from Statistics. Stats and BM25
// stats arrays are still iterated because Statistics doesn't
// carry a per-segment stat-file count today (V3 leaves these
// arrays empty regardless; V2 sees the cumulative arrays here).
stats := s.EnsureStats()
binlogNum += stats.GetInsertBinlogCount() + stats.GetDeltaBinlogCount()
for _, l := range s.GetStatslogs() {
binlogNum += len(l.GetBinlogs())
binlogNum += int64(len(l.GetBinlogs()))
}
for _, l := range s.GetBm25Statslogs() {
binlogNum += len(l.GetBinlogs())
binlogNum += int64(len(l.GetBinlogs()))
}
}
// Compaction output always needs IDs for PK stats (1) and BM25 stats (per BM25 function).
// For V3 manifest segments, binlog metadata may be empty since data is managed by manifest,
// but stats output still requires IDs. Calculate the minimum from schema directly.
minIDsFromSchema := 1 // 1 for PK stats
minIDsFromSchema := int64(1) // 1 for PK stats
if schema != nil {
for _, fn := range schema.GetFunctions() {
if fn.GetType() == schemapb.FunctionType_BM25 {
Expand All @@ -63,8 +63,8 @@ func PreAllocateBinlogIDs(allocator allocator.Allocator, segmentInfos []*Segment
if binlogNum < minIDsFromSchema {
binlogNum = minIDsFromSchema
}
n := binlogNum * paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt()
begin, end, err := allocator.AllocN(int64(n))
n := binlogNum * int64(paramtable.Get().DataCoordCfg.CompactionPreAllocateIDExpansionFactor.GetAsInt())
begin, end, err := allocator.AllocN(n)
return &datapb.IDRange{Begin: begin, End: end}, err
}

Expand Down
22 changes: 15 additions & 7 deletions internal/datacoord/compaction_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v3/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v3/msgpb"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/v3/proto/datapb"
)

Expand Down Expand Up @@ -137,9 +136,14 @@ func (s *SegmentView) Clone() *SegmentView {

func GetViewsByInfo(segments ...*SegmentInfo) []*SegmentView {
return lo.Map(segments, func(segment *SegmentInfo, _ int) *SegmentView {
stats := segment.EnsureStats()
numOfRows := segment.GetNumOfRows()
if segment.GetLevel() == datapb.SegmentLevel_L0 {
numOfRows = segmentutil.CalcDelRowCountFromDeltaLog(segment.SegmentInfo)
// L0 segments record deleted-row count under numOfRows for view
// purposes (no inserts). DeleteNumRows on Statistics is the
// persisted equivalent of the legacy CalcDelRowCountFromDeltaLog
// iteration.
numOfRows = stats.GetDeleteNumRows()
}
return &SegmentView{
ID: segment.ID,
Expand All @@ -156,12 +160,16 @@ func GetViewsByInfo(segments ...*SegmentInfo) []*SegmentView {
startPos: segment.GetStartPosition(),
dmlPos: segment.GetDmlPosition(),

DeltaSize: GetBinlogSizeAsBytes(segment.GetDeltalogs()),
DeltalogCount: GetBinlogCount(segment.GetDeltalogs()),
DeltaRowCount: GetBinlogEntriesNum(segment.GetDeltalogs()),
// Aggregate metrics come from Statistics. StatslogCount stays on
// the array path because Statistics has no per-segment stat-file
// count; V3 segments' empty statslogs read as 0, which matches
// the manifest-driven layout.
DeltaSize: float64(stats.GetDeltaBinlogSize()),
DeltalogCount: int(stats.GetDeltaBinlogCount()),
DeltaRowCount: int(stats.GetDeleteNumRows()),

Size: GetBinlogSizeAsBytes(segment.GetBinlogs()),
BinlogCount: GetBinlogCount(segment.GetBinlogs()),
Size: float64(stats.GetInsertBinlogSize()),
BinlogCount: int(stats.GetInsertBinlogCount()),
StatslogCount: GetBinlogCount(segment.GetStatslogs()),

NumOfRows: numOfRows,
Expand Down
31 changes: 8 additions & 23 deletions internal/datacoord/import_task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package datacoord

import (
"context"
"math"
"strconv"
"time"

Expand All @@ -30,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/v3/log"
"github.com/milvus-io/milvus/pkg/v3/metrics"
Expand Down Expand Up @@ -230,13 +230,17 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
return
}

// Extract actual timestamps from binlogs for segment positions
// Extract actual timestamps from binlogs for segment positions.
// L0 imports carry only deletes; non-L0 imports carry inserts.
importStats := storage.BuildStatsFromFieldBinlogs(info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs())
var minTs, maxTs uint64
isL0Import := importutilv2.IsL0Import(job.GetOptions())
if isL0Import {
minTs, maxTs = extractTimestampFromBinlogs(info.GetDeltalogs())
minTs = importStats.GetDeltaTimestampFrom()
maxTs = importStats.GetDeltaTimestampTo()
} else {
minTs, maxTs = extractTimestampFromBinlogs(info.GetBinlogs())
minTs = importStats.GetTimestampFrom()
maxTs = importStats.GetTimestampTo()
}

opBinlog := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
Expand Down Expand Up @@ -316,22 +320,3 @@ func (t *importTask) MarshalJSON() ([]byte, error) {
}
return json.Marshal(importTask)
}

// extractTimestampFromBinlogs returns the smallest TimestampFrom and the
// largest TimestampTo found across every binlog of every field binlog group.
// The timestamps are recorded on Binlog.TimestampFrom and Binlog.TimestampTo
// by BulkPackWriterV2.writeInserts() during import sync.
//
// When the input holds no binlog at all, the initial sentinels are returned
// unchanged: minTs == math.MaxUint64 and maxTs == 0.
func extractTimestampFromBinlogs(binlogs []*datapb.FieldBinlog) (minTs, maxTs uint64) {
	// Start from the widest possible bounds so the first binlog always narrows them.
	minTs, maxTs = math.MaxUint64, 0
	for _, group := range binlogs {
		for _, entry := range group.GetBinlogs() {
			if from := entry.GetTimestampFrom(); from < minTs {
				minTs = from
			}
			if to := entry.GetTimestampTo(); to > maxTs {
				maxTs = to
			}
		}
	}
	return minTs, maxTs
}
Loading