Skip to content

Commit 4ffe745

Browse files
Rustamchukru.nazarov
andauthored
fix onfly sampling sf (#2252)
Co-authored-by: ru.nazarov <ru.nazarov@vkteam.ru>
1 parent 6569698 commit 4ffe745

2 files changed

Lines changed: 22 additions & 17 deletions

File tree

internal/agent/agent_shard_send.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
190190
for m, stat := range bucket.CurStats {
191191
if m == -1 {
192192
for k, p := range stat.Partitions {
193+
p.SetSampleFactor()
193194
if size := p.TopSize + p.TailSize; size > 0 {
194195
sf := sfScratch[k.ID]
195196
if p.KeptTraffic > 0 {
@@ -205,6 +206,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
205206
keptTraffic := uint32(0)
206207
keptSize := uint32(0)
207208
for _, p := range stat.Partitions {
209+
p.SetSampleFactor()
208210
if p.TopSize != 0 || p.TailSize != 0 {
209211
sizeTraffic += p.Traffic
210212
keptTraffic += p.KeptTraffic
@@ -237,8 +239,11 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
237239
continue
238240
}
239241
_ = item.FinishStringTop(rnd, config.StringTopCountSend) // all excess items are baked into Tail
242+
if item.GlobalSF != nil {
243+
item.SF = *item.GlobalSF // within-partition
244+
}
240245
if globalSF := sfScratch[item.Key.AccountMetric()]; globalSF[0] > 0 {
241-
item.SF *= float64(globalSF[0])
246+
item.SF *= float64(globalSF[0]) // cross-partition
242247
}
243248
keepF(item, bucket.Time, 0)
244249
}

internal/data_model/bucket.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type (
6767
Top map[TagUnion]*MultiValue
6868
Tail MultiValue // elements not in top are collected here
6969
sampleFactorLog2 int
70-
SF float64 // set when Marshalling/Sampling
70+
SF float64 // set when Marshalling/Sampling
71+
GlobalSF *float64 // BucketPartition SF
7172
Count float64
7273
Size uint32
7374
DupCnt uint32 // duplicates count
@@ -102,6 +103,7 @@ type (
102103
Traffic uint32
103104
Budget uint32
104105
KeptTraffic uint32
106+
SF *float64 // ptr to every MultiItem.GlobalSF
105107

106108
TopSize uint32
107109
TopSfLog2 int
@@ -345,36 +347,35 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
345347
decisionKey := samplingDecisionKey(key, metricInfo, metricID, budgetID)
346348
part, ok := root.Partitions[decisionKey]
347349
if !ok {
348-
part = &BucketPartition{Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
350+
sf := float64(1)
351+
part = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
349352
root.Partitions[decisionKey] = part
350353
}
351354

352-
item = &MultiItem{Key: *key, SF: 1, Count: count, MetricMeta: metricInfo}
353-
item.Size = item.TLSize()
355+
size := uint32(key.TLSizeEstimate(key.Timestamp))
354356
if created {
355-
sizes[keyString] = &BucketSizeItem{key: keyString, Size: item.Size}
357+
sizes[keyString] = &BucketSizeItem{key: keyString, Size: size}
356358
}
357-
part.Traffic += item.Size
358-
root.Traffic += item.Size
359+
part.Traffic += size
360+
root.Traffic += size
359361

360-
if item := part.Top[keyString]; item != nil {
362+
if item = part.Top[keyString]; item != nil {
361363
item.Count += count
362364
item.DupCnt++
363365
part.KeptTraffic += item.Size
364-
item.SF = part.sampleFactor()
365366
return item, created
366367
}
367-
if item := part.Tail[keyString]; item != nil {
368+
if item = part.Tail[keyString]; item != nil {
368369
item.Count += count
369370
if b.sampleTop(rng, part, part.Budget/2, keyString, item, item.Count) { // try move to top
370371
b.removeTail(part, keyString, false)
371372
} else {
372373
item.DupCnt++
373374
part.KeptTraffic += item.Size
374-
item.SF = part.sampleFactor()
375375
}
376376
return item, created
377377
}
378+
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: metricInfo}
378379

379380
part.Budget = budget
380381
if len(root.Partitions) > 0 {
@@ -401,11 +402,12 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
401402
return nil, created
402403
}
403404

404-
func (p *BucketPartition) sampleFactor() float64 {
405+
func (p *BucketPartition) SetSampleFactor() {
405406
if p.KeptTraffic == 0 || p.Traffic <= p.KeptTraffic {
406-
return 1
407+
*p.SF = 1
408+
return
407409
}
408-
return float64(p.Traffic) / float64(p.KeptTraffic)
410+
*p.SF = float64(p.Traffic) / float64(p.KeptTraffic)
409411
}
410412

411413
func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partBudget uint32) {
@@ -454,7 +456,6 @@ func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget
454456
part.TailSize += item.Size
455457
item.DupCnt++
456458
part.KeptTraffic += item.Size
457-
item.SF = part.sampleFactor()
458459
part.Tail[keyString] = item
459460
b.MultiItems[keyString] = item
460461
for part.TailSize > budget && len(part.Tail) != 0 {
@@ -502,7 +503,6 @@ func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget
502503
part.TopSize += item.Size
503504
item.DupCnt++
504505
part.KeptTraffic += item.Size
505-
item.SF = part.sampleFactor()
506506
part.Top[key] = item
507507
b.MultiItems[key] = item
508508
for part.TopSize > budget && len(part.Top) != 0 {

0 commit comments

Comments
 (0)