Skip to content

Commit 55e0470

Browse files
Fix flush during shutdown (#3306)
1 parent 408f099 commit 55e0470

File tree

1 file changed

+20
-11
lines changed

1 file changed

+20
-11
lines changed

modules/ingester/flush.go

+20-11
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,19 @@ func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.
340340
return false, nil
341341
}
342342

343+
func (i *Ingester) enqueueExec(op *flushOp) {
344+
// Check if shutdown initiated
345+
if i.flushQueues.IsStopped() {
346+
handleAbandonedOp(op)
347+
return
348+
}
349+
350+
err := i.flushQueues.Enqueue(op)
351+
if err != nil {
352+
handleFailedOp(op, err)
353+
}
354+
}
355+
343356
func (i *Ingester) enqueue(op *flushOp, jitter bool) {
344357
delay := time.Duration(0)
345358

@@ -349,19 +362,15 @@ func (i *Ingester) enqueue(op *flushOp, jitter bool) {
349362

350363
op.at = time.Now().Add(delay)
351364

365+
if !jitter {
366+
// Execute synchronously to make sure we can flush during shutdown
367+
i.enqueueExec(op)
368+
return
369+
}
370+
352371
go func() {
353372
time.Sleep(delay)
354-
355-
// Check if shutdown initiated
356-
if i.flushQueues.IsStopped() {
357-
handleAbandonedOp(op)
358-
return
359-
}
360-
361-
err := i.flushQueues.Enqueue(op)
362-
if err != nil {
363-
handleFailedOp(op, err)
364-
}
373+
i.enqueueExec(op)
365374
}()
366375
}
367376

0 commit comments

Comments
 (0)