Skip to content

Commit 146b64a

Browse files
committed
add new quorum response strategy
1 parent 6075d41 commit 146b64a

File tree

19 files changed

+841
-161
lines changed

19 files changed

+841
-161
lines changed

cmd/thanos/query.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ func registerQuery(app *extkingpin.App) {
200200
enableGroupReplicaPartialStrategy := cmd.Flag("query.group-replica-strategy", "Enable group-replica partial response strategy.").
201201
Default("false").Bool()
202202

203+
enableQuorumPartialStrategy := cmd.Flag("query.quorum-strategy", "Enable quorum partial response strategy based on InfoAPI store replica_group/quorum hints. Cannot be combined with query.group-replica-strategy.").
204+
Default("false").Bool()
205+
203206
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
204207
Hidden().Default("true").Bool()
205208

@@ -320,6 +323,10 @@ func registerQuery(app *extkingpin.App) {
320323
return err
321324
}
322325

326+
if *enableGroupReplicaPartialStrategy && *enableQuorumPartialStrategy {
327+
return errors.New("only one of --query.group-replica-strategy and --query.quorum-strategy can be enabled")
328+
}
329+
323330
// Parse blocked metric patterns
324331
var blockedMetricPatterns []string
325332
if *blockQueryMetricsWithoutFilter != "" {
@@ -408,6 +415,7 @@ func registerQuery(app *extkingpin.App) {
408415
*enforceTenancy,
409416
*tenantLabel,
410417
*enableGroupReplicaPartialStrategy,
418+
*enableQuorumPartialStrategy,
411419
*rewriteAggregationLabelStrategy,
412420
*rewriteAggregationLabelTo,
413421
*lazyRetrievalMaxBufferedResponses,
@@ -499,6 +507,7 @@ func runQuery(
499507
enforceTenancy bool,
500508
tenantLabel string,
501509
groupReplicaPartialResponseStrategy bool,
510+
quorumPartialResponseStrategy bool,
502511
rewriteAggregationLabelStrategy string,
503512
rewriteAggregationLabelTo string,
504513
lazyRetrievalMaxBufferedResponses int,
@@ -638,7 +647,7 @@ func runQuery(
638647
unhealthyStoreTimeout,
639648
endpointInfoTimeout,
640649
// ignoreErrors when group_replica partial response strategy is enabled.
641-
groupReplicaPartialResponseStrategy,
650+
groupReplicaPartialResponseStrategy || quorumPartialResponseStrategy,
642651
queryConnMetricLabels...,
643652
)
644653

@@ -652,6 +661,7 @@ func runQuery(
652661
)
653662
opts := query.Options{
654663
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
664+
QuorumPartialResponseStrategy: quorumPartialResponseStrategy,
655665
DeduplicationFunc: queryDeduplicationFunc,
656666
RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy,
657667
RewriteAggregationLabelTo: rewriteAggregationLabelTo,

cmd/thanos/receive.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func registerReceive(app *extkingpin.App) {
6868
conf.registerFlag(cmd)
6969

7070
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
71+
if err := conf.validate(); err != nil {
72+
return err
73+
}
7174
lset, err := parseFlagLabels(conf.labelStrs)
7275
if err != nil {
7376
return errors.Wrap(err, "parse labels")
@@ -249,6 +252,9 @@ func runReceive(
249252
}
250253
multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache))
251254
}
255+
if conf.replicaGroup != "" {
256+
multiTSDBOptions = append(multiTSDBOptions, receive.WithReplicaGroup(conf.replicaGroup, conf.quorum))
257+
}
252258

253259
dbs := receive.NewMultiTSDB(
254260
conf.dataDir,
@@ -449,6 +455,8 @@ func runReceive(
449455
SupportsSharding: true,
450456
SupportsWithoutReplicaLabels: true,
451457
TsdbInfos: proxy.TSDBInfos(),
458+
ReplicaGroup: conf.replicaGroup,
459+
Quorum: int32(conf.quorum),
452460
}, nil
453461
}
454462
return nil, errors.New("Not ready")
@@ -931,8 +939,10 @@ type receiveConfig struct {
931939
rwClientSkipVerify bool
932940
rwServerTlsMinVersion string
933941

934-
dataDir string
935-
labelStrs []string
942+
dataDir string
943+
labelStrs []string
944+
replicaGroup string
945+
quorum int
936946

937947
objStoreConfig *extflag.PathOrContent
938948
retention *model.Duration
@@ -1002,6 +1012,19 @@ type receiveConfig struct {
10021012
noUploadTenants *[]string
10031013
}
10041014

1015+
func (rc *receiveConfig) validate() error {
1016+
if rc.replicaGroup == "" {
1017+
if rc.quorum != 0 {
1018+
return errors.New("invalid receive config: --receive.quorum requires --receive.replica-group")
1019+
}
1020+
return nil
1021+
}
1022+
if rc.quorum <= 0 {
1023+
return errors.New("invalid receive config: --receive.quorum must be > 0 when --receive.replica-group is set")
1024+
}
1025+
return nil
1026+
}
1027+
10051028
func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10061029
rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd)
10071030
rc.grpcConfig.registerFlag(cmd)
@@ -1035,6 +1058,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10351058

10361059
cmd.Flag("label", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").StringsVar(&rc.labelStrs)
10371060

1061+
cmd.Flag("receive.replica-group", "Replica group identifier for GROUP_REPLICA partial response strategy. Stores with the same replica_group value hold replicated data. When set, the query layer can tolerate failures as long as enough replicas in the group are healthy.").Default("").StringVar(&rc.replicaGroup)
1062+
1063+
cmd.Flag("receive.quorum", "Minimum number of healthy stores required per replica group for GROUP_REPLICA partial response strategy. Must be > 0 when receive.replica-group is set.").Default("0").IntVar(&rc.quorum)
1064+
10381065
rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
10391066

10401067
rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables the retention policy (i.e. infinite retention). For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d"))

cmd/thanos/receive_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
6+
"github.com/efficientgo/core/testutil"
7+
)
8+
9+
func TestReceiveConfigValidateReplicaGroupQuorum(t *testing.T) {
10+
t.Parallel()
11+
12+
for _, tc := range []struct {
13+
name string
14+
replicaGroup string
15+
quorum int
16+
expectErr bool
17+
}{
18+
{
19+
name: "both unset",
20+
replicaGroup: "",
21+
quorum: 0,
22+
expectErr: false,
23+
},
24+
{
25+
name: "both set",
26+
replicaGroup: "rg",
27+
quorum: 1,
28+
expectErr: false,
29+
},
30+
{
31+
name: "only quorum set",
32+
replicaGroup: "",
33+
quorum: 1,
34+
expectErr: true,
35+
},
36+
{
37+
name: "only replica-group set (quorum default)",
38+
replicaGroup: "rg",
39+
quorum: 0,
40+
expectErr: true,
41+
},
42+
{
43+
name: "replica-group set with negative quorum",
44+
replicaGroup: "rg",
45+
quorum: -1,
46+
expectErr: true,
47+
},
48+
{
49+
name: "negative quorum without replica-group",
50+
replicaGroup: "",
51+
quorum: -1,
52+
expectErr: true,
53+
},
54+
} {
55+
t.Run(tc.name, func(t *testing.T) {
56+
conf := &receiveConfig{
57+
replicaGroup: tc.replicaGroup,
58+
quorum: tc.quorum,
59+
}
60+
err := conf.validate()
61+
if tc.expectErr {
62+
testutil.NotOk(t, err)
63+
return
64+
}
65+
testutil.Ok(t, err)
66+
})
67+
}
68+
}

cmd/thanos/rule.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ func queryFuncCreator(
916916
var spanID string
917917

918918
switch partialResponseStrategy {
919-
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
919+
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA, storepb.PartialResponseStrategy_QUORUM:
920920
spanID = "/rule_instant_query HTTP[client]"
921921
case storepb.PartialResponseStrategy_ABORT:
922922
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"

pkg/info/infopb/rpc.pb.go

Lines changed: 120 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)