Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions internal/agent/agent_shard_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,15 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
clear(sfScratch)
clear(budgetScratch)
s.metricBudgetsFromAgg.Get(budgetScratch)
var keptSizeSum int64
for m, stat := range bucket.CurStats {
if m == -1 {
for k, p := range stat.Partitions {
if size := p.TopSize + p.TailSize; size > 0 {
sf := sfScratch[k.ID]
sf[1] = float32(p.Traffic) / float32(size)
sfScratch[k.ID] = sf
keptSizeSum += int64(size)
}
}
continue
Expand All @@ -213,6 +215,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
sf[1] = float32(stat.Traffic) / float32(keptSize) // common sf
}
sfScratch[m] = sf
keptSizeSum += int64(keptSize)
}
for _, item := range bucket.MultiItems {
if item.Key.Metric == format.BuiltinMetricIDIngestionStatus && item.Key.Tags[2] == format.TagValueIDSrcIngestionStatusOKCached {
Expand Down Expand Up @@ -242,10 +245,6 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
remainingBudget := max(int64(config.MinSampleBudget), s.getShardSampleBudget(config)-budgetSum)
s.metricBudgetsFromAgg.MergeMaxOne(-1, uint32(remainingBudget))

// report budget used
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
[]int32{0, s.agent.componentTag},
float64(budgetSum+remainingBudget), 1)
// metric count
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingMetricCount,
[]int32{0, s.agent.componentTag},
Expand All @@ -265,6 +264,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
s.addSizeByTypeMetric(bucket.Time, format.TagValueIDSizeStringTop, samplingTag, sizeStringTop[i])
}

var comingSizeSum int64
for m, sizes := range bucket.CurSizes {
var size uint32
for _, sz := range sizes {
Expand All @@ -275,10 +275,27 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
Value: sfScratch[m][1],
OriginalSize: size,
})
if budget, ok := budgetScratch[m]; ok && size > budget {
size = budget // for metric readable
} else if int64(size) > remainingBudget {
size = uint32(remainingBudget)
}
comingSizeSum += int64(size)
}
sb.SetHaveOriginalSize(true)
bucket.Clear()

// report budget used
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudget},
float64(budgetSum+remainingBudget), 1)
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudgetUsed},
float64(keptSizeSum), 1)
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudgetComing},
float64(comingSizeSum), 1)

// Calculate size metrics for sample factors and ingestion status
sbSizeCalc := tlstatshouse.SourceBucket3{SampleFactors: sb.SampleFactors}
scratch = sbSizeCalc.WriteTL1(scratch[:0])
Expand Down
1 change: 1 addition & 0 deletions internal/balancer/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func (s *tcpSender) reconnect() (net.Conn, error) {
_ = conn.Close()
return nil, err
}
log.Printf("balancer connected to upstream: %s", addr)
return conn, nil
}

Expand Down
1 change: 1 addition & 0 deletions internal/data_model/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ func (b *MetricsBucket) Clear() {
continue
}
stat.Traffic = 0
stat.KeepSize = 0
for k, p := range stat.Partitions {
if p.Traffic == 0 {
delete(stat.Partitions, k)
Expand Down
7 changes: 7 additions & 0 deletions internal/format/builtin_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,13 @@ var BuiltinMetricMetaSrcSamplingBudget = &MetricMetaValue{
Tags: []MetricMetaTag{{
Name: "component",
ValueComments: convertToValueComments(componentToValue),
}, {
Description: "type",
ValueComments: convertToValueComments(map[int32]string{
TagValueIDSrcSamplingBudget: "budget",
TagValueIDSrcSamplingBudgetUsed: "used",
TagValueIDSrcSamplingBudgetComing: "coming",
}),
}},
}

Expand Down
4 changes: 4 additions & 0 deletions internal/format/builtin_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ const (
TagValueIDSamplingDecisionKeep = -1
TagValueIDSamplingDecisionDiscard = -2

TagValueIDSrcSamplingBudget = 1
TagValueIDSrcSamplingBudgetUsed = 2
TagValueIDSrcSamplingBudgetComing = 3

TagValueIDDMESGParseError = 1
TagValueIDAPIPanicError = 2

Expand Down
Loading