1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#8623](https://github.com/thanos-io/thanos/pull/8623): Query: support sending a batch of Series per SeriesResponse with the `--query.series-response-batch-size` flag.
- [#8582](https://github.com/thanos-io/thanos/pull/8582): Sidecar: support the --storage.tsdb.delay-compact-file.path Prometheus flag.

### Changed
6 changes: 6 additions & 0 deletions cmd/thanos/query.go
@@ -212,6 +212,9 @@ func registerQuery(app *extkingpin.App) {
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").Hidden().Int()

seriesResponseBatchSize := cmd.Flag("query.series-response-batch-size", "How many Series can be batched in one gRPC message.").
Default("1").Hidden().Int()

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

@@ -359,6 +362,7 @@ func registerQuery(app *extkingpin.App) {
*tenantLabel,
*queryDistributedWithOverlappingInterval,
*lazyRetrievalMaxBufferedResponses,
*seriesResponseBatchSize,
)
})
}
@@ -424,6 +428,7 @@ func runQuery(
tenantLabel string,
queryDistributedWithOverlappingInterval bool,
lazyRetrievalMaxBufferedResponses int,
seriesResponseBatchSize int,
) error {
comp := component.Query
if alertQueryURL == "" {
@@ -458,6 +463,7 @@
maxConcurrentSelects,
queryTimeout,
deduplicationFunc,
seriesResponseBatchSize,
)
remoteEndpointsCreator = query.NewRemoteEndpointsCreator(
logger,
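For orientation: the new flag is hidden and defaults to 1, which preserves the existing one-Series-per-SeriesResponse behaviour. The value is threaded from `registerQuery` through `runQuery` into `NewQueryableCreator` and, further down in `pkg/query/querier.go`, onto each outgoing `SeriesRequest` as `ResponseBatchSize`. A minimal, self-contained sketch of that plumbing pattern follows; the types below are simplified stand-ins for illustration, not the actual Thanos structs.

```go
// Hypothetical, simplified illustration of threading a batch-size option
// from a CLI flag down to every outgoing series request. The types here
// are stand-ins for illustration only, not the real Thanos types.
package main

import "fmt"

// seriesRequest mimics the single field this PR adds to the real request.
type seriesRequest struct {
	ResponseBatchSize int64
}

type querier struct {
	seriesResponseBatchSize int
}

func newQuerier(seriesResponseBatchSize int) *querier {
	return &querier{seriesResponseBatchSize: seriesResponseBatchSize}
}

func (q *querier) newSeriesRequest() seriesRequest {
	return seriesRequest{ResponseBatchSize: int64(q.seriesResponseBatchSize)}
}

func main() {
	// A default of 1 keeps the historical one-Series-per-message behaviour.
	q := newQuerier(1)
	fmt.Printf("%+v\n", q.newSeriesRequest())
}
```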
4 changes: 2 additions & 2 deletions pkg/api/query/grpc_test.go
@@ -33,7 +33,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, labels.EmptyLabels(), 1*time.Minute, store.LazyRetrieval)
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty, 1)
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
api := NewGRPCAPI(time.Now, nil, queryableCreator, remoteEndpointsCreator, queryFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)
@@ -77,7 +77,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, labels.EmptyLabels(), 1*time.Minute, store.LazyRetrieval)
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty, 1)
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
tests := []struct {
10 changes: 5 additions & 5 deletions pkg/api/query/v1_test.go
@@ -194,7 +194,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
queryCreate: queryFactory,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
@@ -539,7 +539,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
queryCreate: queryFactory,
defaultEngine: PromqlEnginePrometheus,
@@ -603,7 +603,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
queryCreate: queryFactory,
defaultEngine: PromqlEnginePrometheus,
@@ -746,7 +746,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
queryCreate: queryFactory,
defaultEngine: PromqlEnginePrometheus,
@@ -763,7 +763,7 @@
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
queryCreate: queryFactory,
defaultEngine: PromqlEnginePrometheus,
56 changes: 37 additions & 19 deletions pkg/query/querier.go
@@ -5,6 +5,7 @@ package query

import (
"context"
"slices"
"strings"
"sync"
"time"
@@ -78,6 +79,7 @@ func NewQueryableCreator(
maxConcurrentSelects int,
selectTimeout time.Duration,
deduplicationFunc string,
seriesResponseBatchSize int,
) QueryableCreator {
gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects)

@@ -104,34 +106,36 @@
gateProviderFn: func() gate.Gate {
return gf.New()
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
shardInfo: shardInfo,
seriesStatsReporter: seriesStatsReporter,
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
shardInfo: shardInfo,
seriesStatsReporter: seriesStatsReporter,
seriesResponseBatchSize: seriesResponseBatchSize,
}
}
}

type queryable struct {
logger log.Logger
deduplicationFunc string
replicaLabels []string
storeDebugMatchers [][]*labels.Matcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gateProviderFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
shardInfo *storepb.ShardInfo
seriesStatsReporter seriesStatsReporter
logger log.Logger
deduplicationFunc string
replicaLabels []string
storeDebugMatchers [][]*labels.Matcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gateProviderFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
shardInfo *storepb.ShardInfo
seriesStatsReporter seriesStatsReporter
seriesResponseBatchSize int
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) {
return newQuerier(q.logger, mint, maxt, q.deduplicationFunc, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil
return newQuerier(q.logger, mint, maxt, q.deduplicationFunc, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter, q.seriesResponseBatchSize), nil
}

type querier struct {
@@ -149,6 +153,7 @@ type querier struct {
selectTimeout time.Duration
shardInfo *storepb.ShardInfo
seriesStatsReporter seriesStatsReporter
seriesResponseBatchSize int
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
@@ -169,6 +174,7 @@ func newQuerier(
selectTimeout time.Duration,
shardInfo *storepb.ShardInfo,
seriesStatsReporter seriesStatsReporter,
seriesResponseBatchSize int,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
@@ -195,6 +201,7 @@
skipChunks: skipChunks,
shardInfo: shardInfo,
seriesStatsReporter: seriesStatsReporter,
seriesResponseBatchSize: seriesResponseBatchSize,
}
}

@@ -224,6 +231,16 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
return nil
}

if r.GetBatch() != nil {
batch := *r.GetBatch()
s.seriesSet = slices.Grow(s.seriesSet, len(batch.Series))
for _, series := range batch.Series {
s.seriesSet = append(s.seriesSet, *series)
s.seriesSetStats.Count(series)
}
return nil
}

// Unsupported field, skip.
return nil
}
@@ -353,6 +370,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
ShardInfo: q.shardInfo,
PartialResponseStrategy: q.partialResponseStrategy,
SkipChunks: q.skipChunks,
ResponseBatchSize: int64(q.seriesResponseBatchSize),
}
if q.isDedupEnabled() {
// Soft ask to sort without replica labels and push them at the end of labelset.
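The new branch in `seriesServer.Send` above flattens a batched response into the local `seriesSet`, growing the slice once up front instead of reallocating on every append. A self-contained sketch of the same pattern follows, assuming simplified stand-in types rather than the real `storepb` messages.

```go
// Hypothetical sketch of flattening a batched series response into a flat
// slice, mirroring the pattern used in seriesServer.Send above. The types
// are simplified stand-ins, not the actual storepb messages.
package main

import (
	"fmt"
	"slices"
)

type series struct {
	Name string
}

type seriesBatch struct {
	Series []*series
}

type seriesServer struct {
	seriesSet []series
}

func (s *seriesServer) receiveBatch(batch *seriesBatch) {
	// Grow once so the loop below appends without further reallocations.
	s.seriesSet = slices.Grow(s.seriesSet, len(batch.Series))
	for _, sr := range batch.Series {
		s.seriesSet = append(s.seriesSet, *sr)
	}
}

func main() {
	srv := &seriesServer{}
	srv.receiveBatch(&seriesBatch{Series: []*series{{Name: "a"}, {Name: "b"}}})
	fmt.Println(len(srv.seriesSet)) // 2
}
```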
10 changes: 6 additions & 4 deletions pkg/query/querier_test.go
@@ -51,7 +51,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
t.Parallel()

testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, nil, newProxyStore(testProxy), 2, 5*time.Second, dedup.AlgorithmPenalty)
queryableCreator := NewQueryableCreator(nil, nil, newProxyStore(testProxy), 2, 5*time.Second, dedup.AlgorithmPenalty, 1)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(
@@ -99,6 +99,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
2,
timeout,
dedup.AlgorithmPenalty,
1,
)(false,
nil,
nil,
@@ -404,7 +405,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
g := gate.New(2)
mq := &mockedQueryable{
Creator: func(mint, maxt int64) storage.Querier {
return newQuerier(nil, mint, maxt, dedup.AlgorithmPenalty, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
return newQuerier(nil, mint, maxt, dedup.AlgorithmPenalty, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
},
}
t.Cleanup(func() {
@@ -800,6 +801,7 @@ func TestQuerier_Select(t *testing.T) {
timeout,
nil,
NoopSeriesStatsReporter,
1,
)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

@@ -1079,7 +1081,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
@@ -1149,7 +1151,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})