-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathagent_shard.go
More file actions
403 lines (370 loc) · 15.9 KB
/
Copy pathagent_shard.go
File metadata and controls
403 lines (370 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// Copyright 2025 V Kontakte LLC
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package agent
import (
"sync"
"time"
"go.uber.org/atomic"
"pgregory.net/rand"
"github.com/VKCOM/statshouse/internal/data_model"
"github.com/VKCOM/statshouse/internal/format"
)
// Geometry of Shard.SuperQueue, the ring of per-second buckets.
const (
	// superQueueLen is the number of bucket slots in the ring. We start sending at the end of
	// the minute and also need to spread metric rows around the next 60 seconds, so about 120
	// slots are actively used, while the remaining 8 slots serve as a send/receive queue.
	superQueueLen = 128

	// superQueueFutureSlots is how far an incoming event timestamp may be ahead of
	// s.CurrentTime (a couple of seconds). Our s.CurrentTime can lag behind a client that has
	// already updated its clock and sent an event, so some slots are reserved for that.
	superQueueFutureSlots = 3
)
type (
	// Shard gets data after initial hashing and shard number selection.
	// NOTE(review): the mutable fields after mu appear to be guarded by mu — every method in
	// this file that touches them takes s.mu first — confirm for fields not used here.
	Shard struct {
		// Never change, so do not require protection
		agent    *Agent
		ShardNum int
		ShardKey int32
		rng      *rand.Rand // in this file, only used while mu is held

		mu                                   sync.Mutex
		config                               Config        // can change if remotely updated
		hardwareMetricResolutionResolved     atomic.Int32  // copy from config to avoid lock in fast path
		hardwareSlowMetricResolutionResolved atomic.Int32  // copy from config to avoid lock in fast path
		timeSpreadDelta                      time.Duration // randomly spread bucket sending through second between sources/machines

		// CurrentTime advances with the clock.
		CurrentTime uint32
		// SendTime follows CurrentTime with some delay, but can lag behind if the sampling
		// conveyor is stuck. If CurrentTime is too far in the future relative to SendTime,
		// the agent discards all received events (see gapInReceivingQueueLocked).
		SendTime uint32
		// SuperQueue is a ring of buckets indexed by slot%superQueueLen.
		// Beware! We must spread X second resolution metric rows around the next X seconds
		// deterministically: all agents must assign the same rows to the same second, so that
		// when the aggregator works on that second, all those rows aggregate together.
		SuperQueue [superQueueLen]*data_model.MetricsBucket

		// We have lots of async components that keep writing metrics into the agent during
		// shutdown. This bool is a circuit breaker: once set, new data is no longer added to the
		// SuperQueue buckets, so shutdown code can flush them to disk without any
		// non-deterministic behavior.
		stopReceivingIncomingData bool

		BucketsToSend       chan compressedBucketData
		BucketsToPreprocess chan *data_model.MetricsBucket
		BucketsPool         chan *data_model.MetricsBucket

		historicBucketsToSend      []compressedBucketData // Can be slightly out of order here, we sort it every time
		historicBucketsDataSize    int                    // if too many are with data, will put without data, which will be read from disk
		cond                       *sync.Cond
		HistoricOutOfWindowDropped atomic.Int64

		// budget for metrics from SendSourceBucket3Response; read by GetMetricBudgetLocked
		metricBudgetsFromAgg data_model.ExpDecay
	}

	// BuiltInItemValue pairs a key and an accumulated value with their own mutex.
	// NOTE(review): presumably used for agent built-in metrics — confirm against users.
	BuiltInItemValue struct {
		mu         sync.Mutex
		key        data_model.Key
		value      data_model.ItemValue
		metricInfo *format.MetricMetaValue
	}

	// compressedBucketData is one serialized bucket waiting to be sent.
	compressedBucketData struct {
		id   int64 // in disk queue, or 0 if working without disk
		time uint32
		data []byte // first 4 bytes are uncompressed size, rest is compressed data
	}
)
// HistoricBucketsDataSizeMemory returns historicBucketsDataSize, the accounted size of
// historic bucket data currently kept in memory, read under the shard lock.
func (s *Shard) HistoricBucketsDataSizeMemory() int {
	s.mu.Lock()
	size := s.historicBucketsDataSize
	s.mu.Unlock()
	return size
}
// gapInReceivingQueueLocked reports how many seconds CurrentTime has run past the lead over
// SendTime that the super queue can absorb. A positive result means new events could land in
// slots that are still waiting to be sent, so incoming data must be discarded.
// Intended to be called with s.mu held (the file's "Locked" naming convention).
func (s *Shard) gapInReceivingQueueLocked() int64 {
	// Of the superQueueLen slots, superQueueFutureSlots are reserved for events from the
	// future and 120 for spreading low-resolution rows; the remainder is the allowed lead.
	const allowedLead = superQueueLen - superQueueFutureSlots - 120
	return int64(s.CurrentTime) - int64(s.SendTime) - allowedLead
}
// shouldDiscardIncomingData tells writers to drop an event. This happens when shutdown has
// tripped the circuit breaker, or when the receiving queue has fallen too far behind.
// Callers in this file invoke it with s.mu held.
func (s *Shard) shouldDiscardIncomingData() bool {
	if s.stopReceivingIncomingData {
		return true
	}
	return s.gapInReceivingQueueLocked() > 0
}
// HistoricBucketsDataSizeDisk returns the on-disk bucket cache sizes for this shard, as
// reported by diskBucketCache.TotalFileSize. When the agent runs without a disk cache, it
// returns zeros.
func (s *Shard) HistoricBucketsDataSizeDisk() (total int64, unsent int64) {
	if cache := s.agent.diskBucketCache; cache != nil {
		return cache.TotalFileSize(s.ShardNum)
	}
	return 0, 0
}
// resolutionShardFromHashLocked picks the SuperQueue bucket an event with the given key must be
// written to, and normalizes key.Timestamp in place. Callers in this file hold s.mu.
//
// For low-resolution metrics, we must ensure timestamps are rounded, so they again end up in the same map item,
// and clients should set timestamps freely and not make assumptions on metric resolution (it can be changed on the fly).
// Later, when sending bucket, we will remove timestamps for all items which have it
// equal to bucket timestamp (only for transport efficiency), then reset timestamps on aggregator after receiving.
//
// Processing order matters and is relied upon by callers:
//   - a zero key.Timestamp is replaced with s.CurrentTime;
//   - a timestamp beyond CurrentTime+superQueueFutureSlots is clamped to that value
//     (clampedFuture is then true);
//   - for resolution > 1 the timestamp is rounded down to a multiple of resolution.
//
// Only after this call is key.Timestamp valid for comparison with dropIfBeforeTimestamp.
// The returned bucket is SuperQueue[slot%superQueueLen]. For resolution 1, slot is the
// timestamp, moved up to SendTime if late. For resolution R, slot is timestamp+R plus a
// hash-derived offset in [0, R), moved forward by whole multiples of R if late.
func (s *Shard) resolutionShardFromHashLocked(key *data_model.Key, resolutionHash uint64, metricInfo *format.MetricMetaValue) (_ *data_model.MetricsBucket, clampedFuture bool) {
	resolution := uint32(1)
	if metricInfo != nil {
		if !format.HardwareMetric(metricInfo.MetricID) {
			resolution = uint32(metricInfo.EffectiveResolution)
		} else {
			// hardware metrics use resolutions copied from config into atomics (lock-free fast path)
			if metricInfo.IsHardwareSlowMetric {
				resolution = uint32(s.hardwareSlowMetricResolutionResolved.Load())
			} else {
				resolution = uint32(s.hardwareMetricResolutionResolved.Load())
			}
		}
	}
	// NOTE(review): a resolution of 0 would panic on the division below; presumably
	// EffectiveResolution and the resolved hardware resolutions are always >= 1 — confirm.
	currentTime := s.CurrentTime
	sendTime := s.SendTime
	if key.Timestamp == 0 {
		// We have lots of builtin metrics in aggregator which should correspond to "current" second.
		// Also, we have some ingestion statuses, which corresponds to current time.
		// We try to provide explicit timestamp everywhere this is possible, just not everywhere.
		key.Timestamp = currentTime
	}
	if key.Timestamp > currentTime+superQueueFutureSlots {
		// we must clamp before comparing with dropIfBeforeTimestamp,
		// otherwise aggregator will clamp, moving event behind timestamp it should not be written before.
		// also, we must not generate events with Timestamp > bucket.Time, so future slots and
		// super queue length depend on each other.
		key.Timestamp = currentTime + superQueueFutureSlots
		clampedFuture = true
	}
	// Timestamps in the past are not clamped here; they will be clamped by aggregators.
	if resolution == 1 {
		slot := key.Timestamp
		if slot < sendTime {
			slot = sendTime // if late, send immediately, not in ~120 seconds. Helps those who are late a bit.
		}
		// if slot >= currentTime - we do no special processing for slots in the future
		return s.SuperQueue[slot%superQueueLen], clampedFuture
	}
	// division is expensive, hence separate code for very common 1-second resolution above
	key.Timestamp = (key.Timestamp / resolution) * resolution
	// `*` and `>>` share precedence in Go and associate left, so this is ((hash32 * resolution) >> 32),
	// a deterministic offset in [0, resolution) that is identical on every agent for the same hash.
	resolutionShardNum := uint32((resolutionHash & 0xFFFFFFFF) * uint64(resolution) >> 32) // trunc([0..0.9999999] * numShards) in fixed point 32.32
	slot := key.Timestamp + resolution + resolutionShardNum
	// we could start sending 1 second earlier, adding - 1 to the slot in code above, but for now we want compatibility with legacy code.
	if slot < sendTime { // rare?
		// round (sendTime - slot) up to a whole number of resolution periods
		slot += ((sendTime - slot + resolution - 1) / resolution) * resolution
		// if late, send immediately, but keep slots aligned with resolution, sometimes identically on several/many agents, hopefully improving aggregation
	}
	// if slot >= currentTime+? - we do no special processing for slots in the future
	return s.SuperQueue[slot%superQueueLen], clampedFuture
}
// ApplyUnique records a batch of unique-value hashes for the metric item identified by key.
//
// count is the number of events the batch represents; when it is 0, len(hashes) is used.
// Non-positive counts are ignored. Nothing is recorded in these cases:
//   - the shard is discarding incoming data;
//   - the resolved (possibly clamped and resolution-aligned) key timestamp is before
//     dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// key is modified in place: its string top tag is removed and its Timestamp is resolved.
//
// If the event timestamp was clamped from the future, an ingestion-status warning is added
// after the shard lock is released, because AddCounterHostSrcIngestionStatus takes the lock itself.
func (s *Shard) ApplyUnique(key *data_model.Key, resolutionHash uint64, hashes []int64, count float64, hostTag data_model.TagUnion,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	if count == 0 {
		count = float64(len(hashes))
	}
	if count <= 0 {
		return
	}
	// The locked section runs in a closure so that a deferred Unlock releases s.mu on every
	// path, including a panic in a callee, instead of relying on hand-written Unlock calls.
	clampedFuture := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.shouldDiscardIncomingData() {
			return false
		}
		topValue := key.RemoveStringTopTag()
		resolutionShard, clamped := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
		if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
			return false
		}
		budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
		item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
		if item == nil {
			return false
		}
		mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
		mv.ApplyUnique(s.rng, hashes, count, hostTag)
		return clamped
	}()
	if clampedFuture { // we must use key.Timestamp because this is the bucket clamped event sits in
		s.AddCounterHostSrcIngestionStatus(key.Timestamp, format.BuiltinMetricMetaIngestionStatus,
			[]int32{key.Tags[0], key.Metric, format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture},
			1, dropIfBeforeTimestamp)
	}
}
// ApplyValues records explicit values and histogram (value, count) pairs for the metric item
// identified by key.
//
// totalCount is len(values) plus the sum of histogram counts. count overrides it when
// non-zero; non-positive counts are ignored. Nothing is recorded in these cases:
//   - the shard is discarding incoming data;
//   - the resolved key timestamp is before dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// config.LegacyApplyValues selects the legacy merge routine. Percentiles are tracked only
// when metricInfo says the metric has them.
//
// key is modified in place. If the timestamp was clamped from the future, an ingestion-status
// warning is added after the shard lock is released, because that call re-acquires the lock.
func (s *Shard) ApplyValues(key *data_model.Key, resolutionHash uint64, histogram [][2]float64, values []float64, count float64, hostTag data_model.TagUnion,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	totalCount := float64(len(values))
	for _, kv := range histogram {
		totalCount += kv[1] // all counts are validated to be >= 0
	}
	if count == 0 {
		count = totalCount
	}
	if count <= 0 {
		return
	}
	// The locked section runs in a closure so that a deferred Unlock releases s.mu on every
	// path, including a panic in a callee, instead of relying on hand-written Unlock calls.
	clampedFuture := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.shouldDiscardIncomingData() {
			return false
		}
		topValue := key.RemoveStringTopTag()
		resolutionShard, clamped := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
		if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
			return false
		}
		budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
		item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
		if item == nil {
			return false
		}
		mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
		hasPercentiles := metricInfo != nil && metricInfo.HasPercentiles
		if s.config.LegacyApplyValues {
			mv.ApplyValuesLegacy(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, hasPercentiles)
		} else {
			mv.ApplyValues(s.rng, histogram, values, count, totalCount, hostTag, data_model.AgentPercentileCompression, hasPercentiles)
		}
		return clamped
	}()
	if clampedFuture { // we must use key.Timestamp because this is the bucket clamped event sits in
		s.AddCounterHostSrcIngestionStatus(key.Timestamp, format.BuiltinMetricMetaIngestionStatus,
			[]int32{key.Tags[0], key.Metric, format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture},
			1, dropIfBeforeTimestamp)
	}
}
// ApplyCounter adds count events to the metric item identified by key.
//
// Non-positive counts are ignored. Nothing is recorded in these cases:
//   - the shard is discarding incoming data;
//   - the resolved key timestamp is before dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// key is modified in place: its string top tag is removed and its Timestamp is resolved.
//
// If the timestamp was clamped from the future, an ingestion-status warning is added after the
// shard lock is released, because AddCounterHostSrcIngestionStatus takes the lock itself.
func (s *Shard) ApplyCounter(key *data_model.Key, resolutionHash uint64, count float64, hostTag data_model.TagUnion,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	if count <= 0 {
		return
	}
	// The locked section runs in a closure so that a deferred Unlock releases s.mu on every
	// path, including a panic in a callee, instead of relying on hand-written Unlock calls.
	clampedFuture := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.shouldDiscardIncomingData() {
			return false
		}
		topValue := key.RemoveStringTopTag()
		resolutionShard, clamped := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
		if key.Timestamp < dropIfBeforeTimestamp { // key timestamp is only valid at this point
			return false
		}
		budgetID, budget := s.GetMetricBudgetLocked(key.AccountMetric())
		item, _ := resolutionShard.SampleOrCreateMultiItem(s.rng, key, metricInfo, budgetID, budget, count, nil)
		if item == nil {
			return false
		}
		mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, topValue, count)
		mv.AddCounterHost(s.rng, count, hostTag)
		return clamped
	}()
	if clampedFuture { // we must use key.Timestamp because this is the bucket clamped event sits in
		s.AddCounterHostSrcIngestionStatus(key.Timestamp, format.BuiltinMetricMetaIngestionStatus,
			[]int32{key.Tags[0], key.Metric, format.TagValueIDSrcIngestionStatusWarnTimestampClampedFuture},
			1, dropIfBeforeTimestamp)
	}
}
// AddCounterHostSrcIngestionStatus adds count events of the built-in metric described by
// metricInfo, at timestamp t, with the given leading tags and no host tag.
// metricInfo must be non-nil (it is dereferenced immediately). Non-positive counts are ignored.
func (s *Shard) AddCounterHostSrcIngestionStatus(t uint32, metricInfo *format.MetricMetaValue, tags []int32, count float64,
	dropIfBeforeTimestamp uint32) {
	if count <= 0 {
		return
	}
	k := data_model.Key{Timestamp: t, Metric: metricInfo.MetricID}
	copy(k.Tags[:], tags)
	var noHost data_model.TagUnion
	// built-in metrics are spread with a zero resolution hash, which is acceptable here
	s.AddCounterHost(&k, 0, count, noHost, metricInfo, dropIfBeforeTimestamp)
}
// AddCounterHost adds count events to the metric item identified by key, attributing them to
// hostTag. Nothing is recorded in these cases:
//   - the shard is discarding incoming data;
//   - the resolved key timestamp is before dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// key is modified in place (string top tag removed, Timestamp resolved). A clamped-future
// timestamp is silently accepted here; no ingestion status is emitted.
func (s *Shard) AddCounterHost(key *data_model.Key, resolutionHash uint64, count float64, hostTag data_model.TagUnion,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shouldDiscardIncomingData() {
		return
	}
	top := key.RemoveStringTopTag()
	bucket, _ := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
	// the timestamp is only final after resolutionShardFromHashLocked has normalized it
	if key.Timestamp < dropIfBeforeTimestamp {
		return
	}
	id, b := s.GetMetricBudgetLocked(key.AccountMetric())
	if item, _ := bucket.SampleOrCreateMultiItem(s.rng, key, metricInfo, id, b, count, nil); item != nil {
		value := item.MapStringTop(s.rng, s.config.StringTopCapacity, top, count)
		value.AddCounterHost(s.rng, count, hostTag)
	}
}
// AddCounterHostStringBytesSrcIngestionStatus adds count events of the built-in metric
// metricInfo at timestamp t, keyed by tags and by the string top value str.
//
// Going through AddCounterHost would need the string top tag set to string(str), which
// allocates for every ingestion status value. This variant instead looks str up in the
// mappings cache: on a hit the mapped id is used, otherwise the raw bytes are kept. This
// happens before taking the shard lock.
//
// metricInfo must be non-nil. Non-positive counts are ignored, as are events the shard is
// discarding, events timestamped before dropIfBeforeTimestamp, and items rejected by sampling.
func (s *Shard) AddCounterHostStringBytesSrcIngestionStatus(t uint32, metricInfo *format.MetricMetaValue, tags []int32, str []byte, count float64,
	dropIfBeforeTimestamp uint32) {
	if count <= 0 {
		return
	}
	k := data_model.Key{Timestamp: t, Metric: metricInfo.MetricID}
	copy(k.Tags[:], tags)

	var top data_model.TagUnionBytes
	if len(str) != 0 {
		mapped, found := s.agent.mappingsCache.GetValueBytes(t, str)
		if found {
			top.I = mapped
		} else {
			top.S = str
		}
	}
	var noHost data_model.TagUnion

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shouldDiscardIncomingData() {
		return
	}
	_ = k.RemoveStringTopTag() // keeps the key in the same normalized form as other writers
	// built-in metrics use a zero resolution hash, which is acceptable here
	bucket, _ := s.resolutionShardFromHashLocked(&k, 0, metricInfo)
	if k.Timestamp < dropIfBeforeTimestamp { // timestamp is only final at this point
		return
	}
	id, b := s.GetMetricBudgetLocked(k.AccountMetric())
	item, _ := bucket.SampleOrCreateMultiItem(s.rng, &k, metricInfo, id, b, count, nil)
	if item == nil {
		return
	}
	value := item.MapStringTopBytes(s.rng, s.config.StringTopCapacity, top, count)
	value.AddCounterHost(s.rng, count, noHost)
}
// AddValueCounterHost records value observed count times for the metric item identified by
// key, attributed to hostTag. When metricInfo says the metric has percentiles, the value also
// feeds the percentile digest. Nothing is recorded in these cases:
//   - the shard is discarding incoming data;
//   - the resolved key timestamp is before dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// key is modified in place.
func (s *Shard) AddValueCounterHost(key *data_model.Key, resolutionHash uint64, value float64, count float64, hostTag data_model.TagUnion,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shouldDiscardIncomingData() {
		return
	}
	top := key.RemoveStringTopTag()
	bucket, _ := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
	if key.Timestamp < dropIfBeforeTimestamp { // timestamp is only final at this point
		return
	}
	id, b := s.GetMetricBudgetLocked(key.AccountMetric())
	item, _ := bucket.SampleOrCreateMultiItem(s.rng, key, metricInfo, id, b, count, nil)
	if item == nil {
		return
	}
	mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, top, count)
	switch {
	case metricInfo != nil && metricInfo.HasPercentiles:
		mv.AddValueCounterHostPercentile(s.rng, value, count, hostTag, data_model.AgentPercentileCompression)
	default:
		mv.AddValueCounterHost(s.rng, value, count, hostTag)
	}
}
// MergeItemValue merges a pre-aggregated itemValue into the metric item identified by key.
// itemValue.Count() is used both as the sampling weight and as the string-top weight.
// Nothing is merged in these cases:
//   - the shard is discarding incoming data;
//   - the resolved key timestamp is before dropIfBeforeTimestamp;
//   - sampling rejects the item.
//
// key is modified in place.
func (s *Shard) MergeItemValue(key *data_model.Key, resolutionHash uint64, itemValue *data_model.ItemValue,
	metricInfo *format.MetricMetaValue, dropIfBeforeTimestamp uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shouldDiscardIncomingData() {
		return
	}
	top := key.RemoveStringTopTag()
	bucket, _ := s.resolutionShardFromHashLocked(key, resolutionHash, metricInfo)
	if key.Timestamp < dropIfBeforeTimestamp { // timestamp is only final at this point
		return
	}
	id, b := s.GetMetricBudgetLocked(key.AccountMetric())
	if item, _ := bucket.SampleOrCreateMultiItem(s.rng, key, metricInfo, id, b, itemValue.Count(), nil); item != nil {
		mv := item.MapStringTop(s.rng, s.config.StringTopCapacity, top, itemValue.Count())
		mv.Value.Merge(s.rng, itemValue)
	}
}
// GetMetricBudgetLocked returns the sampling budget id and budget for metricID.
//
// Budgets received from aggregators are preferred. If none is set, or it is zero, for the
// metric itself, the common budget (id -1) is used. If that is also zero, it falls back to
// the shard's configured sample budget, still under id -1.
// Callers in this file hold s.mu.
func (s *Shard) GetMetricBudgetLocked(metricID int32) (int32, uint32) {
	const commonBudgetID = -1
	if b := s.metricBudgetsFromAgg.GetByID(metricID); b > 0 {
		return metricID, b
	}
	if b := s.metricBudgetsFromAgg.GetByID(commonBudgetID); b > 0 {
		return commonBudgetID, b
	}
	return commonBudgetID, uint32(s.getShardSampleBudget(s.config))
}