Commit 7c6a812

julietteO and vaxvms committed
Rebase and complete per-sample ha deduplication
Signed-off-by: juliette.orain <[email protected]>
Co-authored-by: Nicolas DUPEUX <[email protected]>
1 parent af8a113 commit 7c6a812

File tree

3 files changed: +103 -105 lines changed

CHANGELOG.md
pkg/distributor/distributor.go
pkg/distributor/distributor_test.go

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@
 * [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534
 * [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
 * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
+* [ENHANCEMENT] HA: Deduplication per sample instead of per batch. #13665
 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
 * [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
 * [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084
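
To unpack the new entry for readers skimming the changelog: with per-batch deduplication the accept/reject decision was driven by a single replica per write request (the pre-change middleware derived the cluster/replica pair for the whole request from `req.Timeseries[0].Labels`), while per-sample deduplication classifies every series by its own cluster/replica pair. A minimal, self-contained Go sketch of the idea; `haKey`, `elected`, and `accepted` are hypothetical illustrations, not Mimir's actual internals:

```go
package main

import "fmt"

// haKey is a hypothetical stand-in for the (cluster, replica) pair a
// series claims to come from (Mimir's internal haReplica key).
type haKey struct{ cluster, replica string }

// elected maps each HA cluster to its currently elected replica. In Mimir
// this decision is made by the HA tracker; here it is simply hardcoded.
var elected = map[string]string{"c1": "r1"}

// accepted reports whether a series from k comes from the elected replica.
func accepted(k haKey) bool { return elected[k.cluster] == k.replica }

func main() {
	// One write request whose series come from two different replicas.
	batch := []haKey{{"c1", "r1"}, {"c1", "r2"}, {"c1", "r1"}}

	// Per-batch dedup: the whole request follows the first series' replica.
	fmt.Println("per-batch: accept whole request =", accepted(batch[0]))

	// Per-sample dedup: each series is classified by its own replica.
	for i, k := range batch {
		fmt.Printf("per-sample: series %d accepted = %v\n", i, accepted(k))
	}
}
```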

pkg/distributor/distributor.go

Lines changed: 84 additions & 82 deletions
@@ -1151,24 +1151,35 @@ type haReplica struct {
 	cluster, replica string
 }
 
-// TODO document what this does
-func (d *Distributor) replicaObserved(ctx context.Context, userID string, replica haReplica) (replicaState, error) {
+type replicaInfo struct {
+	state       replicaState
+	sampleCount int
+}
+
+// replicaObserved checks if a sample from a given replica should be accepted for ingestion based on HA deduplication rules.
+//
+// Returns a replicaState indicating the acceptance status and classification of the replica:
+// - replicaIsPrimary: sample is from the elected primary replica and should be accepted
+// - replicaNotHA: sample doesn't have both HA labels and should be accepted
+// - replicaDeduped: sample is from a non-primary replica and should be deduplicated
+// - replicaRejectedTooManyClusters: sample is rejected because the tenant has too many HA clusters
+// - replicaRejectedUnknown: sample is rejected due to an unknown error
+func (d *Distributor) replicaObserved(ctx context.Context, userID string, replica haReplica, ts int64) (replicaState, error) {
 	if span := opentracing.SpanFromContext(ctx); span != nil {
-		// TODO move this if in a separate function
 		// Make a copy of these, since they may be retained as tags
 		span.LogFields(
-			opentracing_log.String("cluster", copyString(replica.cluster)),
-			opentracing_log.String("replica", copyString(replica.replica)),
+			opentracing_log.String("cluster", strings.Clone(replica.cluster)),
+			opentracing_log.String("replica", strings.Clone(replica.replica)),
 		)
 	}
 
-	isAccepted, err := d.checkSample(ctx, userID, replica.cluster, replica.replica)
+	isAccepted, err := d.checkSample(ctx, userID, replica.cluster, replica.replica, ts)
 	if err != nil {
 		switch {
-		case errors.Is(err, replicasNotMatchError{}):
+		case errors.As(err, &replicasDidNotMatchError{}):
 			// These samples have been deduped.
 			return replicaDeduped, err
-		case errors.Is(err, tooManyClustersError{}):
+		case errors.As(err, &tooManyClustersError{}):
 			return replicaRejectedTooManyClusters, err
 		default:
 			return replicaRejectedUnknown, err
@@ -1182,7 +1193,6 @@ func (d *Distributor) replicaObserved(ctx context.Context, userID string, replic
 	return replicaNotHA, nil
 }
 
-
 func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 	return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error {
 		req, err := pushReq.WriteRequest()
@@ -1200,7 +1210,8 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 		}
 
 		haReplicaLabel := d.limits.HAReplicaLabel(userID)
-		cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
+		haClusterLabel := d.limits.HAClusterLabel(userID)
+		cluster, replica := findHALabels(haReplicaLabel, haClusterLabel, req.Timeseries[0].Labels)
 		// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
 		cluster, replica = strings.Clone(cluster), strings.Clone(replica)
 
@@ -1213,12 +1224,11 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 		numSamples := 0
 		now := time.Now()
 
-		var getReplicaForSample func(int) haReplica
-		{
-			var replicaKey haReplica // TODO dimitarvdimitrov - do we need this closure or a regular func also works?
-			getReplicaForSample = func(i int) haReplica {
-				replicaKey.cluster, replicaKey.replica = findHALabels(haReplicaLabel, haClusterLabel, req.Timeseries[i].Labels)
-				return replicaKey
+		getReplicaForSample := func(i int) haReplica {
+			cluster, replica := findHALabels(haReplicaLabel, haClusterLabel, req.Timeseries[i].Labels)
+			return haReplica{
+				cluster: cluster,
+				replica: replica,
 			}
 		}
 
@@ -1247,61 +1257,57 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 		}
 
 		var errs multierror.MultiError
-		// TODO dimitarvdimitrov maybe unite the two maps that have the haReplicas as the key
-		samplesPerReplica := make(map[haReplica]int)
-		replicaStates := make(map[haReplica]replicaState, len(samplesPerReplica))
+		replicaInfos := make(map[haReplica]*replicaInfo)
 		samplesPerState := make(map[replicaState]int)
+		// Check if all timeseries belong to the same replica
+		firstReplica := getReplicaForSample(0)
 		isOneReplica := true
-		{
-			replica := getReplicaForSample(0)
-
-			for i := range req.Timeseries {
-				if getReplicaForSample(i) != replica {
-					isOneReplica = false
-					break
-				}
-			}
-		}
-		{
-			if isOneReplica {
-				samplesPerReplica[getReplicaForSample(0)] = numSamples
-			} else {
-				for i := range req.Timeseries {
-					samplesPerReplica[getReplicaForSample(i)]++
-				}
+		for i := 1; i < len(req.Timeseries); i++ {
+			if getReplicaForSample(i) != firstReplica {
+				isOneReplica = false
+				break
 			}
 		}
+
+		// Count samples per replica
 		if isOneReplica {
-			var r haReplica
-			for k := range samplesPerReplica {
-				r = k
-			}
+			replicaInfos[firstReplica] = &replicaInfo{sampleCount: numSamples}
+			// Optimize getReplicaForSample to return the single replica directly
 			getReplicaForSample = func(i int) haReplica {
-				return r
+				return firstReplica
+			}
+		} else {
+			for i := range req.Timeseries {
+				r := getReplicaForSample(i)
+				info := replicaInfos[r]
+				if info == nil {
+					info = &replicaInfo{}
+					replicaInfos[r] = info
+				}
+				info.sampleCount++
 			}
 		}
-		for replicaKey, numSamples := range samplesPerReplica {
-			state, ok := replicaStates[replicaKey]
-			if !ok {
-				state, err = d.replicaObserved(ctx, userID, replicaKey)
-				replicaStates[replicaKey] = state
+		for replicaKey, info := range replicaInfos {
+			if info.state == replicaRejectedUnknown {
+				state, err := d.replicaObserved(ctx, userID, replicaKey, sampleTimestamp)
+				info.state = state
 				errs.Add(err)
 			}
-			samplesPerState[state] += numSamples
+			samplesPerState[info.state] += info.sampleCount
 		}
-		lastAccepted := sortByAccepted(req, replicaStates, getReplicaForSample)
+		lastAccepted := sortByAccepted(req, replicaInfos, getReplicaForSample)
 		var getReplicaState func(haReplica) replicaState
 		if isOneReplica {
 			var s replicaState
-			for _, rs := range replicaStates {
-				s = rs
+			for _, info := range replicaInfos {
+				s = info.state
 			}
 			getReplicaState = func(replica haReplica) replicaState {
 				return s
 			}
 		} else {
 			getReplicaState = func(replica haReplica) replicaState {
-				return replicaStates[replica]
+				return replicaInfos[replica].state
 			}
 		}
 		for i := 0; i <= lastAccepted; i++ {
@@ -1317,35 +1323,15 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
 		}
 		// We don't want to send samples beyond the last accepted sample - that was deduplicated
 		pushReq.AddCleanup(sliceUnacceptedRequests(req, lastAccepted))
-		{
-			// TODO move into a function
-			for replica, state := range replicaStates {
-				if state&replicaDeduped != 0 && samplesPerReplica[replica] > 0 {
-					cluster := copyString(replica.cluster) // Make a copy of this, since it may be retained as labels on our metrics
-					d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(samplesPerReplica[replica]))
-				}
-			}
-			if samplesPerState[replicaNotHA] > 0 {
-				d.nonHASamples.WithLabelValues(userID).Add(float64(samplesPerState[replicaNotHA]))
-			}
-			if samplesPerState[replicaRejectedTooManyClusters] > 0 {
-				d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(samplesPerState[replicaRejectedTooManyClusters]))
-			}
-		}
-		var resp *mimirpb.WriteResponse
+		d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState)
+
 		if len(req.Timeseries) > 0 {
-			cleanupInDefer = false
-			resp, err = next(ctx, pushReq)
+			err = next(ctx, pushReq)
 		}
 		errs.Add(err)
 
-		if samplesPerState[replicaDeduped] > 0 {
-			return nil, httpgrpc.Errorf(http.StatusAccepted, errs.Err().Error())
-		} else if len(errs) > 0 {
-			return nil, httpgrpc.Errorf(http.StatusBadRequest, errs.Err().Error()) // ermmm?
-		}
-		return resp, nil
-	}
+		return errs.Err()
+	})
 }
 
 func sliceUnacceptedRequests(req *mimirpb.WriteRequest, lastAccepted int) func() {
@@ -1358,20 +1344,36 @@ func sliceUnacceptedRequests(req *mimirpb.WriteRequest, lastAccepted int) func()
 
 }
 
+// updateHADedupeMetrics updates metrics related to HA deduplication.
+func (d *Distributor) updateHADedupeMetrics(userID, group string, replicaInfos map[haReplica]*replicaInfo, samplesPerState map[replicaState]int) {
+	for replica, info := range replicaInfos {
+		if info.state&replicaDeduped != 0 && info.sampleCount > 0 {
+			cluster := strings.Clone(replica.cluster) // Make a copy of this, since it may be retained as labels on our metrics
+			d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(info.sampleCount))
+		}
+	}
+	if samplesPerState[replicaNotHA] > 0 {
+		d.nonHASamples.WithLabelValues(userID).Add(float64(samplesPerState[replicaNotHA]))
+	}
+	if samplesPerState[replicaRejectedTooManyClusters] > 0 {
+		d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(samplesPerState[replicaRejectedTooManyClusters]))
+	}
+}
+
 // sortByAccepted returns the index of the last accepted timeseries in the write request based on the HA dedup states of the replicas
-func sortByAccepted(req *mimirpb.WriteRequest, replicaStates map[haReplica]replicaState, getReplicaForSample func(int) haReplica) int {
+func sortByAccepted(req *mimirpb.WriteRequest, replicaInfos map[haReplica]*replicaInfo, getReplicaForSample func(int) haReplica) int {
 	numAcceptedReplicas := 0
-	for _, state := range replicaStates {
-		if state&replicaAccepted != 0 {
+	for _, info := range replicaInfos {
+		if info.state&replicaAccepted != 0 {
 			numAcceptedReplicas++
 		}
 	}
-	if numAcceptedReplicas == len(replicaStates) {
+	if numAcceptedReplicas == len(replicaInfos) {
 		return len(req.Timeseries) - 1
 	}
 	findPreviousAccepted := func(i int) int {
 		for i > 0 {
-			state := replicaStates[getReplicaForSample(i)]
+			state := replicaInfos[getReplicaForSample(i)].state
 			if state&replicaAccepted != 0 {
 				break
 			}
@@ -1385,7 +1387,7 @@ func sortByAccepted(req *mimirpb.WriteRequest, replicaStates map[haReplica]repli
 		if i > lastAccepted {
 			break
 		}
-		state := replicaStates[getReplicaForSample(i)]
+		state := replicaInfos[getReplicaForSample(i)].state
 		if state&replicaAccepted == 0 {
 			req.Timeseries[i], req.Timeseries[lastAccepted] = req.Timeseries[lastAccepted], req.Timeseries[i]
 			lastAccepted--
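
The most intricate piece of the new flow is `sortByAccepted`: it compacts accepted timeseries toward the front of `req.Timeseries` via swaps and returns the index of the last accepted entry, so everything past that index can be sliced off as deduplicated. A self-contained sketch of that swap-to-front partition under simplified assumptions: plain strings stand in for timeseries, and `lastAcceptedIndex` and `fromElected` are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// lastAcceptedIndex moves elements for which accept returns true to the
// front of xs using swaps and returns the index of the last accepted
// element (-1 if none). It mirrors, in simplified form, the scheme used
// by sortByAccepted, with xs standing in for req.Timeseries.
func lastAcceptedIndex(xs []string, accept func(string) bool) int {
	// Walk a cursor backwards to the nearest accepted element.
	findPreviousAccepted := func(i int) int {
		for i >= 0 && !accept(xs[i]) {
			i--
		}
		return i
	}

	lastAccepted := findPreviousAccepted(len(xs) - 1)
	for i := 0; i <= lastAccepted; i++ {
		if !accept(xs[i]) {
			// Swap the rejected element behind the accepted region.
			xs[i], xs[lastAccepted] = xs[lastAccepted], xs[i]
			lastAccepted = findPreviousAccepted(lastAccepted - 1)
		}
	}
	return lastAccepted
}

func main() {
	series := []string{"r1/a", "r2/b", "r1/c", "r2/d", "r1/e"}
	fromElected := func(s string) bool { return strings.HasPrefix(s, "r1/") }

	last := lastAcceptedIndex(series, fromElected)
	fmt.Println("accepted:    ", series[:last+1]) // forwarded downstream
	fmt.Println("deduplicated:", series[last+1:]) // sliced off by the cleanup
}
```

Like the real function, the sketch only guarantees the accepted/rejected split at the returned boundary; it does not preserve the original ordering within either region.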

pkg/distributor/distributor_test.go

Lines changed: 18 additions & 23 deletions
@@ -959,15 +959,15 @@ func TestDistributor_PushHAInstances(t *testing.T) {
 			samples:          5,
 			expectedResponse: emptyResponse,
 		},
-		// Using very long replica label value results in not pushing the sample, hence a success.
+		// Using very long replica label value results in validation error.
 		{
-			enableTracker:    true,
-			acceptedReplica:  "instance0",
-			testReplica:      "instance1234567890123456789012345678901234567890",
-			cluster:          "cluster0",
-			samples:          5,
-			expectedResponse: emptyResponse,
-			expectedCode:     0,
+			enableTracker:   true,
+			acceptedReplica: "instance0",
+			testReplica:     "instance1234567890123456789012345678901234567890",
+			cluster:         "cluster0",
+			samples:         5,
+			expectedError:   status.New(codes.InvalidArgument, fmt.Sprintf(labelValueTooLongMsgFormat, 48, 15, "__replica__", "instance1234567890123456789012345678901234567890", mimirpb.FromLabelAdaptersToString(labelSetGenWithReplicaAndCluster("instance1234567890123456789012345678901234567890", "cluster0")(0)))),
+			expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.ERROR_CAUSE_BAD_DATA},
 		},
 	} {
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
@@ -2379,7 +2379,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
 			samples := make([]mimirpb.Sample, numSeriesPerRequest)
 
 			for i := 0; i < numSeriesPerRequest; i++ {
-				lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
+				lbls := labels.NewBuilder(labels.FromStrings(model.MetricNameLabel, "foo"))
 				for i := 0; i < 10; i++ {
 					lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
 				}
@@ -2407,7 +2407,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
 			samples := make([]mimirpb.Sample, numSeriesPerRequest)
 
 			for i := 0; i < numSeriesPerRequest; i++ {
-				lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
+				lbls := labels.NewBuilder(labels.FromStrings(model.MetricNameLabel, "foo"))
 				for i := 0; i < 10; i++ {
 					lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
 				}
@@ -5192,8 +5192,8 @@ func TestHaDedupeMiddleware(t *testing.T) {
 			},
 			expectedReqs:      []*mimirpb.WriteRequest{makeWriteRequestForGenerators(5, labelSetGenWithCluster(cluster1), nil, nil)},
 			expectedNextCalls: 1,
-			expectErrs:        []*status.Status{nil, status.New(codes.AlreadyExists, newReplicasDidNotMatchError(replica2, replica1).Error())},
-			expectDetails:     []*mimirpb.ErrorDetails{nil, replicasDidNotMatchDetails},
+			expectErrs:    []*status.Status{nil, status.New(codes.AlreadyExists, newReplicasDidNotMatchError(replica2, replica1).Error())},
+			expectDetails: []*mimirpb.ErrorDetails{nil, replicasDidNotMatchDetails},
 		}, {
 			name: "exceed max ha clusters limit",
 			ctx:  ctxWithUser,
@@ -5236,7 +5236,8 @@
 				return []*mimirpb.WriteRequest{c1, c2}
 			}(),
 			expectedNextCalls: 2,
-			expectErrs:        []int{0, 202},
+			expectErrs:    []*status.Status{nil, status.New(codes.AlreadyExists, newReplicasDidNotMatchError(replica2, replica1).Error())},
+			expectDetails: []*mimirpb.ErrorDetails{nil, replicasDidNotMatchDetails},
 		},
 	}
 
@@ -5296,22 +5297,16 @@
 			for i := range gotReqs {
 				assert.ElementsMatch(t, tc.expectedReqs[i].Timeseries, gotReqs[i].Timeseries)
 				assert.ElementsMatch(t, tc.expectedReqs[i].Metadata, gotReqs[i].Metadata)
-				assert.Equal(t, tc.expectedReqs[i].SkipLabelNameValidation, gotReqs[i].SkipLabelNameValidation)
 				assert.Equal(t, tc.expectedReqs[i].Source, gotReqs[i].Source)
 			}
 			assert.Len(t, gotErrs, len(tc.expectErrs))
 			for errIdx, expectErr := range tc.expectErrs {
-				if expectErr > 0 {
-					// Expect an httpgrpc error with specific status code.
-					resp, ok := httpgrpc.HTTPResponseFromError(gotErrs[errIdx])
-					if assert.True(t, ok) {
-						assert.Equal(t, expectErr, int(resp.Code))
-					}
-				} else if expectErr == 0 {
+				if expectErr != nil {
+					// Expect a gRPC error.
+					checkGRPCError(t, expectErr, tc.expectDetails[errIdx], gotErrs[errIdx])
+				} else {
 					// Expect no error.
					assert.Nil(t, gotErrs[errIdx])
-				} else {
-					checkGRPCError(t, expectErr, tc.expectDetails[errIdx], gotErrs[errIdx])
 				}
 			}
 
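The rewritten assertions rely on the suite's `checkGRPCError` helper together with the `expectErrs`/`expectDetails` fields. As a rough sketch of what such a check involves, here is a simplified, hypothetical stand-in using only the public `google.golang.org/grpc/status` API; the real helper also compares the attached `mimirpb.ErrorDetails`, which this sketch omits:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// assertGRPCStatus is a simplified, hypothetical stand-in for the test
// suite's checkGRPCError helper: it recovers the gRPC status from err and
// compares its code and message against the expectation.
func assertGRPCStatus(t *testing.T, expected *status.Status, err error) {
	got, ok := status.FromError(err)
	if assert.True(t, ok, "expected a gRPC status error") {
		assert.Equal(t, expected.Code(), got.Code())
		assert.Equal(t, expected.Message(), got.Message())
	}
}

func TestAssertGRPCStatus(t *testing.T) {
	err := status.New(codes.AlreadyExists, "replicas did not match").Err()
	assertGRPCStatus(t, status.New(codes.AlreadyExists, "replicas did not match"), err)
}
```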