Skip to content

Commit a492177

Browse files
committed
tags mapper after sampling
1 parent d705229 commit a492177

2 files changed

Lines changed: 76 additions & 56 deletions

File tree

internal/aggregator/aggregator_handlers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,9 +646,9 @@ func (a *Aggregator) handleSendSourceBucket(hctx *rpc.HandlerContext, args tlsta
646646
[]int32{0, format.TagValueIDComponentAggregator, format.TagValueIDMappingCacheEventMiss},
647647
float64(mappingMisses), hostTag, aera)
648648

649-
//if !configR.EnableMappingAfterSampling {
650-
a.addUnknownTags(unknownTags, aggBucket.time, hostTag, aera)
651-
//}
649+
if !configR.EnableMappingAfterSampling {
650+
a.addUnknownTags(unknownTags, aggBucket.time, hostTag, aera)
651+
}
652652

653653
a.sh2.AddValueCounterHostAERA(args.Time, format.BuiltinMetricMetaAggSizeCompressed,
654654
[]int32{0, 0, 0, 0, conveyor, spare},

internal/aggregator/aggregator_insert.go

Lines changed: 73 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -114,36 +114,31 @@ func (p *metricIndexCache) skips(metricID int32) (skipMaxHost bool, skipMinHost
114114
return false, false, false
115115
}
116116

117-
func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, top data_model.TagUnion) []byte {
118-
appendTag := func(res []byte, k *data_model.Key, i int) []byte {
119-
if k.Tags[i] != 0 {
120-
res = binary.LittleEndian.AppendUint32(res, uint32(k.Tags[i]))
117+
func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, top data_model.TagUnion,
118+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
119+
appendTag := func(res []byte, S string, I int32) []byte {
120+
if I != 0 || S == "" { // if we somehow have both I and S, we prefer I
121+
res = binary.LittleEndian.AppendUint32(res, uint32(I))
121122
res = rowbinary.AppendString(res, "")
122123
return res
123124
}
125+
processUnknownTag(S, unknownTags, bucketUnknownTags) // even if skip, to optimize agent-aggregator traffic
124126
res = binary.LittleEndian.AppendUint32(res, 0)
125-
res = rowbinary.AppendString(res, k.STags[i])
127+
res = rowbinary.AppendString(res, S)
126128
return res
127129
}
128130
res = append(res, 0) // index_type
129131
res = binary.LittleEndian.AppendUint32(res, uint32(k.Metric))
130-
// TODO write pretags
131-
_ = metricCache
132+
// TODO - if we need to write pretags in the future, look them up in metricCache
132133
res = binary.LittleEndian.AppendUint32(res, k.Timestamp)
133134
for ki := 0; ki < format.MaxTags; ki++ {
134135
if ki == format.StringTopTagIndexV3 {
135136
continue
136137
}
137-
res = appendTag(res, k, ki)
138+
res = appendTag(res, k.STags[ki], k.Tags[ki])
138139
}
139140
// write string top
140-
if top.I != 0 || len(top.S) == 0 { // if we have both I and S use prefer I (we keep S to v2 compat)
141-
res = binary.LittleEndian.AppendUint32(res, uint32(top.I))
142-
res = rowbinary.AppendString(res, "")
143-
} else {
144-
res = binary.LittleEndian.AppendUint32(res, 0)
145-
res = rowbinary.AppendString(res, top.S)
146-
}
141+
res = appendTag(res, top.S, top.I)
147142
return res
148143
}
149144

@@ -152,17 +147,19 @@ func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, to
152147
// So we can select badges for free by adding || (env < 0) to requests, then filtering result rows
153148
// Also we must select both count and sum, then process them separately for each badge kind
154149

155-
func appendMultiBadge(rng *rand.Rand, res []byte, k *data_model.Key, v *data_model.MultiItem, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}) []byte {
150+
func appendMultiBadge(rng *rand.Rand, res []byte, k *data_model.Key, v *data_model.MultiItem, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{},
151+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
156152
if k.Metric >= 0 { // fastpath
157153
return res
158154
}
159155
for _, t := range v.Top {
160-
res = appendBadge(rng, res, k, t.Value, metricCache, usedTimestamps)
156+
res = appendBadge(rng, res, k, t.Value, metricCache, usedTimestamps, unknownTags, bucketUnknownTags)
161157
}
162-
return appendBadge(rng, res, k, v.Tail.Value, metricCache, usedTimestamps)
158+
return appendBadge(rng, res, k, v.Tail.Value, metricCache, usedTimestamps, unknownTags, bucketUnknownTags)
163159
}
164160

165-
func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.ItemValue, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{}) []byte {
161+
func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.ItemValue, metricCache *metricIndexCache, usedTimestamps map[uint32]struct{},
162+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
166163
if k.Metric >= 0 { // fastpath
167164
return res
168165
}
@@ -187,15 +184,15 @@ func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.Ite
187184
format.TagValueIDSrcIngestionStatusWarnTimestampClampedFutureAgg,
188185
format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture,
189186
format.TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue:
190-
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache)
187+
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache, unknownTags, bucketUnknownTags)
191188
}
192-
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache)
189+
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache, unknownTags, bucketUnknownTags)
193190
case format.BuiltinMetricIDAgentSamplingFactor:
194-
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache)
191+
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache, unknownTags, bucketUnknownTags)
195192
case format.BuiltinMetricIDAggSamplingFactor:
196-
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache)
193+
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache, unknownTags, bucketUnknownTags)
197194
case format.BuiltinMetricIDAggBucketReceiveDelaySec:
198-
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache)
195+
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache, unknownTags, bucketUnknownTags)
199196
}
200197
return res
201198
}
@@ -210,13 +207,14 @@ func appendAggregates(res []byte, c float64, mi float64, ma float64, su float64,
210207
return res
211208
}
212209

213-
func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache) []byte {
210+
func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache,
211+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
214212
count := v.Count()
215213
if count <= 0 { // We have lots of built-in counters which are normally 0
216214
return res
217215
}
218216
// for explanation of insert logic, see multiValueMarshal below
219-
res = appendKeys(res, key, cache, data_model.TagUnion{})
217+
res = appendKeys(res, key, cache, data_model.TagUnion{}, unknownTags, bucketUnknownTags)
220218
skipMaxHost, skipMinHost, skipSumSquare := cache.skips(key.Metric)
221219
if v.ValueSet {
222220
res = appendAggregates(res, count, v.ValueMin, v.ValueMax, v.ValueSum, zeroIfTrue(v.ValueSumSquare, skipSumSquare))
@@ -226,25 +224,41 @@ func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_mod
226224

227225
res = rowbinary.AppendEmptyCentroids(res)
228226
res = rowbinary.AppendEmptyUnique(res)
229-
return appendHosts(rng, res, count, v, skipMaxHost, skipMinHost)
227+
return appendHosts(rng, res, count, v, skipMaxHost, skipMinHost, unknownTags, bucketUnknownTags)
228+
}
229+
230+
func processUnknownTag(S string, unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) {
231+
if S == "" {
232+
return
233+
}
234+
cv, ok := bucketUnknownTags[S]
235+
if !ok { // either already moved to unknownTags or has no context (in historic bucket)
236+
return
237+
}
238+
delete(bucketUnknownTags, S)
239+
unknownTags[S] = cv
230240
}
231241

232-
func appendHosts(rng *rand.Rand, res []byte, count float64, v data_model.ItemValue, skipMaxHost bool, skipMinHost bool) []byte {
242+
func appendHosts(rng *rand.Rand, res []byte, count float64, v data_model.ItemValue, skipMaxHost bool, skipMinHost bool,
243+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
233244
// min_host
245+
processUnknownTag(v.MinHostTag.S, unknownTags, bucketUnknownTags) // even if skip, to optimize agent-aggregator traffic
234246
if v.ValueSet && !skipMinHost && !v.MinHostTag.Empty() {
235247
mi := float32(data_model.SkewMinMaxHost(rng, v.ValueMin)) // explanation is in Skew function
236248
res = appendArgMinMaxTag(res, v.MinHostTag, mi)
237249
} else {
238250
res = rowbinary.AppendArgMinMaxStringEmpty(res)
239251
}
240252
// max_host
253+
processUnknownTag(v.MaxHostTag.S, unknownTags, bucketUnknownTags) // even if skip, to optimize agent-aggregator traffic
241254
if v.ValueSet && !skipMaxHost && !v.MaxHostTag.Empty() {
242255
ma := float32(data_model.SkewMinMaxHost(rng, v.ValueMax)) // explanation is in Skew function
243256
res = appendArgMinMaxTag(res, v.MaxHostTag, ma)
244257
} else {
245258
res = rowbinary.AppendArgMinMaxStringEmpty(res)
246259
}
247260
// max_count_host
261+
processUnknownTag(v.MaxCounterHostTag.S, unknownTags, bucketUnknownTags) // even if skip, to optimize agent-aggregator traffic
248262
if !v.MaxCounterHostTag.Empty() {
249263
cc := float32(data_model.SkewMaxCounterHost(rng, count)) // explanation is in Skew function
250264
res = appendArgMinMaxTag(res, v.MaxCounterHostTag, cc)
@@ -254,24 +268,23 @@ func appendHosts(rng *rand.Rand, res []byte, count float64, v data_model.ItemVal
254268
return res
255269
}
256270

257-
func appendSimpleValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache) []byte {
258-
return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, data_model.TagUnion{I: hostTag}), metricCache)
271+
func appendSimpleValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache,
272+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
273+
return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, data_model.TagUnion{I: hostTag}), metricCache, unknownTags, bucketUnknownTags)
259274
}
260275

261-
func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, sf float64) []byte {
276+
func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, sf float64,
277+
unknownTags map[string]data_model.CreateMappingExtra, bucketUnknownTags map[string]data_model.CreateMappingExtra) []byte {
262278
skipMaxHost, skipMinHost, skipSumSquare := cache.skips(metricID)
263279
counter := value.Value.Count() * sf
264280
if value.Value.ValueSet {
265281
res = appendAggregates(res, counter, value.Value.ValueMin, value.Value.ValueMax, value.Value.ValueSum*sf, zeroIfTrue(value.Value.ValueSumSquare*sf, skipSumSquare))
266282
} else {
267-
// motivation - we set MaxValue to aggregated counter, so this value will be preserved while merging into minute or hour table
268-
// later, when selecting, we can sum them from individual shards, showing approximately counter/sec spikes
269-
// https://clickhouse.com/docs/en/engines/table-engines/special/distributed#_shard_num
270-
res = appendAggregates(res, counter, 0, counter, 0, 0)
283+
res = appendAggregates(res, counter, 0, 0, 0, 0)
271284
}
272285
res = rowbinary.AppendCentroids(res, value.ValueTDigest, sf)
273286
res = value.HLL.MarshallAppend(res)
274-
return appendHosts(rng, res, counter, value.Value, skipMaxHost, skipMinHost)
287+
return appendHosts(rng, res, counter, value.Value, skipMaxHost, skipMinHost, unknownTags, bucketUnknownTags)
275288
}
276289

277290
type insertSize struct {
@@ -316,7 +329,15 @@ type insertStats struct {
316329
func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket, buffers data_model.SamplerBuffers,
317330
rnd *rand.Rand, res []byte) ([]byte, data_model.SamplerBuffers, insertStats, time.Duration) {
318331
startTime := time.Now()
319-
recentTs := buckets[0].time // by convention first bucket is recent all others are historic
332+
a.configMu.RLock()
333+
configR := a.configR
334+
a.configMu.RUnlock()
335+
336+
recentTs := buckets[0].time // by convention first bucket is recent, all others are historic
337+
recentUnknownTags := buckets[0].unknownTags
338+
if !configR.EnableMappingAfterSampling {
339+
recentUnknownTags = nil // make [] extremely fast NOP in processUnknownTag
340+
}
320341
historicTag := int32(format.TagValueIDConveyorRecent)
321342
if len(buckets) > 1 {
322343
historicTag = format.TagValueIDConveyorHistoric
@@ -328,11 +349,6 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
328349
sampling: make(map[samplingStatKey]samplingStat),
329350
}
330351

331-
var configR ConfigAggregatorRemote
332-
a.configMu.RLock()
333-
configR = a.configR
334-
a.configMu.RUnlock()
335-
336352
addSizes := func(bucketTs uint32, is insertSize) {
337353
sizes := stats.sizes[bucketTs]
338354
sizes.counters += is.counters
@@ -347,6 +363,7 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
347363
metricCache := makeMetricCache(a.metricStorage)
348364
usedTimestamps := map[uint32]struct{}{}
349365
usedBufferTimestamps := map[uint32]struct{}{}
366+
unknownTags := map[string]data_model.CreateMappingExtra{}
350367

351368
insertItem := func(item *data_model.MultiItem, sf float64, bucketTs uint32) { // lambda is convenient here
352369
is := insertSize{}
@@ -363,8 +380,8 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
363380

364381
resPos := len(res)
365382
if !item.Tail.Empty() { // only tail
366-
res = appendKeys(res, &item.Key, metricCache, data_model.TagUnion{})
367-
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, sf)
383+
res = appendKeys(res, &item.Key, metricCache, data_model.TagUnion{}, unknownTags, recentUnknownTags)
384+
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, sf, unknownTags, recentUnknownTags)
368385

369386
if item.Key.Metric < 0 {
370387
is.builtin += len(res) - resPos
@@ -387,8 +404,8 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
387404
continue
388405
}
389406
// We have no badges for string tops
390-
res = appendKeys(res, &item.Key, metricCache, key) // TODO - insert I
391-
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, sf) // TODO - insert I
407+
res = appendKeys(res, &item.Key, metricCache, key, unknownTags, recentUnknownTags)
408+
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, sf, unknownTags, recentUnknownTags)
392409
}
393410
if item.Key.Metric < 0 {
394411
is.builtin += len(res) - resPos
@@ -409,8 +426,8 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
409426
return
410427
}
411428
key := a.aggKey(recentTs, format.BuiltinMetricIDAggSamplingFactor, [format.MaxTags]int32{0, 0, 0, 0, metricID, format.TagValueIDAggSamplingFactorReasonInsertSize})
412-
res = appendBadge(rnd, res, key, data_model.SimpleItemValue(sf, 1, a.aggregatorHostTag), metricCache, usedTimestamps)
413-
res = appendSimpleValueStat(rnd, res, key, sf, 1, a.aggregatorHostTag.I, metricCache)
429+
res = appendBadge(rnd, res, key, data_model.SimpleItemValue(sf, 1, a.aggregatorHostTag), metricCache, usedTimestamps, unknownTags, recentUnknownTags)
430+
res = appendSimpleValueStat(rnd, res, key, sf, 1, a.aggregatorHostTag.I, metricCache, unknownTags, recentUnknownTags)
414431
},
415432
KeepF: func(item *data_model.MultiItem, bucketTs uint32, _ uint32) { insertItem(item, item.SF, bucketTs) },
416433
SamplerBuffers: buffers,
@@ -424,7 +441,7 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
424441
whaleWeight := item.FinishStringTop(rnd, configR.StringTopCountInsert) // all excess items are baked into Tail
425442

426443
resPos := len(res)
427-
res = appendMultiBadge(rnd, res, &item.Key, item, metricCache, usedTimestamps)
444+
res = appendMultiBadge(rnd, res, &item.Key, item, metricCache, usedTimestamps, unknownTags, recentUnknownTags)
428445
is.builtin += len(res) - resPos
429446

430447
accountMetric := item.Key.Metric
@@ -502,15 +519,18 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
502519
insertTimeUnix := uint32(time.Now().Unix()) // same quality as timestamp from advanceBuckets, can be larger or smaller
503520
for t := range usedTimestamps {
504521
key := data_model.Key{Timestamp: insertTimeUnix, Metric: format.BuiltinMetricIDContributorsLog, Tags: [format.MaxTags]int32{0, int32(t)}}
505-
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
522+
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache, unknownTags, recentUnknownTags)
506523
key = data_model.Key{Timestamp: t, Metric: format.BuiltinMetricIDContributorsLogRev, Tags: [format.MaxTags]int32{0, int32(insertTimeUnix)}}
507-
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
524+
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache, unknownTags, recentUnknownTags)
508525
}
509526
for t := range usedBufferTimestamps {
510527
key := data_model.Key{Timestamp: insertTimeUnix, Metric: format.BuiltinMetricIDContributorsLog, Tags: [format.MaxTags]int32{0, int32(t)}}
511-
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
528+
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache, unknownTags, recentUnknownTags)
512529
key = data_model.Key{Timestamp: t, Metric: format.BuiltinMetricIDContributorsLogRev, Tags: [format.MaxTags]int32{0, int32(insertTimeUnix)}}
513-
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
530+
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache, unknownTags, recentUnknownTags)
531+
}
532+
if configR.EnableMappingAfterSampling {
533+
a.addUnknownTags(unknownTags, recentTs, a.aggregatorHostTag, data_model.AgentEnvRouteArch{}) // no agent info here
514534
}
515535
return res, sampler.SamplerBuffers, stats, time.Since(startTime)
516536
}

0 commit comments

Comments
 (0)