Skip to content

Commit 6569698

Browse files
Rustamchukru.nazarov
andauthored
fix onfly sampling sf (#2251)
Co-authored-by: ru.nazarov <ru.nazarov@vkteam.ru>
1 parent 5897d05 commit 6569698

2 files changed

Lines changed: 62 additions & 61 deletions

File tree

internal/agent/agent_shard_send.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,27 +192,31 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
192192
for k, p := range stat.Partitions {
193193
if size := p.TopSize + p.TailSize; size > 0 {
194194
sf := sfScratch[k.ID]
195-
sf[1] = float32(p.Traffic) / float32(size)
195+
if p.KeptTraffic > 0 {
196+
sf[1] = float32(p.Traffic) / float32(p.KeptTraffic)
197+
}
196198
sfScratch[k.ID] = sf
197199
keptSizeSum += int64(size)
198200
}
199201
}
200202
continue
201203
}
204+
sizeTraffic := uint32(0)
202205
keptTraffic := uint32(0)
203206
keptSize := uint32(0)
204207
for _, p := range stat.Partitions {
205208
if p.TopSize != 0 || p.TailSize != 0 {
206-
keptTraffic += p.Traffic
209+
sizeTraffic += p.Traffic
210+
keptTraffic += p.KeptTraffic
207211
keptSize += p.TopSize + p.TailSize
208212
}
209213
}
210214
sf := sfScratch[m]
211-
if keptTraffic > 0 {
212-
sf[0] = float32(stat.Traffic) / float32(keptTraffic) // global sf
215+
if sizeTraffic > 0 {
216+
sf[0] = float32(stat.Traffic) / float32(sizeTraffic) // global sf
213217
}
214-
if keptSize > 0 {
215-
sf[1] = float32(stat.Traffic) / float32(keptSize) // common sf
218+
if keptTraffic > 0 {
219+
sf[1] = float32(stat.Traffic) / float32(keptTraffic) // common sf
216220
}
217221
sfScratch[m] = sf
218222
keptSizeSum += int64(keptSize)

internal/data_model/bucket.go

Lines changed: 52 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type (
7070
SF float64 // set when Marshalling/Sampling
7171
Count float64
7272
Size uint32
73+
DupCnt uint32 // duplicates count
7374
MetricMeta *format.MetricMetaValue
7475
}
7576

@@ -98,8 +99,9 @@ type (
9899
}
99100

100101
BucketPartition struct {
101-
Traffic uint32
102-
Budget uint32
102+
Traffic uint32
103+
Budget uint32
104+
KeptTraffic uint32
103105

104106
TopSize uint32
105107
TopSfLog2 int
@@ -357,14 +359,19 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
357359

358360
if item := part.Top[keyString]; item != nil {
359361
item.Count += count
360-
item.SF = (float64(part.Traffic)) / float64(part.TopSize+part.TailSize)
362+
item.DupCnt++
363+
part.KeptTraffic += item.Size
364+
item.SF = part.sampleFactor()
361365
return item, created
362366
}
363367
if item := part.Tail[keyString]; item != nil {
364368
item.Count += count
365-
item.SF = (float64(part.Traffic)) / float64(part.TopSize+part.TailSize)
366369
if b.sampleTop(rng, part, part.Budget/2, keyString, item, item.Count) { // try move to top
367-
b.removeTail(part, keyString)
370+
b.removeTail(part, keyString, false)
371+
} else {
372+
item.DupCnt++
373+
part.KeptTraffic += item.Size
374+
item.SF = part.sampleFactor()
368375
}
369376
return item, created
370377
}
@@ -394,6 +401,13 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
394401
return nil, created
395402
}
396403

404+
func (p *BucketPartition) sampleFactor() float64 {
405+
if p.KeptTraffic == 0 || p.Traffic <= p.KeptTraffic {
406+
return 1
407+
}
408+
return float64(p.Traffic) / float64(p.KeptTraffic)
409+
}
410+
397411
func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partBudget uint32) {
398412
for _, p := range s.Partitions {
399413
if p.TopSize+p.TailSize < partBudget*2 {
@@ -434,12 +448,13 @@ func samplingDecisionKey(key *Key, metricInfo *format.MetricMetaValue, metricID,
434448

435449
func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget uint32, keyString string, item *MultiItem) bool {
436450
if part.Traffic > budget && rng.Float64()*float64(part.Traffic) >= float64(budget) {
451+
b.removeTail(part, keyString, true)
437452
return false
438453
}
439454
part.TailSize += item.Size
440-
if part.Traffic > part.TopSize+part.TailSize {
441-
item.SF = (float64(part.Traffic)) / float64(part.TopSize+part.TailSize)
442-
}
455+
item.DupCnt++
456+
part.KeptTraffic += item.Size
457+
item.SF = part.sampleFactor()
443458
part.Tail[keyString] = item
444459
b.MultiItems[keyString] = item
445460
for part.TailSize > budget && len(part.Tail) != 0 {
@@ -453,23 +468,28 @@ func (b *MetricsBucket) removeRandomTail(part *BucketPartition) {
453468
return
454469
}
455470
for k := range part.Tail { // quasirandom remove, quite ok for O(1)
456-
b.removeTail(part, k)
471+
b.removeTail(part, k, true)
457472
break
458473
}
459474
}
460475

461-
func (b *MetricsBucket) removeTail(part *BucketPartition, key string) {
462-
item, ok := part.Tail[key]
463-
if !ok {
464-
return
476+
func (b *MetricsBucket) removeTail(part *BucketPartition, key string, fullDelete bool) {
477+
if item, ok := part.Tail[key]; ok {
478+
if item.Size >= part.TailSize {
479+
part.TailSize = 0
480+
} else {
481+
part.TailSize -= item.Size
482+
}
483+
delete(part.Tail, key)
465484
}
466-
if item.Size >= part.TailSize {
467-
part.TailSize = 0
468-
} else {
469-
part.TailSize -= item.Size
485+
if item, ok := b.MultiItems[key]; ok && fullDelete {
486+
if traffic := item.Size * item.DupCnt; traffic >= part.KeptTraffic {
487+
part.KeptTraffic = 0
488+
} else {
489+
part.KeptTraffic -= traffic
490+
}
491+
delete(b.MultiItems, key)
470492
}
471-
delete(part.Tail, key)
472-
delete(b.MultiItems, key)
473493
}
474494

475495
func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget uint32, key string, item *MultiItem, count float64) bool {
@@ -480,9 +500,9 @@ func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget
480500
}
481501
}
482502
part.TopSize += item.Size
483-
if part.Traffic > part.TopSize+part.TailSize {
484-
item.SF = (float64(part.Traffic)) / float64(part.TopSize+part.TailSize)
485-
}
503+
item.DupCnt++
504+
part.KeptTraffic += item.Size
505+
item.SF = part.sampleFactor()
486506
part.Top[key] = item
487507
b.MultiItems[key] = item
488508
for part.TopSize > budget && len(part.Top) != 0 {
@@ -508,8 +528,13 @@ func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudg
508528
} else {
509529
p.TopSize -= v.Size
510530
}
531+
v.DupCnt--
532+
if v.Size >= p.KeptTraffic {
533+
p.KeptTraffic = 0
534+
} else {
535+
p.KeptTraffic -= v.Size
536+
}
511537
delete(p.Top, k)
512-
delete(b.MultiItems, k)
513538
b.sampleTail(rng, p, tailBudget, k, v) // move to tail
514539
if i++; i >= was/2 {
515540
return // for remain low items
@@ -519,38 +544,10 @@ func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudg
519544
}
520545

521546
func (b *MetricsBucket) Clear() {
522-
for mID, stat := range b.CurStats {
523-
if stat.Traffic == 0 {
524-
// remove disappeared from last sec, mem economy
525-
clear(stat.Partitions) // help GC
526-
delete(b.CurStats, mID)
527-
continue
528-
}
529-
stat.Traffic = 0
530-
stat.KeepSize = 0
531-
for k, p := range stat.Partitions {
532-
if p.Traffic == 0 {
533-
delete(stat.Partitions, k)
534-
continue
535-
}
536-
p.Traffic = 0
537-
p.Budget = 0
538-
p.TopSize = 0
539-
p.TopSfLog2 = 0
540-
p.TailSize = 0
541-
clear(p.Top)
542-
clear(p.Tail)
543-
}
544-
}
545-
for _, sizes := range b.CurSizes {
546-
clear(sizes)
547-
}
548-
b.MultiItemMap.Clear()
549-
}
550-
551-
func (b *MultiItemMap) Clear() {
552-
clear(b.MultiItems)
553-
b.keysBuffer = b.keysBuffer[:0] // we cleared map, dropped strings, now we can reuse bytes
547+
clear(b.CurStats)
548+
clear(b.CurSizes)
549+
b.keysBuffer = []byte{}
550+
b.MultiItems = map[string]*MultiItem{}
554551
}
555552

556553
func (b *MultiItemMap) GetOrCreateMultiItem(key *Key, metricInfo *format.MetricMetaValue, keyBytes []byte) (item *MultiItem, created bool) {

0 commit comments

Comments
 (0)