Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions internal/agent/agent_shard_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
var keptSizeSum int64
for m, stat := range bucket.CurStats {
if m == -1 {
for k, p := range stat.Partitions {
for k, p := range stat.Partitions { // only partitions map
p.SetSampleFactor()
if size := p.TopSize + p.TailSize; size > 0 {
sf := sfScratch[k.ID]
Expand All @@ -205,14 +205,21 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
sizeTraffic := uint32(0)
keptTraffic := uint32(0)
keptSize := uint32(0)
for _, p := range stat.Partitions {
f := func(p *data_model.BucketPartition) {
p.SetSampleFactor()
if p.TopSize != 0 || p.TailSize != 0 {
sizeTraffic += p.Traffic
keptTraffic += p.KeptTraffic
keptSize += p.TopSize + p.TailSize
}
}
if stat.Partition != nil {
f(stat.Partition)
} else {
for _, p := range stat.Partitions {
f(p)
}
}
sf := sfScratch[m]
if sizeTraffic > 0 {
sf[0] = float32(stat.Traffic) / float32(sizeTraffic) // global sf
Expand Down
149 changes: 94 additions & 55 deletions internal/data_model/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type (
BucketStat struct {
Traffic uint32
KeepSize uint32
Partition *BucketPartition // for fixed memory metric optimisation
Partitions map[PartitionKey]*BucketPartition // mostly len=1. len>1 for fairKey and others
}

Expand Down Expand Up @@ -327,76 +328,87 @@ func (b *MetricsBucket) SampleOrCreateMultiItem(rng *rand.Rand, key *Key, metric
}
keyString := unsafe.String(unsafe.SliceData(keyBytes), len(keyBytes))

sizes := b.CurSizes[metricID]
if sizes == nil {
sizes = map[string]*BucketSizeItem{}
b.CurSizes[metricID] = sizes
}
created = true
if size, ok := sizes[keyString]; ok {
keyString = size.key // use old keystring
b.keysBuffer = b.keysBuffer[:wasLen]
created = false
}

root := b.CurStats[budgetID]
if root == nil {
root = &BucketStat{Partitions: map[PartitionKey]*BucketPartition{}}
b.CurStats[budgetID] = root
}
decisionKey := samplingDecisionKey(key, metricInfo, metricID, budgetID)
part, ok := root.Partitions[decisionKey]
if !ok {
sf := float64(1)
part = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
root.Partitions[decisionKey] = part
var part *BucketPartition
if decisionKey.ID == 0 { // no-map optimisation
if root.Partition == nil {
sf := float64(1)
root.Partition = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
}
part = root.Partition
} else {
part = root.Partitions[decisionKey]
if part == nil {
sf := float64(1)
part = &BucketPartition{SF: &sf, Tail: map[string]*MultiItem{}, Top: map[string]*MultiItem{}}
root.Partitions[decisionKey] = part
}
}

size := uint32(key.TLSizeEstimate(key.Timestamp))
if created {
sizes[keyString] = &BucketSizeItem{key: keyString, Size: size}
}
part.Traffic += size
root.Traffic += size
itemSize := uint32(key.TLSizeEstimate(key.Timestamp))
part.Traffic += itemSize
root.Traffic += itemSize

if item = part.Top[keyString]; item != nil {
item.Count += count
item.DupCnt++
part.KeptTraffic += item.Size
return item, created
b.keysBuffer = b.keysBuffer[:wasLen]
return item, false
}
if item = part.Tail[keyString]; item != nil {
item.Count += count
if b.sampleTop(rng, part, part.Budget/2, keyString, item, item.Count) { // try move to top
if v := b.sampleTop(rng, part, part.Budget/2, &item.Key, item.MetricMeta, keyString, item, item.Count, item.Size); v != nil { // try move to top
b.removeTail(part, keyString, false)
} else {
} else if item.Size != 0 {
item.DupCnt++
part.KeptTraffic += item.Size
} else { // we could full lose it after sample
return nil, false
}
return item, created
b.keysBuffer = b.keysBuffer[:wasLen]
return item, false
}
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: metricInfo}

part.Budget = budget
if len(root.Partitions) > 0 {
part.Budget = uint32(math.Round(float64(budget) / float64(len(root.Partitions))))
sizes := b.CurSizes[metricID]
if sizes == nil {
sizes = map[string]*BucketSizeItem{}
b.CurSizes[metricID] = sizes
}
halfBudget := uint32(math.Round(float64(part.Budget) / 2))
if root.KeepSize < budget {
halfBudget = budget // get full root budget, until fit budget
created = true
if size, ok := sizes[keyString]; ok {
keyString = size.key // use old keystring
b.keysBuffer = b.keysBuffer[:wasLen]
created = false
} else {
sizes[keyString] = &BucketSizeItem{key: keyString, Size: itemSize}
}

part.Budget = budget
halfBudget := budget // get full root budget, until fit budget
if root.KeepSize >= budget {
if len(root.Partitions) > 0 {
part.Budget = uint32(math.Round(float64(budget) / float64(len(root.Partitions))))
}
halfBudget = uint32(math.Round(float64(part.Budget) / 2))
}

root.KeepSize -= part.TopSize + part.TailSize
if b.sampleTop(rng, part, halfBudget, keyString, item, count) {
if item = b.sampleTop(rng, part, halfBudget, key, metricInfo, keyString, nil, count, itemSize); item != nil { // not create item if sample optimisation
root.KeepSize += part.TopSize + part.TailSize
root.recalc(rng, b, budget, part.Budget)
return
return item, created
}
root.KeepSize += part.TopSize
if b.sampleTail(rng, part, halfBudget, keyString, item) {
if item = b.sampleTail(rng, part, halfBudget, key, metricInfo, keyString, nil, count, itemSize); item != nil { // not create item if sample optimisation
root.KeepSize += part.TailSize
root.recalc(rng, b, budget, part.Budget)
return
return item, created
}
root.KeepSize += part.TailSize
return nil, created
Expand All @@ -411,12 +423,12 @@ func (p *BucketPartition) SetSampleFactor() {
}

func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partBudget uint32) {
for _, p := range s.Partitions {
if totalBudget >= s.KeepSize {
return // do not delete everything
}
f := func(p *BucketPartition) {
if p.TopSize+p.TailSize < partBudget*2 {
continue // no need recalc
}
if totalBudget >= s.KeepSize {
break // do not delete everything
return // no need recalc
}
p.Budget = partBudget
halfBudget := uint32(math.Round(float64(partBudget) / 2))
Expand All @@ -432,13 +444,27 @@ func (s *BucketStat) recalc(rng *rand.Rand, b *MetricsBucket, totalBudget, partB
s.KeepSize += p.TopSize
s.KeepSize += p.TailSize
}
if s.Partition != nil {
f(s.Partition)
return
}
for _, p := range s.Partitions {
f(p)
if totalBudget >= s.KeepSize {
break // do not delete everything
}
}
}

func samplingDecisionKey(key *Key, metricInfo *format.MetricMetaValue, metricID, budgetID int32) PartitionKey {
var pk = PartitionKey{ID: metricID}
if budgetID == -1 || metricInfo == nil || len(metricInfo.FairKeyIndex) == 0 {
return pk
if budgetID == -1 {
return PartitionKey{ID: metricID}
} // no fair logic for common metrics
if metricInfo == nil || len(metricInfo.FairKeyIndex) == 0 {
return PartitionKey{}
}

var pk = PartitionKey{ID: metricID}
n := min(len(metricInfo.FairKeyIndex), maxFairKeyLen)
for i := 0; i < n; i++ {
if x := metricInfo.FairKeyIndex[i]; 0 <= x && x < len(key.Tags) {
Expand All @@ -448,10 +474,13 @@ func samplingDecisionKey(key *Key, metricInfo *format.MetricMetaValue, metricID,
return pk
}

func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget uint32, keyString string, item *MultiItem) bool {
func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget uint32, key *Key, meta *format.MetricMetaValue, keyString string, item *MultiItem, count float64, size uint32) *MultiItem {
if part.Traffic > budget && rng.Float64()*float64(part.Traffic) >= float64(budget) {
b.removeTail(part, keyString, true)
return false
return nil
}
if item == nil {
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: meta}
}
part.TailSize += item.Size
item.DupCnt++
Expand All @@ -461,7 +490,10 @@ func (b *MetricsBucket) sampleTail(rng *rand.Rand, part *BucketPartition, budget
for part.TailSize > budget && len(part.Tail) != 0 {
b.removeRandomTail(part)
}
return part.Tail[keyString] == item
if item.Size == 0 { // removeRandomTail could remove item
return nil
}
return item
}

func (b *MetricsBucket) removeRandomTail(part *BucketPartition) {
Expand Down Expand Up @@ -489,26 +521,33 @@ func (b *MetricsBucket) removeTail(part *BucketPartition, key string, fullDelete
} else {
part.KeptTraffic -= traffic
}
item.Size = 0 // optimisation to avoid map lookup
delete(b.MultiItems, key)
}
}

func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget uint32, key string, item *MultiItem, count float64) bool {
func (b *MetricsBucket) sampleTop(rng *rand.Rand, part *BucketPartition, budget uint32, key *Key, meta *format.MetricMetaValue, keyString string, item *MultiItem, count float64, size uint32) *MultiItem {
sf := 1 << part.TopSfLog2
if part.TopSfLog2 != 0 && count < float64(sf) {
if rng.Float64()*float64(sf) >= count {
return false
return nil
}
}
if item == nil {
item = &MultiItem{Key: *key, Size: size, SF: 1, GlobalSF: part.SF, Count: count, MetricMeta: meta}
}
part.TopSize += item.Size
item.DupCnt++
part.KeptTraffic += item.Size
part.Top[key] = item
b.MultiItems[key] = item
part.Top[keyString] = item
b.MultiItems[keyString] = item
for part.TopSize > budget && len(part.Top) != 0 {
part.resampleTop(rng, b, budget)
}
return part.Top[key] == item || part.Tail[key] == item // resampleTop could drop item to tail
if item.Size == 0 { // resampleTop could drop item to tail
return nil
}
return item
}

func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudget uint32) {
Expand All @@ -535,7 +574,7 @@ func (p *BucketPartition) resampleTop(rng *rand.Rand, b *MetricsBucket, tailBudg
p.KeptTraffic -= v.Size
}
delete(p.Top, k)
b.sampleTail(rng, p, tailBudget, k, v) // move to tail
b.sampleTail(rng, p, tailBudget, &v.Key, v.MetricMeta, k, v, v.Count, v.Size) // move to tail
if i++; i >= was/2 {
return // for remain low items
}
Expand Down
Loading