Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 14 additions & 22 deletions internal/aggregator/aggregator_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *metricIndexCache) skips(metricID int32) (skipMaxHost bool, skipMinHost
return false, false, false
}

func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, top data_model.TagUnion, bufferedInsert bool) []byte {
func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, top data_model.TagUnion) []byte {
appendTag := func(res []byte, k *data_model.Key, i int) []byte {
if k.Tags[i] != 0 {
res = binary.LittleEndian.AppendUint32(res, uint32(k.Tags[i]))
Expand All @@ -125,11 +125,7 @@ func appendKeys(res []byte, k *data_model.Key, metricCache *metricIndexCache, to
res = rowbinary.AppendString(res, k.STags[i])
return res
}
var it uint8
if bufferedInsert {
it = 1
}
res = append(res, it)
res = append(res, 0) // index_type
res = binary.LittleEndian.AppendUint32(res, uint32(k.Metric))
// TODO write pretags
_ = metricCache
Expand Down Expand Up @@ -191,15 +187,15 @@ func appendBadge(rng *rand.Rand, res []byte, k *data_model.Key, v data_model.Ite
format.TagValueIDSrcIngestionStatusWarnTimestampClampedFutureAgg,
format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture,
format.TagValueIDSrcIngestionStatusWarnMapInvalidRawTagValue:
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache, false)
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionWarnings, k.Tags[1]}}, v, metricCache)
}
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache, false)
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeIngestionErrors, k.Tags[1]}}, v, metricCache)
case format.BuiltinMetricIDAgentSamplingFactor:
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache, false)
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAgentSamplingFactor, k.Tags[1]}}, v, metricCache)
case format.BuiltinMetricIDAggSamplingFactor:
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache, false)
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeAggSamplingFactor, k.Tags[4]}}, v, metricCache)
case format.BuiltinMetricIDAggBucketReceiveDelaySec:
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache, false)
return appendValueStat(rng, res, &data_model.Key{Timestamp: ts, Metric: format.BuiltinMetricIDBadges, Tags: [format.MaxTags]int32{0, format.TagValueIDBadgeContributors, 0}}, v, metricCache)
}
return res
}
Expand All @@ -214,13 +210,13 @@ func appendAggregates(res []byte, c float64, mi float64, ma float64, su float64,
return res
}

func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache, bufferedInsert bool) []byte {
func appendValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v data_model.ItemValue, cache *metricIndexCache) []byte {
count := v.Count()
if count <= 0 { // We have lots of built-in counters which are normally 0
return res
}
// for explanation of insert logic, see multiValueMarshal below
res = appendKeys(res, key, cache, data_model.TagUnion{}, bufferedInsert)
res = appendKeys(res, key, cache, data_model.TagUnion{})
skipMaxHost, skipMinHost, skipSumSquare := cache.skips(key.Metric)
if v.ValueSet {
res = appendAggregates(res, count, v.ValueMin, v.ValueMax, v.ValueSum, zeroIfTrue(v.ValueSumSquare, skipSumSquare))
Expand Down Expand Up @@ -259,11 +255,7 @@ func appendHosts(rng *rand.Rand, res []byte, count float64, v data_model.ItemVal
}

func appendSimpleValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache) []byte {
return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, data_model.TagUnion{I: hostTag}), metricCache, false)
}

func appendBufferedValueStat(rng *rand.Rand, res []byte, key *data_model.Key, v float64, count float64, hostTag int32, metricCache *metricIndexCache) []byte {
return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, data_model.TagUnion{I: hostTag}), metricCache, true)
return appendValueStat(rng, res, key, data_model.SimpleItemValue(v, count, data_model.TagUnion{I: hostTag}), metricCache)
}

func multiValueMarshal(rng *rand.Rand, metricID int32, cache *metricIndexCache, res []byte, value *data_model.MultiValue, sf float64) []byte {
Expand Down Expand Up @@ -371,7 +363,7 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,

resPos := len(res)
if !item.Tail.Empty() { // only tail
res = appendKeys(res, &item.Key, metricCache, data_model.TagUnion{}, bufferedInsert)
res = appendKeys(res, &item.Key, metricCache, data_model.TagUnion{})
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, &item.Tail, sf)

if item.Key.Metric < 0 {
Expand All @@ -395,7 +387,7 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
continue
}
// We have no badges for string tops
res = appendKeys(res, &item.Key, metricCache, key, bufferedInsert) // TODO - insert I
res = appendKeys(res, &item.Key, metricCache, key) // TODO - insert I
res = multiValueMarshal(rnd, item.Key.Metric, metricCache, res, value, sf) // TODO - insert I
}
if item.Key.Metric < 0 {
Expand Down Expand Up @@ -516,9 +508,9 @@ func (a *Aggregator) rowDataMarshalAppendPositions(buckets []*aggregatorBucket,
}
for t := range usedBufferTimestamps {
key := data_model.Key{Timestamp: insertTimeUnix, Metric: format.BuiltinMetricIDContributorsLog, Tags: [format.MaxTags]int32{0, int32(t)}}
res = appendBufferedValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
key = data_model.Key{Timestamp: t, Metric: format.BuiltinMetricIDContributorsLogRev, Tags: [format.MaxTags]int32{0, int32(insertTimeUnix)}}
res = appendBufferedValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
res = appendSimpleValueStat(rnd, res, &key, float64(insertTimeUnix)-float64(t), 1, a.aggregatorHostTag.I, metricCache)
}
return res, sampler.SamplerBuffers, stats, time.Since(startTime)
}
Expand Down