Skip to content

Commit 653f110

Browse files
Rustamchukru.nazarov
andauthored
new logs and metrics (#2248)
* logs and metrics * logs and metrics * logs and metrics --------- Co-authored-by: ru.nazarov <ru.nazarov@vkteam.ru>
1 parent 84cb4b7 commit 653f110

5 files changed

Lines changed: 34 additions & 4 deletions

File tree

internal/agent/agent_shard_send.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,15 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
186186
clear(sfScratch)
187187
clear(budgetScratch)
188188
s.metricBudgetsFromAgg.Get(budgetScratch)
189+
var keptSizeSum int64
189190
for m, stat := range bucket.CurStats {
190191
if m == -1 {
191192
for k, p := range stat.Partitions {
192193
if size := p.TopSize + p.TailSize; size > 0 {
193194
sf := sfScratch[k.ID]
194195
sf[1] = float32(p.Traffic) / float32(size)
195196
sfScratch[k.ID] = sf
197+
keptSizeSum += int64(size)
196198
}
197199
}
198200
continue
@@ -213,6 +215,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
213215
sf[1] = float32(stat.Traffic) / float32(keptSize) // common sf
214216
}
215217
sfScratch[m] = sf
218+
keptSizeSum += int64(keptSize)
216219
}
217220
for _, item := range bucket.MultiItems {
218221
if item.Key.Metric == format.BuiltinMetricIDIngestionStatus && item.Key.Tags[2] == format.TagValueIDSrcIngestionStatusOKCached {
@@ -242,10 +245,6 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
242245
remainingBudget := max(int64(config.MinSampleBudget), s.getShardSampleBudget(config)-budgetSum)
243246
s.metricBudgetsFromAgg.MergeMaxOne(-1, uint32(remainingBudget))
244247

245-
// report budget used
246-
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
247-
[]int32{0, s.agent.componentTag},
248-
float64(budgetSum+remainingBudget), 1)
249248
// metric count
250249
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingMetricCount,
251250
[]int32{0, s.agent.componentTag},
@@ -265,6 +264,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
265264
s.addSizeByTypeMetric(bucket.Time, format.TagValueIDSizeStringTop, samplingTag, sizeStringTop[i])
266265
}
267266

267+
var comingSizeSum int64
268268
for m, sizes := range bucket.CurSizes {
269269
var size uint32
270270
for _, sz := range sizes {
@@ -275,10 +275,27 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
275275
Value: sfScratch[m][1],
276276
OriginalSize: size,
277277
})
278+
if budget, ok := budgetScratch[m]; ok && size > budget {
279+
size = budget // for metric readable
280+
} else if int64(size) > remainingBudget {
281+
size = uint32(remainingBudget)
282+
}
283+
comingSizeSum += int64(size)
278284
}
279285
sb.SetHaveOriginalSize(true)
280286
bucket.Clear()
281287

288+
// report budget used
289+
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
290+
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudget},
291+
float64(budgetSum+remainingBudget), 1)
292+
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
293+
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudgetUsed},
294+
float64(keptSizeSum), 1)
295+
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
296+
[]int32{0, s.agent.componentTag, format.TagValueIDSrcSamplingBudgetComing},
297+
float64(comingSizeSum), 1)
298+
282299
// Calculate size metrics for sample factors and ingestion status
283300
sbSizeCalc := tlstatshouse.SourceBucket3{SampleFactors: sb.SampleFactors}
284301
scratch = sbSizeCalc.WriteTL1(scratch[:0])

internal/balancer/egress.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ func (s *tcpSender) reconnect() (net.Conn, error) {
373373
_ = conn.Close()
374374
return nil, err
375375
}
376+
log.Printf("balancer connected to upstream: %s", addr)
376377
return conn, nil
377378
}
378379

internal/data_model/bucket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ func (b *MetricsBucket) Clear() {
527527
continue
528528
}
529529
stat.Traffic = 0
530+
stat.KeepSize = 0
530531
for k, p := range stat.Partitions {
531532
if p.Traffic == 0 {
532533
delete(stat.Partitions, k)

internal/format/builtin_metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,6 +1748,13 @@ var BuiltinMetricMetaSrcSamplingBudget = &MetricMetaValue{
17481748
Tags: []MetricMetaTag{{
17491749
Name: "component",
17501750
ValueComments: convertToValueComments(componentToValue),
1751+
}, {
1752+
Description: "type",
1753+
ValueComments: convertToValueComments(map[int32]string{
1754+
TagValueIDSrcSamplingBudget: "budget",
1755+
TagValueIDSrcSamplingBudgetUsed: "used",
1756+
TagValueIDSrcSamplingBudgetComing: "coming",
1757+
}),
17511758
}},
17521759
}
17531760

internal/format/builtin_tags.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ const (
354354
TagValueIDSamplingDecisionKeep = -1
355355
TagValueIDSamplingDecisionDiscard = -2
356356

357+
TagValueIDSrcSamplingBudget = 1
358+
TagValueIDSrcSamplingBudgetUsed = 2
359+
TagValueIDSrcSamplingBudgetComing = 3
360+
357361
TagValueIDDMESGParseError = 1
358362
TagValueIDAPIPanicError = 2
359363

0 commit comments

Comments
 (0)