Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func MakeAgent(network string, cacheDir string, aesPwd string, trustedSubnetGrou
timeSpreadDelta: 3*commonSpread + 3*time.Second*time.Duration(i)/time.Duration(len(result.GetConfigResult.Addresses)),
BucketsToSend: make(chan compressedBucketData),
BucketsToPreprocess: make(chan *data_model.MetricsBucket, 1), // length of preprocessor queue
BucketsPool: make(chan *data_model.MetricsBucket, 2), // length of preprocessor queue + preprocess workers
rng: rnd,
CurrentTime: nowUnix,
SendTime: nowUnix - 2, // accept previous seconds at the start of the agent
Expand All @@ -267,7 +268,11 @@ func MakeAgent(network string, cacheDir string, aesPwd string, trustedSubnetGrou
shard.hardwareMetricResolutionResolved.Store(int32(config.HardwareMetricResolution))
shard.hardwareSlowMetricResolutionResolved.Store(int32(config.HardwareSlowMetricResolution))
for j := 0; j < superQueueLen; j++ {
shard.SuperQueue[j] = &data_model.MetricsBucket{} // timestamp will be assigned at queue flush
shard.SuperQueue[j] = &data_model.MetricsBucket{
MultiItemMap: data_model.MultiItemMap{MultiItems: map[string]*data_model.MultiItem{}},
CurSizes: map[int32]map[string]*data_model.BucketSizeItem{},
CurStats: map[int32]*data_model.BucketStat{},
} // timestamp will be assigned at queue flush
}
shard.cond = sync.NewCond(&shard.mu)
result.Shards = append(result.Shards, shard)
Expand Down
58 changes: 51 additions & 7 deletions internal/agent/agent_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
BucketsToSend chan compressedBucketData

BucketsToPreprocess chan *data_model.MetricsBucket
BucketsPool chan *data_model.MetricsBucket

historicBucketsToSend []compressedBucketData // Can be slightly out of order here, we sort it every time
historicBucketsDataSize int // if too many are with data, will put without data, which will be read from disk
Expand Down Expand Up @@ -180,7 +181,12 @@ func (s *Shard) ApplyUnique(key *data_model.Key, resolutionHash uint64, hashes [
s.mu.Unlock()
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
if item == nil {
s.mu.Unlock()
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
mv.ApplyUnique(s.rng, hashes, count, hostTag)
s.mu.Unlock()
Expand Down Expand Up @@ -214,7 +220,12 @@ func (s *Shard) ApplyValues(key *data_model.Key, resolutionHash uint64, histogra
s.mu.Unlock()
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
if item == nil {
s.mu.Unlock()
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
if s.config.LegacyApplyValues {
mv.ApplyValuesLegacy(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, metricInfo != nil && metricInfo.HasPercentiles)
Expand Down Expand Up @@ -245,7 +256,12 @@ func (s *Shard) ApplyCounter(key *data_model.Key, resolutionHash uint64, count f
s.mu.Unlock()
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
if item == nil {
s.mu.Unlock()
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
mv.AddCounterHost(s.rng, count, hostTag)
s.mu.Unlock()
Expand Down Expand Up @@ -280,7 +296,11 @@ func (s *Shard) AddCounterHost(key *data_model.Key, resolutionHash uint64, count
if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
if item == nil {
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
mv.AddCounterHost(s.rng, count, hostTag)
}
Expand Down Expand Up @@ -315,7 +335,11 @@ func (s *Shard) AddCounterHostStringBytesSrcIngestionStatus(t uint32, metricInfo
if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(&key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, &key, metricInfo, budgetID, budget, count, nil)
if item == nil {
return
}
mv := item.MapStringTopBytes(s.rng, s.config.StringTopCapacity, topValue, count)
mv.AddCounterHost(s.rng, count, hostTag)
}
Expand All @@ -332,7 +356,11 @@ func (s *Shard) AddValueCounterHost(key *data_model.Key, resolutionHash uint64,
if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
if item == nil {
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
if metricInfo != nil && metricInfo.HasPercentiles {
mv.AddValueCounterHostPercentile(s.rng, value, count, hostTag, data_model.AgentPercentileCompression)
Expand All @@ -353,7 +381,23 @@ func (s *Shard) MergeItemValue(key *data_model.Key, resolutionHash uint64, itemV
if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
return
}
item, _ := resolutionShard.GetOrCreateMultiItem(key, metricInfo, nil)
budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, itemValue.Count(), nil)
if item == nil {
return
}
mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, itemValue.Count())
mv.Value.Merge(s.rng, itemValue)
}

func (s *Shard) GetMetricBudgetLocked(metricID int32) (int32, uint32) {
budget := s.metricBudgetsFromAgg.GetByID(metricID)
if budget > 0 {
return metricID, budget
}
budget = s.metricBudgetsFromAgg.GetByID(-1) // commonBudget
if budget > 0 {
return -1, budget
}
return -1, uint32(s.getShardSampleBudget(s.config))
}
158 changes: 77 additions & 81 deletions internal/agent/agent_shard_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,16 @@ func (s *Shard) FlushAllDataSingleStep(sendEmpty bool) int {
if b.Empty() && !sendEmpty {
return 0
}
s.SuperQueue[sendTime%superQueueLen] = &data_model.MetricsBucket{}
select {
case newB := <-s.BucketsPool:
s.SuperQueue[sendTime%superQueueLen] = newB // must be always
default:
s.SuperQueue[sendTime%superQueueLen] = &data_model.MetricsBucket{
MultiItemMap: data_model.MultiItemMap{MultiItems: map[string]*data_model.MultiItem{}},
CurSizes: map[int32]map[string]*data_model.BucketSizeItem{},
CurStats: map[int32]*data_model.BucketStat{},
}
}
b.Time = sendTime
// we wait here only when shutting down. During normal work we check len(chan) before calling this func
s.BucketsToPreprocess <- b
Expand Down Expand Up @@ -101,23 +110,22 @@ func (s *Shard) goPreProcess(wg *sync.WaitGroup) {

var scratch []byte
var budgetScratch = map[int32]uint32{}
var sizeScratch = map[int32]uint32{}
var sfScratch = map[int32][2]float32{}
for bucket := range s.BucketsToPreprocess {
start := time.Now()
scratch = s.preProcess(bucket, scratch, budgetScratch, sizeScratch, rng)
scratch = s.preProcess(bucket, scratch, budgetScratch, sfScratch, rng)
s.agent.TimingsPreprocess.AddValueCounter(time.Since(start).Seconds(), 1)
}
log.Printf("Preprocessor quit")
}

func (s *Shard) preProcess(bucket *data_model.MetricsBucket, scratch []byte, budgetScratch, sizeScratch map[int32]uint32, rng *rand.Rand) []byte {
var buffers data_model.SamplerBuffers
func (s *Shard) preProcess(bucket *data_model.MetricsBucket, scratch []byte, budgetScratch map[int32]uint32, sfScratch map[int32][2]float32, rng *rand.Rand) []byte {
// If bucket is empty, we must still do processing and sending
// for each contributor every second.
// We generate only v3 buckets, but can have v2 on disk from previous agent version
var sb tlstatshouse.SourceBucket3

_, scratch = s.sampleBucket(bucket, &sb, buffers, scratch, budgetScratch, sizeScratch, rng)
scratch = s.sampleBucket(bucket, &sb, scratch, budgetScratch, sfScratch, rng)
// after sampling sb is sorted by metric, with ingestion status of each metric close to metric itself
scratch = sb.WriteTL1Boxed(scratch[:0])
compressed := compress.CompressAndFrame(scratch) // allocates, will live long in send queue
Expand All @@ -126,10 +134,14 @@ func (s *Shard) preProcess(bucket *data_model.MetricsBucket, scratch []byte, bud
data: compressed,
}
s.sendToSenders(cbd)
select {
case s.BucketsPool <- bucket:
default:
}
return scratch
}

func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.SourceBucket3, buffers data_model.SamplerBuffers, scratch []byte, budgetScratch, sizeScratch map[int32]uint32, rnd *rand.Rand) (data_model.SamplerBuffers, []byte) {
func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.SourceBucket3, scratch []byte, budgetScratch map[int32]uint32, sfScratch map[int32][2]float32, rnd *rand.Rand) []byte {
var sizeUnique, sizePercentiles, sizeValue, sizeSingleValue, sizeCounter, sizeStringTop [2]int

s.mu.Lock()
Expand Down Expand Up @@ -171,22 +183,37 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.

sb.Metrics = append(sb.Metrics, item)
}
clear(sfScratch)
clear(budgetScratch)
clear(sizeScratch)
s.metricBudgetsFromAgg.Get(budgetScratch)
sampler := data_model.NewSampler(data_model.SamplerConfig{
ModeAgent: s.agent.componentTag == format.TagValueIDComponentAgent,
SampleKeepSingle: config.SampleKeepSingle,
DisableNoSampleAgent: config.DisableNoSampleAgent,
SampleBudgets: config.SampleBudgets,
SampleNamespaces: config.SampleNamespaces,
SampleGroups: config.SampleGroups,
SampleKeys: config.SampleKeys,
Meta: s.agent.metricStorage,
Rand: rnd,
KeepF: func(v *data_model.MultiItem, ts uint32, _ uint32) { keepF(v, ts, 0) },
SamplerBuffers: buffers,
})
for m, stat := range bucket.CurStats {
if m == -1 {
for k, p := range stat.Partitions {
if size := p.TopSize + p.TailSize; size > 0 {
sf := sfScratch[k.ID]
sf[1] = float32(p.Traffic) / float32(size)
sfScratch[k.ID] = sf
}
}
continue
}
keptTraffic := uint32(0)
keptSize := uint32(0)
for _, p := range stat.Partitions {
if p.TopSize != 0 || p.TailSize != 0 {
keptTraffic += p.Traffic
keptSize += p.TopSize + p.TailSize
}
}
sf := sfScratch[m]
if keptTraffic > 0 {
sf[0] = float32(stat.Traffic) / float32(keptTraffic) // global sf
}
if keptSize > 0 {
sf[1] = float32(stat.Traffic) / float32(keptSize) // common sf
}
sfScratch[m] = sf
}
for _, item := range bucket.MultiItems {
if item.Key.Metric == format.BuiltinMetricIDIngestionStatus && item.Key.Tags[2] == format.TagValueIDSrcIngestionStatusOKCached {
// transfer optimization, outside budget, size tracked below in format.TagValueIDSizeSampleFactors
Expand All @@ -202,61 +229,18 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
keepF(item, bucket.Time, 1)
continue
}

whaleWeight := item.FinishStringTop(rnd, config.StringTopCountSend) // all excess items are baked into Tail
accountMetric := item.Key.Metric
sz := item.Key.TLSizeEstimate(bucket.Time) + item.TLSizeEstimate()
if item.Key.Metric == format.BuiltinMetricIDIngestionStatus {
if item.Key.Tags[1] != 0 {
// Ingestion status and other unlimited per-metric built-ins should use its metric budget
// So metrics are better isolated
accountMetric = item.Key.Tags[1]
whaleWeight = 0 // ingestion statuses do not compete for whale status
}
_ = item.FinishStringTop(rnd, config.StringTopCountSend) // all excess items are baked into Tail
if globalSF := sfScratch[item.Key.AccountMetric()]; globalSF[0] > 0 {
item.SF *= float64(globalSF[0])
}
sizeScratch[accountMetric] += uint32(sz)
sampler.Add(data_model.SamplingMultiItemPair{
Item: item,
WhaleWeight: whaleWeight,
Size: sz,
MetricID: accountMetric,
Budget: budgetScratch[accountMetric],
})
}
clear(bucket.MultiItems) // help GC by splitting bucket dependency cluster into individual items

var remainingBudget int64
if budget, ok := config.ShardSampleBudget[int(s.ShardKey)]; ok {
remainingBudget = int64(budget)
} else {
numShards := int(s.agent.shardByMetricCount)
remainingBudget = int64((config.SampleBudget + numShards - 1) / numShards)
keepF(item, bucket.Time, 0)
}
if remainingBudget > data_model.MaxUncompressedBucketSize/2 { // Algorithm is not exact
remainingBudget = data_model.MaxUncompressedBucketSize / 2
}

budgetSum := int64(0)
for _, budget := range budgetScratch {
budgetSum += int64(budget)
}
remainingBudget = max(int64(config.MinSampleBudget), remainingBudget-budgetSum)

sampler.Run(remainingBudget)

for _, v := range sampler.MetricGroups {
s.agent.MergeItemValue(bucket.Time, format.BuiltinMetricMetaSrcSamplingSizeBytes,
[]int32{0, s.agent.componentTag, format.TagValueIDSamplingDecisionKeep, v.NamespaceID, v.GroupID, v.MetricID},
&v.SumSizeKeep)
// discard bytes
s.agent.MergeItemValue(bucket.Time, format.BuiltinMetricMetaSrcSamplingSizeBytes,
[]int32{0, s.agent.componentTag, format.TagValueIDSamplingDecisionDiscard, v.NamespaceID, v.GroupID, v.MetricID},
&v.SumSizeDiscard)
// budget
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingGroupBudget,
[]int32{0, s.agent.componentTag, v.NamespaceID, v.GroupID},
v.Budget(), 1)
}
remainingBudget := max(int64(config.MinSampleBudget), s.getShardSampleBudget(config)-budgetSum)
s.metricBudgetsFromAgg.MergeMaxOne(-1, uint32(remainingBudget))

// report budget used
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingBudget,
Expand All @@ -265,7 +249,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
// metric count
s.agent.AddValueCounter(bucket.Time, format.BuiltinMetricMetaSrcSamplingMetricCount,
[]int32{0, s.agent.componentTag},
float64(sampler.MetricCount), 1)
float64(len(bucket.CurStats)), 1)

// Add size metrics
for i := 0; i != 2; i++ {
Expand All @@ -281,21 +265,19 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
s.addSizeByTypeMetric(bucket.Time, format.TagValueIDSizeStringTop, samplingTag, sizeStringTop[i])
}

for i, sf := range sampler.SampleFactors {
if size, ok := sizeScratch[sf.Metric]; ok {
sampler.SampleFactors[i].OriginalSize = size
delete(sizeScratch, sf.Metric)
for m, sizes := range bucket.CurSizes {
var size uint32
for _, sz := range sizes {
size += sz.Size
}
}
for m, size := range sizeScratch {
sampler.SampleFactors = append(sampler.SampleFactors, tlstatshouse.SampleFactor{
sb.SampleFactors = append(sb.SampleFactors, tlstatshouse.SampleFactor{
Metric: m,
Value: 0,
Value: sfScratch[m][1],
OriginalSize: size,
})
}
sb.SetHaveOriginalSize(true)
sb.SampleFactors = append(sb.SampleFactors, sampler.SampleFactors...)
bucket.Clear()

// Calculate size metrics for sample factors and ingestion status
sbSizeCalc := tlstatshouse.SourceBucket3{SampleFactors: sb.SampleFactors}
Expand All @@ -306,7 +288,21 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, sb *tlstatshouse.
scratch = sbSizeCalc.WriteTL1(scratch[:0])
s.addSizeByTypeMetric(bucket.Time, format.TagValueIDSizeIngestionStatusOK, format.TagValueIDSamplingNo, len(scratch))

return sampler.SamplerBuffers, scratch
return scratch
}

func (s *Shard) getShardSampleBudget(cfg Config) int64 {
var remainingBudget int64
if budget, ok := cfg.ShardSampleBudget[int(s.ShardKey)]; ok {
remainingBudget = int64(budget)
} else {
numShards := int(s.agent.shardByMetricCount)
remainingBudget = int64((cfg.SampleBudget + numShards - 1) / numShards)
}
if remainingBudget > data_model.MaxUncompressedBucketSize/2 { // Algorithm is not exact
remainingBudget = data_model.MaxUncompressedBucketSize / 2
}
return remainingBudget
}

func (s *Shard) sendToSenders(cbd compressedBucketData) {
Expand Down
Loading
Loading