-
Notifications
You must be signed in to change notification settings - Fork 684
Query Scheduler: Graceful Shutdown with Inflight and Pending requests #13603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 21 commits
baa2692
7c619dc
ad08a63
b173d86
cb3a483
d137aba
2278e36
689f2f2
892c700
adcaba8
3edeb5e
82d8bad
1940f88
3afbdba
443cd0b
316b9e5
9a306b3
d0a77a9
25008a3
f3dca4f
978a257
76d5e0e
d759d69
0e41e78
20291d7
bc801ab
02617b9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ import ( | |
| "container/list" | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/go-kit/log" | ||
|
|
@@ -117,8 +118,11 @@ type RequestQueue struct { | |
| discardedRequests *prometheus.CounterVec // per user | ||
| enqueueDuration prometheus.Histogram | ||
|
|
||
| stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. | ||
| stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. | ||
| stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. | ||
| stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. | ||
| closeStopCompleted func() | ||
| isStopping *atomic.Bool | ||
| stopTimeout time.Duration | ||
|
|
||
| requestsToEnqueue chan requestToEnqueue | ||
| requestsSent chan *SchedulerRequest | ||
|
|
@@ -127,6 +131,7 @@ type RequestQueue struct { | |
| waitingDequeueRequests chan *QuerierWorkerDequeueRequest | ||
| waitingDequeueRequestsToDispatch *list.List | ||
| waitingDequeueRequestsToDispatchCount *atomic.Int64 | ||
| schedulerInflightRequests *atomic.Int64 | ||
|
|
||
| // QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier | ||
| // to the time they are completed by the querier or failed due to cancel, timeout, or disconnect. | ||
|
|
@@ -219,7 +224,9 @@ func NewRequestQueue( | |
| queueLength *prometheus.GaugeVec, | ||
| discardedRequests *prometheus.CounterVec, | ||
| enqueueDuration prometheus.Histogram, | ||
| schedulerInflightRequests *atomic.Int64, | ||
| querierInflightRequestsMetric *prometheus.SummaryVec, | ||
| stopTimeout time.Duration, | ||
| ) (*RequestQueue, error) { | ||
| queryComponentCapacity, err := NewQueryComponentUtilization(querierInflightRequestsMetric) | ||
| if err != nil { | ||
|
|
@@ -241,6 +248,8 @@ func NewRequestQueue( | |
| // channels must not be buffered so that we can detect when dispatcherLoop() has finished. | ||
| stopRequested: make(chan struct{}), | ||
| stopCompleted: make(chan struct{}), | ||
| isStopping: atomic.NewBool(false), | ||
| stopTimeout: stopTimeout, | ||
|
|
||
| requestsToEnqueue: make(chan requestToEnqueue), | ||
| requestsSent: make(chan *SchedulerRequest), | ||
|
|
@@ -249,11 +258,16 @@ func NewRequestQueue( | |
| waitingDequeueRequests: make(chan *QuerierWorkerDequeueRequest), | ||
| waitingDequeueRequestsToDispatch: list.New(), | ||
| waitingDequeueRequestsToDispatchCount: atomic.NewInt64(0), | ||
| schedulerInflightRequests: schedulerInflightRequests, | ||
|
|
||
| QueryComponentUtilization: queryComponentCapacity, | ||
| queueBroker: newQueueBroker(maxOutstandingPerTenant, forgetDelay), | ||
| } | ||
|
|
||
| q.closeStopCompleted = sync.OnceFunc(func() { | ||
| close(q.stopCompleted) | ||
| }) | ||
|
|
||
| q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") | ||
|
|
||
| return q, nil | ||
|
|
@@ -293,15 +307,16 @@ func (q *RequestQueue) running(ctx context.Context) error { | |
| } | ||
|
|
||
| func (q *RequestQueue) dispatcherLoop() { | ||
| stopping := false | ||
|
|
||
| for { | ||
| needToDispatchQueries := false | ||
|
|
||
| select { | ||
| case <-q.stopRequested: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm wondering if we shouldn't check for stopRequested and stopCompleted in an independent select? Otherwise there's no guarantee that our code will enter those branches as long as some of the other channels are also ready. Something like: select {
case <-stopRequested:
case <-stopCompleted:
default:
select {
case querierWorkerOp := <-q.querierWorkerOperations:
// ...
}
} |
||
| // Nothing much to do here - fall through to the stop logic below to see if we can stop immediately. | ||
| stopping = true | ||
| q.isStopping.Store(true) | ||
monoxane marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| case <-q.stopCompleted: | ||
| // We still check if q.stopCompleted is closed or else we'd be leaving this goroutine running in a timeout exit condition | ||
| return | ||
| case querierWorkerOp := <-q.querierWorkerOperations: | ||
| // Need to attempt to dispatch queries only if querier-worker operation results in a resharding | ||
| needToDispatchQueries = q.processQuerierWorkerOperation(querierWorkerOp) | ||
|
|
@@ -336,30 +351,46 @@ func (q *RequestQueue) dispatcherLoop() { | |
| } | ||
| } | ||
|
|
||
| // if we have received a signal to stop, we continue to dispatch queries until | ||
| // the queue is empty or until we have no more connected querier workers. | ||
| if stopping && (q.queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) { | ||
| // tell any waiting querier connections that nothing is coming | ||
| // If the queue is stopping and there are no connected querier workers, | ||
| // we exit immediately because there is no way for any remaining queries to be processed. | ||
| if q.isStopping.Load() && q.connectedQuerierWorkers.Load() == 0 { | ||
| if q.queueBroker.itemCount() == 0 && q.schedulerInflightRequests.Load() == 0 { | ||
| // We are done. | ||
| level.Info(q.log).Log("msg", "queue stop completed: query queue is empty and all workers have been disconnected") | ||
|
|
||
| q.closeStopCompleted() | ||
monoxane marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| } | ||
|
|
||
| level.Warn(q.log).Log( | ||
monoxane marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "msg", "queue stop requested but query queue is not empty, waiting for query workers to complete remaining requests", | ||
| "queueBroker_count", q.queueBroker.itemCount(), | ||
|
||
| "scheduler_inflight", q.schedulerInflightRequests.Load(), | ||
| ) | ||
| } | ||
|
|
||
| // If the queue is stopping and there are no requests in the queue, we cancel any remaining dequeue requests, | ||
| // which stops the querier workers, and then exit the service. | ||
| if q.isStopping.Load() && q.schedulerInflightRequests.Load() == 0 && q.queueBroker.itemCount() == 0 { | ||
monoxane marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| level.Info(q.log).Log("msg", "queue stop requested and all pending requests have been processed, disconnecting queriers") | ||
|
|
||
| currentElement := q.waitingDequeueRequestsToDispatch.Front() | ||
|
|
||
| for currentElement != nil { | ||
| waitingDequeueReq := currentElement.Value.(*QuerierWorkerDequeueRequest) | ||
| waitingDequeueReq.sendError(ErrStopped) | ||
|
|
||
| level.Debug(q.log).Log("msg", "cancelled dequeue request", "querier_id", waitingDequeueReq.QuerierID, "worker_id", waitingDequeueReq.WorkerID) | ||
|
|
||
| nextElement := currentElement.Next() | ||
| q.waitingDequeueRequestsToDispatchCount.Dec() | ||
| q.waitingDequeueRequestsToDispatch.Remove(currentElement) | ||
| currentElement = nextElement | ||
| } | ||
|
|
||
| if !q.queueBroker.isEmpty() { | ||
| // All queriers have disconnected, but we still have requests in the queue. | ||
| // Without any consumers we have nothing to do but stop the RequestQueue. | ||
| // This should never happen, but if this does happen, we want to know about it. | ||
| level.Warn(q.log).Log("msg", "shutting down dispatcher loop: have no connected querier workers, but request queue is not empty, so these requests will be abandoned") | ||
| } | ||
|
|
||
| // We are done. | ||
| close(q.stopCompleted) | ||
| level.Info(q.log).Log("msg", "queue stop completed: all query dequeue requests closed") | ||
| q.closeStopCompleted() | ||
| return | ||
| } | ||
| } | ||
|
|
@@ -485,6 +516,8 @@ func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRe | |
| return reqForQuerier.queryRequest, reqForQuerier.lastTenantIndex, reqForQuerier.err | ||
| case <-dequeueReq.ctx.Done(): | ||
| return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err() | ||
| case <-q.stopCompleted: | ||
| return nil, dequeueReq.lastTenantIndex, ErrStopped | ||
| } | ||
| case <-dequeueReq.ctx.Done(): | ||
| return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err() | ||
|
|
@@ -499,9 +532,26 @@ func (q *RequestQueue) stop(_ error) error { | |
| // Reads from stopRequested tell dispatcherLoop to enter a stopping state where it tries to clear the queue. | ||
| // The loop needs to keep executing other select branches while stopping in order to clear the queue. | ||
| q.stopRequested <- struct{}{} | ||
| <-q.stopCompleted | ||
|
|
||
| return nil | ||
| // We set a context with a timeout to ensure that in the event the scheduler is the last component | ||
| // running we do not hang forever waiting for the queue to be drained. | ||
| // This timeout is set using -query-scheduler.graceful-shutdown-timeout and defaults to 30 seconds | ||
| ctx, cancel := context.WithTimeout(context.Background(), q.stopTimeout) | ||
|
|
||
| for { | ||
| select { | ||
| case <-q.stopCompleted: | ||
| cancel() | ||
| return nil | ||
| case <-ctx.Done(): | ||
| level.Warn(q.log).Log("msg", "queue stop timeout reached: query queue is not empty but queries have not been handled before the timeout") | ||
monoxane marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| cancel() | ||
| q.closeStopCompleted() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we close here and return can't you incorporate the timeout inside the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The explicit reason for closing the channel is to signal dispatcherLoop to exit, I've tested it with goleak and it seems to work as expected leaving nothing behind. |
||
|
|
||
| return nil | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks unrelated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's from when I merged main into the branch.