@@ -238,12 +238,12 @@ func (cs *chunkStore) Append(t int64, v interface{}) {
238
238
}
239
239
240
240
// Return current, previous, or create new chunk based on sample time
241
- func (cs * chunkStore ) chunkByTime (t int64 , isVariantEncoding bool ) * attrAppender {
241
+ func (cs * chunkStore ) chunkByTime (t int64 , isVariantEncoding bool ) ( * attrAppender , error ) {
242
242
243
243
// Sample is in the current chunk
244
244
cur := cs .chunks [cs .curChunk ]
245
245
if cur .inRange (t ) {
246
- return cur
246
+ return cur , nil
247
247
}
248
248
249
249
// Sample is in the next chunk, need to initialize
@@ -255,29 +255,32 @@ func (cs *chunkStore) chunkByTime(t int64, isVariantEncoding bool) *attrAppender
255
255
chunk := chunkenc .NewChunk (cs .logger , isVariantEncoding ) // TODO: init based on schema, use init function
256
256
app , err := chunk .Appender ()
257
257
if err != nil {
258
- return nil
258
+ return nil , err
259
+ }
260
+ nextPart , err := part .NextPart (t )
261
+ if err != nil {
262
+ return nil , err
259
263
}
260
- nextPart , _ := part .NextPart (t )
261
264
cur .initialize (nextPart , t )
262
265
cs .nextTid = t
263
266
cur .appender = app
264
267
cs .curChunk = cs .curChunk ^ 1
265
268
266
- return cur
269
+ return cur , nil
267
270
}
268
271
269
272
// If it's the first chunk after init we don't allow old updates
270
273
if (cur .state & chunkStateFirst ) != 0 {
271
- return nil
274
+ return nil , nil
272
275
}
273
276
274
277
prev := cs .chunks [cs .curChunk ^ 1 ]
275
278
// Delayed appends - only allowed to previous chunk or within allowed window
276
279
if prev .partition != nil && prev .inRange (t ) && t > cs .maxTime - maxLateArrivalInterval {
277
- return prev
280
+ return prev , nil
278
281
}
279
282
280
- return nil
283
+ return nil , nil
281
284
}
282
285
283
286
// Write all pending samples to DB chunks and aggregates
@@ -327,7 +330,6 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
327
330
expr = expr + cs .aggrList .SetOrUpdateExpr ("v" , bucket , isNewBucket )
328
331
expr = expr + cs .appendExpression (activeChunk )
329
332
}
330
-
331
333
pendingSampleIndex ++
332
334
break
333
335
} else {
@@ -339,7 +341,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
339
341
// Init activeChunk if nil (when samples are too old); if still too
340
342
// old, skip to next sample
341
343
if ! cs .isAggr () && activeChunk == nil {
342
- activeChunk = cs .chunkByTime (sampleTime , metric .isVariant )
344
+ activeChunk , err = cs .chunkByTime (sampleTime , metric .isVariant )
345
+ if err != nil {
346
+ hasPendingUpdates = false
347
+ return
348
+ }
343
349
if activeChunk == nil {
344
350
pendingSampleIndex ++
345
351
mc .logger .DebugWith ("nil active chunk" , "T" , sampleTime )
@@ -385,7 +391,11 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
385
391
// initialize the new chunk
386
392
if activeChunk != nil && ! activeChunk .inRange (nextT ) {
387
393
expr = expr + cs .appendExpression (activeChunk )
388
- activeChunk = cs .chunkByTime (nextT , metric .isVariant )
394
+ activeChunk , err = cs .chunkByTime (nextT , metric .isVariant )
395
+ if err != nil {
396
+ hasPendingUpdates = false
397
+ return
398
+ }
389
399
}
390
400
391
401
pendingSampleIndex ++
0 commit comments