Skip to content

Commit 8cf80dd

Browse files
committed
create batchable StoreSeries server
1 parent aaf202a commit 8cf80dd

File tree

19 files changed

+612
-37
lines changed

19 files changed

+612
-37
lines changed

cmd/thanos/query.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ func registerQuery(app *extkingpin.App) {
212212
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
213213
Default("20").Hidden().Int()
214214

215+
seriesResponseBatchSize := cmd.Flag("query.series-response-batch-size", "How many Series can be batched in one gRPC message.").
216+
Default("1").Hidden().Int()
217+
215218
var storeRateLimits store.SeriesSelectLimits
216219
storeRateLimits.RegisterFlags(cmd)
217220

@@ -359,6 +362,7 @@ func registerQuery(app *extkingpin.App) {
359362
*tenantLabel,
360363
*queryDistributedWithOverlappingInterval,
361364
*lazyRetrievalMaxBufferedResponses,
365+
*seriesResponseBatchSize,
362366
)
363367
})
364368
}
@@ -424,6 +428,7 @@ func runQuery(
424428
tenantLabel string,
425429
queryDistributedWithOverlappingInterval bool,
426430
lazyRetrievalMaxBufferedResponses int,
431+
seriesResponseBatchSize int,
427432
) error {
428433
comp := component.Query
429434
if alertQueryURL == "" {
@@ -458,6 +463,7 @@ func runQuery(
458463
maxConcurrentSelects,
459464
queryTimeout,
460465
deduplicationFunc,
466+
seriesResponseBatchSize,
461467
)
462468
remoteEndpointsCreator = query.NewRemoteEndpointsCreator(
463469
logger,

pkg/api/query/grpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
3333
logger := log.NewNopLogger()
3434
reg := prometheus.NewRegistry()
3535
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, labels.EmptyLabels(), 1*time.Minute, store.LazyRetrieval)
36-
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
36+
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty, 1)
3737
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
3838
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
3939
api := NewGRPCAPI(time.Now, nil, queryableCreator, remoteEndpointsCreator, queryFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)
@@ -77,7 +77,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) {
7777
logger := log.NewNopLogger()
7878
reg := prometheus.NewRegistry()
7979
proxy := store.NewProxyStore(logger, reg, func() []store.Client { return nil }, component.Store, labels.EmptyLabels(), 1*time.Minute, store.LazyRetrieval)
80-
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty)
80+
queryableCreator := query.NewQueryableCreator(logger, reg, proxy, 1, 1*time.Minute, dedup.AlgorithmPenalty, 1)
8181
remoteEndpointsCreator := query.NewRemoteEndpointsCreator(logger, func() []query.Client { return nil }, nil, 1*time.Minute, true, true)
8282
lookbackDeltaFunc := func(i int64) time.Duration { return 5 * time.Minute }
8383
tests := []struct {

pkg/api/query/v1_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func TestQueryEndpoints(t *testing.T) {
194194
baseAPI: &baseAPI.BaseAPI{
195195
Now: func() time.Time { return now },
196196
},
197-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
197+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
198198
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
199199
queryCreate: queryFactory,
200200
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
@@ -539,7 +539,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
539539
baseAPI: &baseAPI.BaseAPI{
540540
Now: func() time.Time { return now },
541541
},
542-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
542+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
543543
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
544544
queryCreate: queryFactory,
545545
defaultEngine: PromqlEnginePrometheus,
@@ -603,7 +603,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
603603
baseAPI: &baseAPI.BaseAPI{
604604
Now: func() time.Time { return now },
605605
},
606-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
606+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
607607
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
608608
queryCreate: queryFactory,
609609
defaultEngine: PromqlEnginePrometheus,
@@ -746,7 +746,7 @@ func TestMetadataEndpoints(t *testing.T) {
746746
baseAPI: &baseAPI.BaseAPI{
747747
Now: func() time.Time { return now },
748748
},
749-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
749+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
750750
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
751751
queryCreate: queryFactory,
752752
defaultEngine: PromqlEnginePrometheus,
@@ -763,7 +763,7 @@ func TestMetadataEndpoints(t *testing.T) {
763763
baseAPI: &baseAPI.BaseAPI{
764764
Now: func() time.Time { return now },
765765
},
766-
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty),
766+
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout, dedup.AlgorithmPenalty, 1),
767767
remoteEndpointsCreate: emptyRemoteEndpointsCreate,
768768
queryCreate: queryFactory,
769769
defaultEngine: PromqlEnginePrometheus,

pkg/query/querier.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package query
55

66
import (
77
"context"
8+
"slices"
89
"strings"
910
"sync"
1011
"time"
@@ -78,6 +79,7 @@ func NewQueryableCreator(
7879
maxConcurrentSelects int,
7980
selectTimeout time.Duration,
8081
deduplicationFunc string,
82+
seriesResponseBatchSize int,
8183
) QueryableCreator {
8284
gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects)
8385

@@ -104,34 +106,36 @@ func NewQueryableCreator(
104106
gateProviderFn: func() gate.Gate {
105107
return gf.New()
106108
},
107-
maxConcurrentSelects: maxConcurrentSelects,
108-
selectTimeout: selectTimeout,
109-
shardInfo: shardInfo,
110-
seriesStatsReporter: seriesStatsReporter,
109+
maxConcurrentSelects: maxConcurrentSelects,
110+
selectTimeout: selectTimeout,
111+
shardInfo: shardInfo,
112+
seriesStatsReporter: seriesStatsReporter,
113+
seriesResponseBatchSize: seriesResponseBatchSize,
111114
}
112115
}
113116
}
114117

115118
type queryable struct {
116-
logger log.Logger
117-
deduplicationFunc string
118-
replicaLabels []string
119-
storeDebugMatchers [][]*labels.Matcher
120-
proxy storepb.StoreServer
121-
deduplicate bool
122-
maxResolutionMillis int64
123-
partialResponse bool
124-
skipChunks bool
125-
gateProviderFn func() gate.Gate
126-
maxConcurrentSelects int
127-
selectTimeout time.Duration
128-
shardInfo *storepb.ShardInfo
129-
seriesStatsReporter seriesStatsReporter
119+
logger log.Logger
120+
deduplicationFunc string
121+
replicaLabels []string
122+
storeDebugMatchers [][]*labels.Matcher
123+
proxy storepb.StoreServer
124+
deduplicate bool
125+
maxResolutionMillis int64
126+
partialResponse bool
127+
skipChunks bool
128+
gateProviderFn func() gate.Gate
129+
maxConcurrentSelects int
130+
selectTimeout time.Duration
131+
shardInfo *storepb.ShardInfo
132+
seriesStatsReporter seriesStatsReporter
133+
seriesResponseBatchSize int
130134
}
131135

132136
// Querier returns a new storage querier against the underlying proxy store API.
133137
func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) {
134-
return newQuerier(q.logger, mint, maxt, q.deduplicationFunc, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil
138+
return newQuerier(q.logger, mint, maxt, q.deduplicationFunc, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter, q.seriesResponseBatchSize), nil
135139
}
136140

137141
type querier struct {
@@ -149,6 +153,7 @@ type querier struct {
149153
selectTimeout time.Duration
150154
shardInfo *storepb.ShardInfo
151155
seriesStatsReporter seriesStatsReporter
156+
seriesResponseBatchSize int
152157
}
153158

154159
// newQuerier creates implementation of storage.Querier that fetches data from the proxy
@@ -169,6 +174,7 @@ func newQuerier(
169174
selectTimeout time.Duration,
170175
shardInfo *storepb.ShardInfo,
171176
seriesStatsReporter seriesStatsReporter,
177+
seriesResponseBatchSize int,
172178
) *querier {
173179
if logger == nil {
174180
logger = log.NewNopLogger()
@@ -195,6 +201,7 @@ func newQuerier(
195201
skipChunks: skipChunks,
196202
shardInfo: shardInfo,
197203
seriesStatsReporter: seriesStatsReporter,
204+
seriesResponseBatchSize: seriesResponseBatchSize,
198205
}
199206
}
200207

@@ -224,6 +231,16 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
224231
return nil
225232
}
226233

234+
if r.GetBatch() != nil {
235+
batch := *r.GetBatch()
236+
s.seriesSet = slices.Grow(s.seriesSet, len(batch.Series))
237+
for _, series := range batch.Series {
238+
s.seriesSet = append(s.seriesSet, *series)
239+
s.seriesSetStats.Count(series)
240+
}
241+
return nil
242+
}
243+
227244
// Unsupported field, skip.
228245
return nil
229246
}
@@ -353,6 +370,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
353370
ShardInfo: q.shardInfo,
354371
PartialResponseStrategy: q.partialResponseStrategy,
355372
SkipChunks: q.skipChunks,
373+
ResponseBatchSize: int64(q.seriesResponseBatchSize),
356374
}
357375
if q.isDedupEnabled() {
358376
// Soft ask to sort without replica labels and push them at the end of labelset.

pkg/query/querier_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
5151
t.Parallel()
5252

5353
testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
54-
queryableCreator := NewQueryableCreator(nil, nil, newProxyStore(testProxy), 2, 5*time.Second, dedup.AlgorithmPenalty)
54+
queryableCreator := NewQueryableCreator(nil, nil, newProxyStore(testProxy), 2, 5*time.Second, dedup.AlgorithmPenalty, 1)
5555

5656
oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
5757
queryable := queryableCreator(
@@ -99,6 +99,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
9999
2,
100100
timeout,
101101
dedup.AlgorithmPenalty,
102+
1,
102103
)(false,
103104
nil,
104105
nil,
@@ -404,7 +405,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
404405
g := gate.New(2)
405406
mq := &mockedQueryable{
406407
Creator: func(mint, maxt int64) storage.Querier {
407-
return newQuerier(nil, mint, maxt, dedup.AlgorithmPenalty, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
408+
return newQuerier(nil, mint, maxt, dedup.AlgorithmPenalty, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
408409
},
409410
}
410411
t.Cleanup(func() {
@@ -800,6 +801,7 @@ func TestQuerier_Select(t *testing.T) {
800801
timeout,
801802
nil,
802803
NoopSeriesStatsReporter,
804+
1,
803805
)
804806
t.Cleanup(func() { testutil.Ok(t, q.Close()) })
805807

@@ -1079,7 +1081,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
10791081

10801082
timeout := 100 * time.Second
10811083
g := gate.New(2)
1082-
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
1084+
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
10831085
t.Cleanup(func() {
10841086
testutil.Ok(t, q.Close())
10851087
})
@@ -1149,7 +1151,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
11491151

11501152
timeout := 5 * time.Second
11511153
g := gate.New(2)
1152-
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
1154+
q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, dedup.AlgorithmPenalty, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter, 1)
11531155
t.Cleanup(func() {
11541156
testutil.Ok(t, q.Close())
11551157
})

0 commit comments

Comments
 (0)