diff --git a/CHANGELOG.md b/CHANGELOG.md index 98f88976be4..adc5a372ba5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ * [ENHANCEMENT] Querier: Add optional per-tenant max limits for label name and label value requests, `max_label_names_limit` and `max_label_values_limit`. #13654 * [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534 * [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525 +* [ENHANCEMENT] Query-scheduler: Gracefully handle shutdown by draining the queue before exiting. #12605 * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525 * [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 02ef88fddf3..7f25efcf6ff 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -19175,6 +19175,16 @@ "fieldDefaultValue": 0, "fieldFlag": "query-scheduler.max-used-instances", "fieldType": "int" + }, + { + "kind": "field", + "name": "graceful_shutdown_timeout", + "required": false, + "desc": "Maximum time that the scheduler waits for the queue to drain on shutdown.", + "fieldValue": null, + "fieldDefaultValue": 135000000000, + "fieldFlag": "query-scheduler.graceful-shutdown-timeout", + "fieldType": "duration" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 61cea9eaca4..4897a7723ba 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2521,6 +2521,8 @@ Usage of ./cmd/mimir/mimir: [experimental] Set to true to use the zero-allocation response decoder for active series queries. -query-frontend.use-mimir-query-engine-for-sharding [experimental] Set to true to enable performing query sharding inside the Mimir query engine (MQE). This setting has no effect if sharding is disabled. Requires remote execution and MQE to be enabled. + -query-scheduler.graceful-shutdown-timeout duration + Maximum time that the scheduler waits for the queue to drain on shutdown. (default 2m15s) -query-scheduler.grpc-client-config.backoff-max-period duration Maximum delay when backing off. (default 10s) -query-scheduler.grpc-client-config.backoff-min-period duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index b2d31b63997..1628db495a4 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -645,6 +645,8 @@ Usage of ./cmd/mimir/mimir: The socket read/write timeout. (default 200ms) -query-frontend.scheduler-address string Address of the query-scheduler component, in host:port format. The host should resolve to all query-scheduler instances. This option should be set only when query-scheduler component is in use and -query-scheduler.service-discovery-mode is set to 'dns'. + -query-scheduler.graceful-shutdown-timeout duration + Maximum time that the scheduler waits for the queue to drain on shutdown.
(default 2m15s) -query-scheduler.max-outstanding-requests-per-tenant int Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100) -query-scheduler.max-used-instances int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 8bcc2688e99..66868cc936b 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2139,6 +2139,10 @@ ring: # available query-scheduler instances. # CLI flag: -query-scheduler.max-used-instances [max_used_instances: <int> | default = 0] + +# Maximum time that the scheduler waits for the queue to drain on shutdown. +# CLI flag: -query-scheduler.graceful-shutdown-timeout +[graceful_shutdown_timeout: <duration> | default = 2m15s] ``` ### ruler diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 3b020a6732a..5d634a0b152 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1358,6 +1358,7 @@ "query-scheduler.ring.instance-port": 0, "query-scheduler.ring.instance-addr": "", "query-scheduler.max-used-instances": 0, + "query-scheduler.graceful-shutdown-timeout": 135000000000, "usage-stats.enabled": true, "usage-stats.installation-mode": "custom", "overrides-exporter.ring.store": "memberlist", diff --git a/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go index 2025b344bee..b7bf9e829d6 100644 --- a/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go +++ b/pkg/scheduler/queue/multi_queuing_algorithm_tree_queue_benchmark_test.go @@ -13,10 +13,12 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/grafana/mimir/pkg/scheduler/queue/tree" ) @@ -423,7 +425,9 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -439,6 +443,13 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) { ctx := context.Background() require.NoError(t, queue.starting(ctx)) + t.Cleanup(func() { + // If the test has failed and the queue has not been cleared, + // we must still send a shutdown signal to release the remaining connected querier workers, + // otherwise StopAndAwaitTerminated would only return once the stop timeout expires.
+ assert.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + }) + // configure queue producers to enqueue requests with the query component // randomly assigned according to the distribution defined in the test case queueDimensionFunc := makeWeightedRandAdditionalQueueDimensionFunc( diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index c37df591ff1..c216ee015b1 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -9,6 +9,7 @@ import ( "container/list" "context" "fmt" + "sync" "time" "github.com/go-kit/log" @@ -117,8 +118,10 @@ type RequestQueue struct { discardedRequests *prometheus.CounterVec // per user enqueueDuration prometheus.Histogram - stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. - stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. + stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. + stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. + closeStopCompleted func() + stopTimeout time.Duration requestsToEnqueue chan requestToEnqueue requestsSent chan *SchedulerRequest @@ -127,6 +130,7 @@ type RequestQueue struct { waitingDequeueRequests chan *QuerierWorkerDequeueRequest waitingDequeueRequestsToDispatch *list.List waitingDequeueRequestsToDispatchCount *atomic.Int64 + schedulerInflightRequests *atomic.Int64 // QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier // to the time are completed by the querier or failed due to cancel, timeout, or disconnect. @@ -219,7 +223,9 @@ func NewRequestQueue( queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, enqueueDuration prometheus.Histogram, + schedulerInflightRequests *atomic.Int64, querierInflightRequestsMetric *prometheus.SummaryVec, + stopTimeout time.Duration, ) (*RequestQueue, error) { queryComponentCapacity, err := NewQueryComponentUtilization(querierInflightRequestsMetric) if err != nil { @@ -241,6 +247,7 @@ func NewRequestQueue( // channels must not be buffered so that we can detect when dispatcherLoop() has finished. stopRequested: make(chan struct{}), stopCompleted: make(chan struct{}), + stopTimeout: stopTimeout, requestsToEnqueue: make(chan requestToEnqueue), requestsSent: make(chan *SchedulerRequest), @@ -249,11 +256,16 @@ func NewRequestQueue( waitingDequeueRequests: make(chan *QuerierWorkerDequeueRequest), waitingDequeueRequestsToDispatch: list.New(), waitingDequeueRequestsToDispatchCount: atomic.NewInt64(0), + schedulerInflightRequests: schedulerInflightRequests, QueryComponentUtilization: queryComponentCapacity, queueBroker: newQueueBroker(maxOutstandingPerTenant, forgetDelay), } + q.closeStopCompleted = sync.OnceFunc(func() { + close(q.stopCompleted) + }) + q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") return q, nil @@ -293,7 +305,10 @@ func (q *RequestQueue) running(ctx context.Context) error { } func (q *RequestQueue) dispatcherLoop() { - stopping := false + // The only way for this loop to exit is for a valid stop state to have been reached, or for it to have crashed. 
+ defer q.closeStopCompleted() + + isStopping := false for { needToDispatchQueries := false @@ -301,7 +316,10 @@ select { case <-q.stopRequested: // Nothing much to do here - fall through to the stop logic below to see if we can stop immediately. - stopping = true + isStopping = true + case <-q.stopCompleted: + // stopCompleted can also be closed by stop() on shutdown timeout; without this case the dispatcher goroutine would keep running after a timed-out stop + return case querierWorkerOp := <-q.querierWorkerOperations: // Need to attempt to dispatch queries only if querier-worker operation results in a resharding needToDispatchQueries = q.processQuerierWorkerOperation(querierWorkerOp) @@ -336,31 +354,43 @@ } } - // if we have received a signal to stop, we continue to dispatch queries until - // the queue is empty or until we have no more connected querier workers. - if stopping && (q.queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) { - // tell any waiting querier connections that nothing is coming - currentElement := q.waitingDequeueRequestsToDispatch.Front() + if isStopping { + if q.schedulerInflightRequests.Load() == 0 && q.queueBroker.itemCount() == 0 { + // If the queue is stopping, nothing is queued or in flight, and no querier workers are connected, + // we exit immediately because there is nothing left to dispatch + if q.connectedQuerierWorkers.Load() == 0 { + level.Info(q.log).Log("msg", "queue stop completed: query queue is empty and all workers have been disconnected") + return + } - for currentElement != nil { - waitingDequeueReq := currentElement.Value.(*QuerierWorkerDequeueRequest) - waitingDequeueReq.sendError(ErrStopped) - nextElement := currentElement.Next() - q.waitingDequeueRequestsToDispatchCount.Dec() - q.waitingDequeueRequestsToDispatch.Remove(currentElement) - currentElement = nextElement - } + // If the queue is stopping and there are no requests left in the queue or in flight, we cancel any remaining dequeue requests, + // which disconnects the querier workers, and exit the service + level.Info(q.log).Log("msg", "queue stop requested and all pending requests have been processed, disconnecting queriers") + + currentElement := q.waitingDequeueRequestsToDispatch.Front() + + for currentElement != nil { + waitingDequeueReq := currentElement.Value.(*QuerierWorkerDequeueRequest) + waitingDequeueReq.sendError(ErrStopped) + + level.Debug(q.log).Log("msg", "cancelled dequeue request", "querier_id", waitingDequeueReq.QuerierID, "worker_id", waitingDequeueReq.WorkerID) + + nextElement := currentElement.Next() + q.waitingDequeueRequestsToDispatchCount.Dec() + q.waitingDequeueRequestsToDispatch.Remove(currentElement) + currentElement = nextElement + } - if !q.queueBroker.isEmpty() { - // All queriers have disconnected, but we still have requests in the queue. - // Without any consumers we have nothing to do but stop the RequestQueue. - // This should never happen, but if this does happen, we want to know about it. - level.Warn(q.log).Log("msg", "shutting down dispatcher loop: have no connected querier workers, but request queue is not empty, so these requests will be abandoned") + // We are done. + level.Info(q.log).Log("msg", "queue stop completed: all query dequeue requests closed") + return } - // We are done.
- close(q.stopCompleted) - return + level.Info(q.log).Log( + "msg", "queue stop requested but requests are still queued or in flight, waiting for querier workers to complete them", + "queued_requests", q.queueBroker.itemCount(), + "scheduler_inflight", q.schedulerInflightRequests.Load(), + ) } } } @@ -485,6 +515,8 @@ func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRe return reqForQuerier.queryRequest, reqForQuerier.lastTenantIndex, reqForQuerier.err case <-dequeueReq.ctx.Done(): return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err() + case <-q.stopCompleted: + return nil, dequeueReq.lastTenantIndex, ErrStopped } case <-dequeueReq.ctx.Done(): return nil, dequeueReq.lastTenantIndex, dequeueReq.ctx.Err() @@ -499,9 +531,26 @@ func (q *RequestQueue) stop(_ error) error { // Reads from stopRequested tell dispatcherLoop to enter a stopping state where it tries to clear the queue. // The loop needs to keep executing other select branches while stopping in order to clear the queue. q.stopRequested <- struct{}{} - <-q.stopCompleted - return nil + // We set a context with a timeout to ensure that in the event the scheduler is the last component + // running we do not hang forever waiting for the queue to be drained. + // This timeout is set using -query-scheduler.graceful-shutdown-timeout and defaults to 2m15s + ctx, cancel := context.WithTimeout(context.Background(), q.stopTimeout) + + for { + select { + case <-q.stopCompleted: + cancel() + return nil + case <-ctx.Done(): + level.Warn(q.log).Log("msg", "queue stop timeout reached: requests were still queued or in flight when the timeout expired", "stopTimeout", q.stopTimeout) + + cancel() + q.closeStopCompleted() + + return nil + } + } } func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { diff --git a/pkg/scheduler/queue/queue_broker.go b/pkg/scheduler/queue/queue_broker.go index 2fd75391d56..132ddbcaaad 100644 --- a/pkg/scheduler/queue/queue_broker.go +++ b/pkg/scheduler/queue/queue_broker.go @@ -68,6 +68,10 @@ func (qb *queueBroker) isEmpty() bool { return qb.tree.IsEmpty() } +func (qb *queueBroker) itemCount() int { + return qb.tree.ItemCount() +} + // enqueueRequestBack is the standard interface to enqueue requests for dispatch to queriers. // // Tenants and tenant-querier shuffle sharding relationships are managed internally as needed. diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 424de0b2228..daab6ad0486 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -11,6 +11,7 @@ import ( "strconv" "sync" "testing" + "testing/synctest" "time" "github.com/go-kit/log" @@ -21,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/scheduler/queue/tree" @@ -38,6 +40,8 @@ var secondQueueDimensionOptions = []string{ unknownQueueDimension, } +const queueStopTimeout = 5 * time.Second + // randAdditionalQueueDimension is the basic implementation of additionalQueueDimensionFunc, // used to assign the expected query component queue dimensions to SchedulerRequests // before they are enqueued by the queue producer groups utilized in benchmark tests.
@@ -100,7 +104,9 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(b, err) @@ -347,7 +353,9 @@ func TestDispatchToWaitingDequeueRequestForUnregisteredQuerierWorker(t *testing. promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -461,7 +469,9 @@ func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -547,7 +557,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -620,7 +632,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -709,7 +723,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -765,7 +781,9 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -796,7 +814,9 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), 
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, ) require.NoError(t, err) @@ -845,3 +865,195 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend // assert request was re-enqueued for tenant after failed send require.False(t, qb.tree.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) } + +// This test ensures that the queue instance waits until the timeout to exit when requests remain in the queue +// and no querier worker dequeues them +func TestRequestQueue_ShutdownWithPendingRequests_ShouldTimeout(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := context.Background() + tenantID := "testTenant" + + // We create a queue instance + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + 0, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, + ) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // Push a request to it + req := makeSchedulerRequest(tenantID, []string{}) + require.NotNil(t, req) + err = queue.SubmitRequestToEnqueue(tenantID, req, 1, func() {}) + require.NoError(t, err) + + // Ensure that the request has been added to the queue + require.Equal(t, queue.queueBroker.tree.ItemCount(), 1) + + // Then stop the Queue service without dequeueing the request + now := time.Now() + require.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + synctest.Wait() + + // And ensure the queue only stops after the timeout has elapsed + require.GreaterOrEqual(t, time.Since(now), queueStopTimeout) + require.GreaterOrEqual(t, queueStopTimeout+time.Second, time.Since(now)) + }) +} + +// This test ensures that the queue will wait for any pending requests to be dequeued and processed before exiting. +// This should be completed before the timeout.
+func TestRequestQueue_ShutdownWithInflightRequests_ShouldDrainRequests(t *testing.T) { + ctx := context.Background() + tenantID := "testTenant" + querierID := "querier1" + + // Create a new request queue + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + 0, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + atomic.NewInt64(0), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, + ) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // And register a worker for dequeueing + conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(conn)) + + // Push a request to the queue + req := makeSchedulerRequest(tenantID, []string{}) + require.NotNil(t, req) + err = queue.SubmitRequestToEnqueue(tenantID, req, 1, func() {}) + require.NoError(t, err) + + // And make sure it got queued + require.Equal(t, queue.queueBroker.tree.ItemCount(), 1) + + // Stop the Queue + start := time.Now() + queue.StopAsync() + + // Consume existing requests + dequeueReq := NewQuerierWorkerDequeueRequest(conn, FirstTenant()) + r, _, err := queue.AwaitRequestForQuerier(dequeueReq) + require.NoError(t, err) + require.NotNil(t, r) + + // Ensure the queue is empty and the service stops + require.Equal(t, queue.queueBroker.tree.ItemCount(), 0) + require.NoError(t, queue.AwaitTerminated(ctx)) + + // And it was within the timeout + require.WithinDuration(t, time.Now(), start, queue.stopTimeout-1*time.Second) +} + +// This test ensures that even if the queue has no pending requests but previously dispatched requests are still in flight, +// it will reach the timeout and exit if they don't complete in time +func TestRequestQueue_ShutdownWithInflightSchedulerRequests_ShouldTimeout(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := context.Background() + + // For this test we're intentionally testing the scheduler inflight requests rather than the internal queue length + inflight := atomic.NewInt64(0) + + // So we create a queue using our local inflight request tracker + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + 0, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + inflight, + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, + ) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // And then record a request as inflight in the scheduler, even though we're not putting anything in the queue + inflight.Add(1) + + // Then stop the Queue + start := time.Now() + assert.NoError(t, services.StopAndAwaitTerminated(ctx, queue)) + + // And ensure that it waits until the timeout to exit (using synctest.Wait so we're not actually waiting) + synctest.Wait() + require.LessOrEqual(t, queueStopTimeout, time.Since(start)) + require.LessOrEqual(t, time.Since(start), queueStopTimeout+time.Second) + }) +} + +// This test ensures that even if the queue has no pending requests, we still wait until any inflight requests +// have been returned before exiting +func 
TestRequestQueue_ShutdownWithInflightSchedulerRequests_ShouldDrainRequests(t *testing.T) { + ctx := context.Background() + tenantID := "testTenant" + querierID := "querier1" + + // We care about the inflight requests + inflight := atomic.NewInt64(0) + + // So we create a queue + queue, err := NewRequestQueue( + log.NewNopLogger(), + 1, + 0, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), + inflight, + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), + queueStopTimeout, + ) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, queue)) + + // With a worker + conn := NewUnregisteredQuerierWorkerConn(context.Background(), querierID) + require.NoError(t, queue.AwaitRegisterQuerierWorkerConn(conn)) + + // And push a request to the queue + req := makeSchedulerRequest(tenantID, []string{}) + require.NotNil(t, req) + err = queue.SubmitRequestToEnqueue(tenantID, req, 1, func() {}) + require.NoError(t, err) + + // And make sure it got to the queue + require.Equal(t, queue.queueBroker.tree.ItemCount(), 1) + // Then we record it as inflight + require.Equal(t, inflight.Add(1), int64(1)) + + // Stop the Queue + start := time.Now() + queue.StopAsync() + + // Consume the existing request from the queue + dequeueReq := NewQuerierWorkerDequeueRequest(conn, FirstTenant()) + r, _, err := queue.AwaitRequestForQuerier(dequeueReq) + require.NoError(t, err) + require.NotNil(t, r) + + // Ensure the request has been removed from the queue and remove it from inflight tracking + require.Equal(t, queue.queueBroker.tree.ItemCount(), 0) + require.Equal(t, inflight.Sub(1), int64(0)) + + // And finally make sure it stops within the timeout + require.NoError(t, queue.AwaitTerminated(ctx)) + require.WithinDuration(t, time.Now(), start, queue.stopTimeout-1*time.Second) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 71ee6e98ccd..7b9a7544bd7 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -33,6 +33,7 @@ import ( "go.opentelemetry.io/otel/attribute" otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -70,7 +71,8 @@ type Scheduler struct { inflightRequestsMu sync.Mutex // schedulerInflightRequests tracks requests from the time they are received to be enqueued by the scheduler // to the time they are completed by the querier or failed due to cancel, timeout, or disconnect. - schedulerInflightRequests map[queue.RequestKey]*queue.SchedulerRequest + schedulerInflightRequests map[queue.RequestKey]*queue.SchedulerRequest + schedulerInflightRequestCount *atomic.Int64 // The ring is used to let other components discover query-scheduler replicas. // The ring is optional. 
@@ -106,11 +108,14 @@ type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` ServiceDiscovery schedulerdiscovery.Config `yaml:",inline"` + + SchedulerGracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") + f.DurationVar(&cfg.SchedulerGracefulShutdownTimeout, "query-scheduler.graceful-shutdown-timeout", 135*time.Second, "Maximum time that the scheduler waits for the queue to drain on shutdown.") cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) @@ -130,9 +135,10 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe log: log, limits: limits, - schedulerInflightRequests: map[queue.RequestKey]*queue.SchedulerRequest{}, - connectedFrontends: map[string]*connectedFrontend{}, - subservicesWatcher: services.NewFailureWatcher(), + schedulerInflightRequests: map[queue.RequestKey]*queue.SchedulerRequest{}, + schedulerInflightRequestCount: atomic.NewInt64(0), + connectedFrontends: map[string]*connectedFrontend{}, + subservicesWatcher: services.NewFailureWatcher(), } s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ @@ -171,7 +177,9 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.queueLength, s.discardedRequests, enqueueDuration, + s.schedulerInflightRequestCount, querierInflightRequestsMetric, + cfg.SchedulerGracefulShutdownTimeout, ) if err != nil { return nil, err @@ -247,7 +255,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front // We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns // cancels all their queries. - for s.State() == services.Running { + for s.isRunning() { msg, err := frontend.Recv() if err != nil { // No need to report this as error, it is expected when query-frontend performs SendClose() (as frontendSchedulerWorker does).
@@ -288,6 +296,9 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front case errors.Is(err, queue.ErrTooManyRequests): enqueueSpan.RecordError(err) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT} + case errors.Is(err, queue.ErrStopped): + enqueueSpan.RecordError(err) + resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} default: enqueueSpan.RecordError(err) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR, Error: err.Error()} @@ -410,6 +421,7 @@ func (s *Scheduler) addRequestToPending(req *queue.SchedulerRequest) { defer s.inflightRequestsMu.Unlock() s.schedulerInflightRequests[req.Key()] = req + s.schedulerInflightRequestCount.Store(int64(len(s.schedulerInflightRequests))) } // This method doesn't do removal from the queue. @@ -423,6 +435,7 @@ func (s *Scheduler) cancelRequestAndRemoveFromPending(key queue.RequestKey, reas } delete(s.schedulerInflightRequests, key) + s.schedulerInflightRequestCount.Store(int64(len(s.schedulerInflightRequests))) return req } @@ -466,6 +479,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL // (see note in transformRequestQueueError). // ErrQuerierWorkerDisconnected is caused by connection/goroutine crash; // we expect this loop is no longer alive to receive the error anyway. + level.Info(s.log).Log("msg", "AwaitRequestForQuerier returned an error", "err", err) return s.transformRequestQueueError(err) } @@ -503,6 +517,8 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL } } + level.Info(s.log).Log("msg", "query loop closed because scheduler is not running", "querier_id", querierID) + return schedulerpb.ErrSchedulerIsNotRunning } @@ -671,11 +687,7 @@ func (s *Scheduler) running(ctx context.Context) error { for { select { case <-inflightRequestsTicker.C: - s.inflightRequestsMu.Lock() - inflight := len(s.schedulerInflightRequests) - s.inflightRequestsMu.Unlock() - - s.inflightRequests.Observe(float64(inflight)) + s.inflightRequests.Observe(float64(s.schedulerInflightRequestCount.Load())) case <-ctx.Done(): return nil case err := <-s.subservicesWatcher.Chan(): diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index ec1036fdc0f..0a0ff23aa2a 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -75,6 +75,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu cfg := Config{} flagext.DefaultValues(&cfg) cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant + cfg.SchedulerGracefulShutdownTimeout = 5 * time.Second s, err := NewScheduler(cfg, &limits{queriers: 2}, log.NewNopLogger(), reg) require.NoError(t, err) @@ -367,6 +368,8 @@ func TestTracingContext(t *testing.T) { require.True(t, r.ParentSpanContext.IsValid()) require.Equal(t, sp.SpanContext().TraceID().String(), r.ParentSpanContext.TraceID().String()) } + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), scheduler)) } func TestSchedulerShutdown_FrontendLoop(t *testing.T) { @@ -388,6 +391,7 @@ func TestSchedulerShutdown_FrontendLoop(t *testing.T) { msg, err := frontendLoop.Recv() require.NoError(t, err) require.Equal(t, schedulerpb.SHUTTING_DOWN, msg.Status) + // require.Equal(t, queue.ErrStopped.Error(), msg.Error) verifyQueryComponentUtilizationLeft(t, scheduler) }