Commits (27)
baa2692
pass inflight requests down to the queue for the dispatcher loop exit…
monoxane Oct 29, 2025
7c619dc
another wip a few weeks later where the tests dont pass but the code …
monoxane Nov 21, 2025
ad08a63
tests work but goleak fails now
monoxane Nov 24, 2025
b173d86
move context timeout logic to queue where it should be
monoxane Nov 25, 2025
cb3a483
karl's notes
monoxane Nov 26, 2025
d137aba
all tests pass now
monoxane Nov 26, 2025
2278e36
lint fix
monoxane Nov 26, 2025
689f2f2
Merge branch 'main' into monoxane/12605-scheduler-inflight-queries
monoxane Nov 26, 2025
892c700
chore(changelog): add changelog entry for this change
monoxane Nov 26, 2025
adcaba8
chore(changelog): correct reference to PR not issue
monoxane Nov 26, 2025
3edeb5e
fix(scheduler): swap map len for an atomic int updated by inflightReq…
monoxane Nov 26, 2025
82d8bad
tests(scheduler): add tests to ensure queue drains on shutdown, and t…
monoxane Nov 27, 2025
1940f88
feat(scheduler): add query-scheduler.graceful-shutdown-timeout config…
monoxane Nov 27, 2025
3afbdba
Revert "karl's notes"
monoxane Nov 27, 2025
443cd0b
chore(docs): regenerate docs for scheduler shutdown timeout
monoxane Nov 27, 2025
316b9e5
chore(help): regenerate reference help
monoxane Nov 27, 2025
9a306b3
tests(scheduler): improve test coverage for graceful shutdown behavior
monoxane Nov 28, 2025
d0a77a9
chore(scheduler): comments and formatting
monoxane Nov 28, 2025
25008a3
Merge branch 'main' into monoxane/12605-scheduler-inflight-queries
monoxane Nov 28, 2025
f3dca4f
fix(scheduler): only allow stopCompleted to be closed once through a …
monoxane Nov 28, 2025
978a257
fix(scheduler): inc/dec inflight request counter at point of modifica…
monoxane Nov 28, 2025
76d5e0e
fix(scheduler): handle a potentially negative inflight request counte…
monoxane Nov 28, 2025
d759d69
fix(scheduler): increase timeout to 2m15s to give headroom above quer…
monoxane Dec 9, 2025
0e41e78
fix(scheduler/queue): improve dispatcher loop re: dimitar's pr comments
monoxane Dec 9, 2025
20291d7
fix(scheduler): reinstate running check after frontend receive
monoxane Dec 9, 2025
bc801ab
docs(scheduler): regenerate docs
monoxane Dec 9, 2025
02617b9
fix(scheduler): return consistent frontend error for shutting down
monoxane Dec 10, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -67,6 +67,8 @@
* [ENHANCEMENT] Querier: Add optional per-tenant max limits for label name and label value requests, `max_label_names_limit` and `max_label_values_limit`. #13654
* [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534
* [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13603
Contributor:
This looks unrelated.

Contributor (Author):
It's from when I merged main into the branch.

* [ENHANCEMENT] Query-scheduler: Gracefully handle shutdown by draining queue before exiting. #12605
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -19175,6 +19175,16 @@
"fieldDefaultValue": 0,
"fieldFlag": "query-scheduler.max-used-instances",
"fieldType": "int"
},
{
"kind": "field",
"name": "graceful_shutdown_timeout",
"required": false,
"desc": "Maximum time that the scheduler will wait for the queue to be drained on shutdown.",
"fieldValue": null,
"fieldDefaultValue": 30000000000,
"fieldFlag": "query-scheduler.graceful-shutdown-timeout",
"fieldType": "duration"
}
],
"fieldValue": null,
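A note on units for readers comparing this descriptor with the flag help below: durations in the generated JSON are expressed in nanoseconds, so 30000000000 is the same 30s default the help text shows. A quick check in Go, purely illustrative:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// config-descriptor.json encodes duration defaults as nanoseconds.
	d := time.Duration(30000000000)
	fmt.Println(d)                   // 30s
	fmt.Println(d == 30*time.Second) // true
}
```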
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2521,6 +2521,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Set to true to use the zero-allocation response decoder for active series queries.
-query-frontend.use-mimir-query-engine-for-sharding
[experimental] Set to true to enable performing query sharding inside the Mimir query engine (MQE). This setting has no effect if sharding is disabled. Requires remote execution and MQE to be enabled.
-query-scheduler.graceful-shutdown-timeout duration
Maximum time that the scheduler will wait for the queue to be drained on shutdown. (default 30s)
-query-scheduler.grpc-client-config.backoff-max-period duration
Maximum delay when backing off. (default 10s)
-query-scheduler.grpc-client-config.backoff-min-period duration
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -645,6 +645,8 @@ Usage of ./cmd/mimir/mimir:
The socket read/write timeout. (default 200ms)
-query-frontend.scheduler-address string
Address of the query-scheduler component, in host:port format. The host should resolve to all query-scheduler instances. This option should be set only when query-scheduler component is in use and -query-scheduler.service-discovery-mode is set to 'dns'.
-query-scheduler.graceful-shutdown-timeout duration
Maximum time that the scheduler will wait for the queue to be drained on shutdown. (default 30s)
-query-scheduler.max-outstanding-requests-per-tenant int
Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100)
-query-scheduler.max-used-instances int
@@ -2139,6 +2139,11 @@ ring:
# available query-scheduler instances.
# CLI flag: -query-scheduler.max-used-instances
[max_used_instances: <int> | default = 0]

# Maximum time that the scheduler will wait for the queue to be drained on
# shutdown.
# CLI flag: -query-scheduler.graceful-shutdown-timeout
[graceful_shutdown_timeout: <duration> | default = 30s]
```
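For example, if queriers routinely run queries close to their own timeout, an operator might raise the drain window above that timeout so in-flight work can actually finish before the scheduler exits. The values below are illustrative and assume the setting sits in the query_scheduler block documented in this section:

```yaml
query_scheduler:
  # Allow up to two minutes for queued and in-flight queries to drain on shutdown.
  graceful_shutdown_timeout: 2m
```

The equivalent command-line form would be `-query-scheduler.graceful-shutdown-timeout=2m`.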

### ruler
1 change: 1 addition & 0 deletions operations/mimir/mimir-flags-defaults.json
@@ -1358,6 +1358,7 @@
"query-scheduler.ring.instance-port": 0,
"query-scheduler.ring.instance-addr": "",
"query-scheduler.max-used-instances": 0,
"query-scheduler.graceful-shutdown-timeout": 30000000000,
"usage-stats.enabled": true,
"usage-stats.installation-mode": "custom",
"overrides-exporter.ring.store": "memberlist",
@@ -13,10 +13,12 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/scheduler/queue/tree"
)
@@ -423,7 +425,9 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
atomic.NewInt64(0),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
queueStopTimeout,
)
require.NoError(t, err)

@@ -439,6 +443,13 @@
ctx := context.Background()
require.NoError(t, queue.starting(ctx))

t.Cleanup(func() {
// if the test has failed and the queue does not get cleared,
// we must send a shutdown signal for the remaining connected querier
// or else StopAndAwaitTerminated will never complete.
assert.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

// configure queue producers to enqueue requests with the query component
// randomly assigned according to the distribution defined in the test case
queueDimensionFunc := makeWeightedRandAdditionalQueueDimensionFunc(
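The t.Cleanup above guards against a hang when a failed test leaves the queue running, and the commits and review replies mention goleak catching leaked goroutines. As an illustration of that pattern only (this is not the Mimir test; the idle service is a stand-in for the real RequestQueue):

```go
package queue_test

import (
	"context"
	"testing"

	"github.com/grafana/dskit/services"
	"go.uber.org/goleak"
)

// TestMain lets goleak fail the package if any test leaves a goroutine behind,
// which is how a dispatcher loop that never observes shutdown would show up.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

func TestServiceStopsCleanly(t *testing.T) {
	// Stand-in for the real queue service; it starts and stops without doing work.
	svc := services.NewIdleService(nil, nil)

	ctx := context.Background()
	if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
		t.Fatal(err)
	}

	// Always stop the service, even when the test body fails early, so that
	// goleak does not report its goroutines and later waits cannot hang.
	t.Cleanup(func() {
		if err := services.StopAndAwaitTerminated(ctx, svc); err != nil {
			t.Error(err)
		}
	})
}
```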
88 changes: 69 additions & 19 deletions pkg/scheduler/queue/queue.go
@@ -9,6 +9,7 @@ import (
"container/list"
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -117,8 +118,11 @@ type RequestQueue struct {
discardedRequests *prometheus.CounterVec // per user
enqueueDuration prometheus.Histogram

stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request.
stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped.
stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request.
stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped.
closeStopCompleted func()
isStopping *atomic.Bool
stopTimeout time.Duration

requestsToEnqueue chan requestToEnqueue
requestsSent chan *SchedulerRequest
@@ -127,6 +131,7 @@
waitingDequeueRequests chan *QuerierWorkerDequeueRequest
waitingDequeueRequestsToDispatch *list.List
waitingDequeueRequestsToDispatchCount *atomic.Int64
schedulerInflightRequests *atomic.Int64

// QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier
// to the time are completed by the querier or failed due to cancel, timeout, or disconnect.
@@ -219,7 +224,9 @@ func NewRequestQueue(
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
enqueueDuration prometheus.Histogram,
schedulerInflightRequests *atomic.Int64,
querierInflightRequestsMetric *prometheus.SummaryVec,
stopTimeout time.Duration,
) (*RequestQueue, error) {
queryComponentCapacity, err := NewQueryComponentUtilization(querierInflightRequestsMetric)
if err != nil {
@@ -241,6 +248,8 @@
// channels must not be buffered so that we can detect when dispatcherLoop() has finished.
stopRequested: make(chan struct{}),
stopCompleted: make(chan struct{}),
isStopping: atomic.NewBool(false),
stopTimeout: stopTimeout,

requestsToEnqueue: make(chan requestToEnqueue),
requestsSent: make(chan *SchedulerRequest),
@@ -249,11 +258,16 @@
waitingDequeueRequests: make(chan *QuerierWorkerDequeueRequest),
waitingDequeueRequestsToDispatch: list.New(),
waitingDequeueRequestsToDispatchCount: atomic.NewInt64(0),
schedulerInflightRequests: schedulerInflightRequests,

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, forgetDelay),
}

q.closeStopCompleted = sync.OnceFunc(func() {
close(q.stopCompleted)
})

q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue")

return q, nil
@@ -293,15 +307,16 @@ func (q *RequestQueue) running(ctx context.Context) error {
}

func (q *RequestQueue) dispatcherLoop() {
stopping := false

for {
needToDispatchQueries := false

select {
case <-q.stopRequested:
Contributor:
I'm wondering if we shouldn't check for stopRequested and stopCompleted in an independent select? Otherwise there's no guarantee that our code will enter those branches as long as there are some requestsToEnqueue and some waitingDequeueRequests.

Something like:

select {
case <-stopRequested:
case <-stopCompleted:
default:
  select {
  case querierWorkerOp := <-q.querierWorkerOperations:
    // ...
  }
}

// Nothing much to do here - fall through to the stop logic below to see if we can stop immediately.
stopping = true
q.isStopping.Store(true)
case <-q.stopCompleted:
// We still check if q.stopCompleted is closed or else we'd be leaving this goroutine running in a timeout exit condition
return
case querierWorkerOp := <-q.querierWorkerOperations:
// Need to attempt to dispatch queries only if querier-worker operation results in a resharding
needToDispatchQueries = q.processQuerierWorkerOperation(querierWorkerOp)
@@ -336,30 +351,46 @@
}
}

// if we have received a signal to stop, we continue to dispatch queries until
// the queue is empty or until we have no more connected querier workers.
if stopping && (q.queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) {
// tell any waiting querier connections that nothing is coming
// If the queue is stopping and there are no connected querier workers,
// we exit immediately because there is no way for any remaining queries to be processed.
if q.isStopping.Load() && q.connectedQuerierWorkers.Load() == 0 {
if q.queueBroker.itemCount() == 0 && q.schedulerInflightRequests.Load() == 0 {
// We are done.
level.Info(q.log).Log("msg", "queue stop completed: query queue is empty and all workers have been disconnected")

q.closeStopCompleted()
return
}

level.Warn(q.log).Log(
"msg", "queue stop requested but query queue is not empty, waiting for query workers to complete remaining requests",
"queueBroker_count", q.queueBroker.itemCount(),
Contributor:
Nitpick: log lines should be easy and obvious to read even if you're not seeing the code, so queueBroker_count -> queued_requests. But also, isn't schedulerInflightRequests.Load() the same as queueBroker.itemCount()?

Contributor (Author):
Scheduler inflight requests are the ones currently being handled by queriers, i.e. dequeued but not yet completed. We have to track them separately to ensure we don't cancel their contexts by closing the connections to the queriers before they're done.

"scheduler_inflight", q.schedulerInflightRequests.Load(),
)
}

// If the queue is stopping and there are no queued or inflight requests left, we cancel any remaining dequeue requests,
// which stops the querier workers, and exit the service.
if q.isStopping.Load() && q.schedulerInflightRequests.Load() == 0 && q.queueBroker.itemCount() == 0 {
level.Info(q.log).Log("msg", "queue stop requested and all pending requests have been processed, disconnecting queriers")

currentElement := q.waitingDequeueRequestsToDispatch.Front()

for currentElement != nil {
waitingDequeueReq := currentElement.Value.(*QuerierWorkerDequeueRequest)
waitingDequeueReq.sendError(ErrStopped)

level.Debug(q.log).Log("msg", "cancelled dequeue request", "querier_id", waitingDequeueReq.QuerierID, "worker_id", waitingDequeueReq.WorkerID)

nextElement := currentElement.Next()
q.waitingDequeueRequestsToDispatchCount.Dec()
q.waitingDequeueRequestsToDispatch.Remove(currentElement)
currentElement = nextElement
}

if !q.queueBroker.isEmpty() {
// All queriers have disconnected, but we still have requests in the queue.
// Without any consumers we have nothing to do but stop the RequestQueue.
// This should never happen, but if this does happen, we want to know about it.
level.Warn(q.log).Log("msg", "shutting down dispatcher loop: have no connected querier workers, but request queue is not empty, so these requests will be abandoned")
}

// We are done.
close(q.stopCompleted)
level.Info(q.log).Log("msg", "queue stop completed: all query dequeue requests closed")
q.closeStopCompleted()
return
}
}
@@ -485,6 +516,8 @@ func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRe
return reqForQuerier.queryRequest, reqForQuerier.lastTenantIndex, reqForQuerier.err
case <-dequeueReq.ctx.Done():
return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err()
case <-q.stopCompleted:
return nil, dequeueReq.lastTenantIndex, ErrStopped
}
case <-dequeueReq.ctx.Done():
return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err()
@@ -499,9 +532,26 @@
// Reads from stopRequested tell dispatcherLoop to enter a stopping state where it tries to clear the queue.
// The loop needs to keep executing other select branches while stopping in order to clear the queue.
q.stopRequested <- struct{}{}
<-q.stopCompleted

return nil
// We set a context with a timeout to ensure that in the event the scheduler is the last component
// running we do not hang forever waiting for the queue to be drained.
// This timeout is set using -query-scheduler.graceful-shutdown-timeout and defaults to 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), q.stopTimeout)

for {
select {
case <-q.stopCompleted:
cancel()
return nil
case <-ctx.Done():
level.Warn(q.log).Log("msg", "queue stop timeout reached: query queue is not empty and remaining queries were not handled before the timeout")

cancel()
q.closeStopCompleted()
Contributor:
If we close here and return nil, aren't we risking a non-clean shutdown? I mean, dispatcherLoop() is still running in the background; there's no guarantee that it observed the close(stopCompleted).

Can't you incorporate the timeout inside the dispatcherLoop?

Contributor (Author):
The explicit reason for closing the channel is to signal dispatcherLoop to exit. I've tested it with goleak and it seems to work as expected, leaving nothing behind.


return nil
}
}
}

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
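Taken together, the queue.go changes implement a drain-then-stop pattern: an atomic counter of scheduler-inflight requests that must reach zero alongside an empty queue, a stopCompleted channel that can now be closed from two places and is therefore wrapped in sync.OnceFunc, and a bounded wait in stop() governed by -query-scheduler.graceful-shutdown-timeout. The condensed sketch below illustrates that shape, including the review suggestion of checking the stop channels in their own select. Names and types are simplified (the real code uses go.uber.org/atomic, dskit services, and the queue broker), so treat it as an outline rather than the actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type drainableQueue struct {
	queued   atomic.Int64 // requests waiting to be dispatched
	inflight atomic.Int64 // requests handed to a querier but not yet completed

	stopRequested      chan struct{}
	stopCompleted      chan struct{}
	closeStopCompleted func() // wraps close(stopCompleted) so racing callers are safe
	stopTimeout        time.Duration
}

func newDrainableQueue(stopTimeout time.Duration) *drainableQueue {
	q := &drainableQueue{
		stopRequested: make(chan struct{}),
		stopCompleted: make(chan struct{}),
		stopTimeout:   stopTimeout,
	}
	q.closeStopCompleted = sync.OnceFunc(func() { close(q.stopCompleted) })
	return q
}

func (q *drainableQueue) dispatcherLoop() {
	stopping := false
	for {
		// Check the stop channels in their own select so that a steady stream of
		// enqueue/dequeue work cannot starve the stop path.
		select {
		case <-q.stopRequested:
			stopping = true
		case <-q.stopCompleted:
			// stop() closes this on timeout; exit so the goroutine is not leaked.
			return
		default:
		}

		// ... dequeue work and hand it to queriers here, moving counts from
		// queued to inflight and decrementing inflight on completion ...

		// Only finish once nothing is queued and nothing is still being
		// executed by a querier.
		if stopping && q.queued.Load() == 0 && q.inflight.Load() == 0 {
			q.closeStopCompleted()
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// stop asks the dispatcher to drain, but gives up after stopTimeout so the
// scheduler cannot hang forever when no queriers remain to do the work.
func (q *drainableQueue) stop() {
	q.stopRequested <- struct{}{}

	ctx, cancel := context.WithTimeout(context.Background(), q.stopTimeout)
	defer cancel()

	select {
	case <-q.stopCompleted:
		// Drained cleanly.
	case <-ctx.Done():
		fmt.Println("graceful shutdown timeout reached, abandoning remaining requests")
		q.closeStopCompleted() // also tells dispatcherLoop to exit
	}
}

func main() {
	q := newDrainableQueue(time.Second)
	go q.dispatcherLoop()
	q.stop()
	fmt.Println("request queue stopped")
}
```

The notable design choice mirrored from the diff is that both stop() (on timeout) and dispatcherLoop() (on a clean drain) may close stopCompleted, which is why the close is funneled through sync.OnceFunc.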
4 changes: 4 additions & 0 deletions pkg/scheduler/queue/queue_broker.go
@@ -68,6 +68,10 @@ func (qb *queueBroker) isEmpty() bool {
return qb.tree.IsEmpty()
}

func (qb *queueBroker) itemCount() int {
return qb.tree.ItemCount()
}

// enqueueRequestBack is the standard interface to enqueue requests for dispatch to queriers.
//
// Tenants and tenant-querier shuffle sharding relationships are managed internally as needed.