Skip to content

Commit db3489a

Browse files
Rustamchuk and ru.nazarov authored
onfly sampling optimisation (#2253)
Co-authored-by: ru.nazarov <ru.nazarov@vkteam.ru>
1 parent 4ffe745 commit db3489a

2 files changed

Lines changed: 103 additions & 57 deletions

File tree

internal/agent/agent_shard_send.go

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -189,7 +189,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
189189
var keptSizeSum int64
190190
for m, stat := range bucket.CurStats {
191191
if m == -1 {
192-
for k, p := range stat.Partitions {
192+
for k, p := range stat.Partitions { // only partitions map
193193
p.SetSampleFactor()
194194
if size := p.TopSize + p.TailSize; size > 0 {
195195
sf := sfScratch[k.ID]
@@ -205,14 +205,21 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
205205
sizeTraffic := uint32(0)
206206
keptTraffic := uint32(0)
207207
keptSize := uint32(0)
208-
for _, p := range stat.Partitions {
208+
f := func(p *data_model.BucketPartition) {
209209
p.SetSampleFactor()
210210
if p.TopSize != 0 || p.TailSize != 0 {
211211
sizeTraffic += p.Traffic
212212
keptTraffic += p.KeptTraffic
213213
keptSize += p.TopSize + p.TailSize
214214
}
215215
}
216+
if stat.Partition != nil {
217+
f(stat.Partition)
218+
} else {
219+
for _, p := range stat.Partitions {
220+
f(p)
221+
}
222+
}
216223
sf := sfScratch[m]
217224
if sizeTraffic > 0 {
218225
sf[0] = float32(stat.Traffic) / float32(sizeTraffic) // global sf

internal/data_model/bucket.go

Lines changed: 94 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ type (
8686
BucketStat struct {
8787
Traffic uint32
8888
KeepSize uint32
89+
Partition *BucketPartition // for fixed memory metric optimisation
8990
Partitions map[PartitionKey]*BucketPartition // mostly len=1. len>1 for fairKey and others
9091
}
9192

@@ -327,76 +328,87 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
327328
}
328329
keyString := unsafe.String(unsafe.SliceData(keyBytes), len(keyBytes))
329330

330-
sizes := b.CurSizes[metricID]
331-
if sizes == nil {
332-
sizes = map[string]*BucketSizeItem{}
333-
b.CurSizes[metricID] = sizes
334-
}
335-
created = true
336-
if size, ok := sizes[keyString]; ok {
337-
keyString = size.key // use old keystring
338-
b.keysBuffer = b.keysBuffer[:wasLen]
339-
created = false
340-
}
341-
342331
root := b.CurStats[budgetID]
343332
if root == nil {
344333
root = &BucketStat{Partitions: map[PartitionKey]*BucketPartition{}}
345334
b.CurStats[budgetID] = root
346335
}
347336
decisionKey := samplingDecisionKey(key, metricInfo, metricID, budgetID)
348-
part, ok := root.Partitions[decisionKey]
349-
if !ok {
350-
sf := float64(1)
351-
part = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
352-
root.Partitions[decisionKey] = part
337+
var part *BucketPartition
338+
if decisionKey.ID == 0 { // no-map optimisation
339+
if root.Partition == nil {
340+
sf := float64(1)
341+
root.Partition = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
342+
}
343+
part = root.Partition
344+
} else {
345+
part = root.Partitions[decisionKey]
346+
if part == nil {
347+
sf := float64(1)
348+
part = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
349+
root.Partitions[decisionKey] = part
350+
}
353351
}
354352

355-
size := uint32(key.TLSizeEstimate(key.Timestamp))
356-
if created {
357-
sizes[keyString] = &BucketSizeItem{key: keyString, Size: size}
358-
}
359-
part.Traffic += size
360-
root.Traffic += size
353+
itemSize := uint32(key.TLSizeEstimate(key.Timestamp))
354+
part.Traffic += itemSize
355+
root.Traffic += itemSize
361356

362357
if item = part.Top[keyString]; item != nil {
363358
item.Count += count
364359
item.DupCnt++
365360
part.KeptTraffic += item.Size
366-
return item, created
361+
b.keysBuffer = b.keysBuffer[:wasLen]
362+
return item, false
367363
}
368364
if item = part.Tail[keyString]; item != nil {
369365
item.Count += count
370-
if b.sampleTop(rng, part, part.Budget/2, keyString, item, item.Count) { // try move to top
366+
if v := b.sampleTop(rng, part, part.Budget/2, &item.Key, item.MetricMeta, keyString, item, item.Count, item.Size); v != nil { // try move to top
371367
b.removeTail(part, keyString, false)
372-
} else {
368+
} else if item.Size != 0 {
373369
item.DupCnt++
374370
part.KeptTraffic += item.Size
371+
} else { // we could full lose it after sample
372+
return nil, false
375373
}
376-
return item, created
374+
b.keysBuffer = b.keysBuffer[:wasLen]
375+
return item, false
377376
}
378-
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: metricInfo}
379377

380-
part.Budget = budget
381-
if len(root.Partitions) > 0 {
382-
part.Budget = uint32(math.Round(float64(budget) / float64(len(root.Partitions))))
378+
sizes := b.CurSizes[metricID]
379+
if sizes == nil {
380+
sizes = map[string]*BucketSizeItem{}
381+
b.CurSizes[metricID] = sizes
383382
}
384-
halfBudget := uint32(math.Round(float64(part.Budget) / 2))
385-
if root.KeepSize < budget {
386-
halfBudget = budget // get full root budget, until fit budget
383+
created = true
384+
if size, ok := sizes[keyString]; ok {
385+
keyString = size.key // use old keystring
386+
b.keysBuffer = b.keysBuffer[:wasLen]
387+
created = false
388+
} else {
389+
sizes[keyString] = &BucketSizeItem{key: keyString, Size: itemSize}
390+
}
391+
392+
part.Budget = budget
393+
halfBudget := budget // get full root budget, until fit budget
394+
if root.KeepSize >= budget {
395+
if len(root.Partitions) > 0 {
396+
part.Budget = uint32(math.Round(float64(budget) / float64(len(root.Partitions))))
397+
}
398+
halfBudget = uint32(math.Round(float64(part.Budget) / 2))
387399
}
388400

389401
root.KeepSize -= part.TopSize + part.TailSize
390-
if b.sampleTop(rng, part, halfBudget, keyString, item, count) {
402+
if item = b.sampleTop(rng, part, halfBudget, key, metricInfo, keyString, nil, count, itemSize); item != nil { // not create item if sample optimisation
391403
root.KeepSize += part.TopSize + part.TailSize
392404
root.recalc(rng, b, budget, part.Budget)
393-
return
405+
return item, created
394406
}
395407
root.KeepSize += part.TopSize
396-
if b.sampleTail(rng, part, halfBudget, keyString, item) {
408+
if item = b.sampleTail(rng, part, halfBudget, key, metricInfo, keyString, nil, count, itemSize); item != nil { // not create item if sample optimisation
397409
root.KeepSize += part.TailSize
398410
root.recalc(rng, b, budget, part.Budget)
399-
return
411+
return item, created
400412
}
401413
root.KeepSize += part.TailSize
402414
return nil, created
@@ -411,12 +423,12 @@ func (p *BucketPartition) SetSampleFactor() {
411423
}
412424

413425
func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partBudget uint32) {
414-
for _, p := range s.Partitions {
426+
if totalBudget >= s.KeepSize {
427+
return // do not delete everything
428+
}
429+
f := func(p *BucketPartition) {
415430
if p.TopSize+p.TailSize < partBudget*2 {
416-
continue // no need recalc
417-
}
418-
if totalBudget >= s.KeepSize {
419-
break // do not delete everything
431+
return // no need recalc
420432
}
421433
p.Budget = partBudget
422434
halfBudget := uint32(math.Round(float64(partBudget) / 2))
@@ -432,13 +444,27 @@ func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partB
432444
s.KeepSize += p.TopSize
433445
s.KeepSize += p.TailSize
434446
}
447+
if s.Partition != nil {
448+
f(s.Partition)
449+
return
450+
}
451+
for _, p := range s.Partitions {
452+
f(p)
453+
if totalBudget >= s.KeepSize {
454+
break // do not delete everything
455+
}
456+
}
435457
}
436458

437459
func samplingDecisionKey(key *Key, metricInfo *format.MetricMetaValue, metricID, budgetID int32) PartitionKey {
438-
var pk = PartitionKey{ID: metricID}
439-
if budgetID == -1 || metricInfo == nil || len(metricInfo.FairKeyIndex) == 0 {
440-
return pk
460+
if budgetID == -1 {
461+
return PartitionKey{ID: metricID}
441462
} // no fair logic for common metrics
463+
if metricInfo == nil || len(metricInfo.FairKeyIndex) == 0 {
464+
return PartitionKey{}
465+
}
466+
467+
var pk = PartitionKey{ID: metricID}
442468
n := min(len(metricInfo.FairKeyIndex), maxFairKeyLen)
443469
for i := 0; i < n; i++ {
444470
if x := metricInfo.FairKeyIndex[i]; 0 <= x && x < len(key.Tags) {
@@ -448,10 +474,13 @@ func samplingDecisionKey(key *Key, metricInfo *format.MetricMetaValue, metricID,
448474
return pk
449475
}
450476

451-
func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget uint32, keyString string, item *MultiItem) bool {
477+
func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget uint32, key *Key, meta *format.MetricMetaValue, keyString string, item *MultiItem, count float64, size uint32) *MultiItem {
452478
if part.Traffic > budget && rng.Float64()*float64(part.Traffic) >= float64(budget) {
453479
b.removeTail(part, keyString, true)
454-
return false
480+
return nil
481+
}
482+
if item == nil {
483+
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: meta}
455484
}
456485
part.TailSize += item.Size
457486
item.DupCnt++
@@ -461,7 +490,10 @@ func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget
461490
for part.TailSize > budget && len(part.Tail) != 0 {
462491
b.removeRandomTail(part)
463492
}
464-
return part.Tail[keyString] == item
493+
if item.Size == 0 { // removeRandomTail could remove item
494+
return nil
495+
}
496+
return item
465497
}
466498

467499
func (b *MetricsBucket) removeRandomTail(part *BucketPartition) {
@@ -489,26 +521,33 @@ func (b *MetricsBucket) removeTail(part *BucketPartition, key string, fullDelete
489521
} else {
490522
part.KeptTraffic -= traffic
491523
}
524+
item.Size = 0 // optimisation to avoid map lookup
492525
delete(b.MultiItems, key)
493526
}
494527
}
495528

496-
func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget uint32, key string, item *MultiItem, count float64) bool {
529+
func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget uint32, key *Key, meta *format.MetricMetaValue, keyString string, item *MultiItem, count float64, size uint32) *MultiItem {
497530
sf := 1 << part.TopSfLog2
498531
if part.TopSfLog2 != 0 && count < float64(sf) {
499532
if rng.Float64()*float64(sf) >= count {
500-
return false
533+
return nil
501534
}
502535
}
536+
if item == nil {
537+
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: meta}
538+
}
503539
part.TopSize += item.Size
504540
item.DupCnt++
505541
part.KeptTraffic += item.Size
506-
part.Top[key] = item
507-
b.MultiItems[key] = item
542+
part.Top[keyString] = item
543+
b.MultiItems[keyString] = item
508544
for part.TopSize > budget && len(part.Top) != 0 {
509545
part.resampleTop(rng, b, budget)
510546
}
511-
return part.Top[key] == item || part.Tail[key] == item // resampleTop could drop item to tail
547+
if item.Size == 0 { // resampleTop could drop item to tail
548+
return nil
549+
}
550+
return item
512551
}
513552

514553
func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudget uint32) {
@@ -535,7 +574,7 @@ func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudg
535574
p.KeptTraffic -= v.Size
536575
}
537576
delete(p.Top, k)
538-
b.sampleTail(rng, p, tailBudget, k, v) // move to tail
577+
b.sampleTail(rng, p, tailBudget, &v.Key, v.MetricMeta, k, v, v.Count, v.Size) // move to tail
539578
if i++; i >= was/2 {
540579
return // for remain low items
541580
}

0 commit comments

Comments
 (0)