Skip to content

Commit aa8238f

Browse files
authored
Merge pull request #19608 from miancheng7/e2etestforissue19406
Add e2e test to reproduce issue #19406
2 parents f5dbde1 + 03839c9 commit aa8238f

File tree

4 files changed

+156
-23
lines changed

4 files changed

+156
-23
lines changed

server/storage/mvcc/kvstore_compaction.go

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
4949

5050
tx := s.b.BatchTx()
5151
tx.LockOutsideApply()
52+
// gofail: var compactAfterAcquiredBatchTxLock struct{}
5253
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
5354
for i := range keys {
5455
rev = BytesToRev(keys[i])

tests/e2e/metrics_test.go

+1-23
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,12 @@
1515
package e2e
1616

1717
import (
18-
"bytes"
1918
"context"
2019
"fmt"
21-
"io"
22-
"net/http"
2320
"net/url"
2421
"testing"
2522
"time"
2623

27-
dto "github.com/prometheus/client_model/go"
28-
"github.com/prometheus/common/expfmt"
2924
"github.com/stretchr/testify/require"
3025

3126
"go.etcd.io/etcd/api/v3/version"
@@ -324,7 +319,7 @@ func TestNoMetricsMissing(t *testing.T) {
324319
metricsURL, err := url.JoinPath(epc.Procs[0].Config().ClientURL, "metrics")
325320
require.NoError(t, err)
326321

327-
mfs, err := getMetrics(metricsURL)
322+
mfs, err := e2e.GetMetrics(metricsURL)
328323
require.NoError(t, err)
329324

330325
var missingMetrics []string
@@ -341,23 +336,6 @@ func TestNoMetricsMissing(t *testing.T) {
341336
}
342337
}
343338

344-
func getMetrics(metricsURL string) (map[string]*dto.MetricFamily, error) {
345-
httpClient := http.Client{Transport: &http.Transport{}}
346-
resp, err := httpClient.Get(metricsURL)
347-
if err != nil {
348-
return nil, err
349-
}
350-
defer resp.Body.Close()
351-
352-
data, err := io.ReadAll(resp.Body)
353-
if err != nil {
354-
return nil, err
355-
}
356-
357-
var parser expfmt.TextParser
358-
return parser.TextToMetricFamilies(bytes.NewReader(data))
359-
}
360-
361339
// formatMetrics is only for test purpose
362340
/*func formatMetrics(metrics []string) string {
363341
quoted := make([]string, len(metrics))

tests/e2e/reproduce_19406_test.go

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"net/url"
21+
"testing"
22+
"time"
23+
24+
clientv3 "go.etcd.io/etcd/client/v3"
25+
"go.etcd.io/etcd/pkg/v3/stringutil"
26+
"go.etcd.io/etcd/tests/v3/framework/e2e"
27+
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
// TestReproduce19406 reproduces the issue: https://github.com/etcd-io/etcd/issues/19406
32+
func TestReproduce19406(t *testing.T) {
33+
e2e.BeforeTest(t)
34+
35+
compactionSleepInterval := 100 * time.Millisecond
36+
ctx := context.TODO()
37+
38+
clus, cerr := e2e.NewEtcdProcessCluster(ctx, t,
39+
e2e.WithClusterSize(1),
40+
e2e.WithGoFailEnabled(true),
41+
e2e.WithCompactionBatchLimit(1),
42+
e2e.WithCompactionSleepInterval(compactionSleepInterval),
43+
)
44+
require.NoError(t, cerr)
45+
t.Cleanup(func() { require.NoError(t, clus.Stop()) })
46+
47+
// Produce some data
48+
cli := newClient(t, clus.EndpointsGRPC(), e2e.ClientConfig{})
49+
valueSize := 10
50+
var latestRevision int64
51+
52+
produceKeyNum := 20
53+
for i := 0; i <= produceKeyNum; i++ {
54+
resp, err := cli.Put(ctx, fmt.Sprintf("%d", i), stringutil.RandString(uint(valueSize)))
55+
require.NoError(t, err)
56+
latestRevision = resp.Header.Revision
57+
}
58+
59+
// Sleep for PerCompactionInterationInterval to simulate a single iteration of compaction lasting at least this duration.
60+
PerCompactionInterationInterval := compactionSleepInterval
61+
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactAfterAcquiredBatchTxLock",
62+
fmt.Sprintf(`sleep("%s")`, PerCompactionInterationInterval)))
63+
64+
// start compaction
65+
t.Log("start compaction...")
66+
_, err := cli.Compact(ctx, latestRevision, clientv3.WithCompactPhysical())
67+
require.NoError(t, err)
68+
t.Log("finished compaction...")
69+
70+
// Validate that total compaction sleep interval
71+
// Compaction runs in batches. During each batch, it acquires a lock, releases it at the end,
72+
// and then waits for a compactionSleepInterval before starting the next batch. This pause
73+
// allows PUT requests to be processed.
74+
// Therefore, the total compaction sleep interval larger or equal to
75+
// (compaction iteration number - 1) * compactionSleepInterval
76+
httpEndpoint := clus.EndpointsHTTP()[0]
77+
totalKeys := produceKeyNum + 1
78+
pauseDuration, totalDuration := getEtcdCompactionMetrics(t, httpEndpoint)
79+
require.NoError(t, err)
80+
actualSleepInterval := time.Duration(totalDuration-pauseDuration) * time.Millisecond
81+
expectSleepInterval := compactionSleepInterval * time.Duration(totalKeys)
82+
t.Logf("db_compaction_pause_duration: %.2f db_compaction_total_duration: %.2f, totalKeys: %d",
83+
pauseDuration, totalDuration, totalKeys)
84+
require.GreaterOrEqualf(t, actualSleepInterval, expectSleepInterval,
85+
"expect total compact sleep interval larger than (%v) but got (%v)",
86+
expectSleepInterval, actualSleepInterval)
87+
}
88+
89+
func getEtcdCompactionMetrics(t *testing.T, httpEndpoint string) (pauseDuration, totalDuration float64) {
90+
metricsURL, err := url.JoinPath(httpEndpoint, "metrics")
91+
require.NoError(t, err)
92+
93+
// Fetch metrics from the endpoint
94+
metricFamilies, err := e2e.GetMetrics(metricsURL)
95+
require.NoError(t, err)
96+
97+
// Extract sum from histogram metric
98+
getHistogramSum := func(name string) float64 {
99+
mf, ok := metricFamilies[name]
100+
require.Truef(t, ok, "metric %q not found", name)
101+
require.NotEmptyf(t, mf.Metric, "metric %q has no data", name)
102+
103+
hist := mf.Metric[0].GetHistogram()
104+
require.NotEmptyf(t, hist, "metric %q is not a histogram", name)
105+
106+
return hist.GetSampleSum()
107+
}
108+
109+
pauseDuration = getHistogramSum("etcd_debugging_mvcc_db_compaction_pause_duration_milliseconds")
110+
totalDuration = getHistogramSum("etcd_debugging_mvcc_db_compaction_total_duration_milliseconds")
111+
112+
return pauseDuration, totalDuration
113+
}

tests/framework/e2e/metrics.go

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import (
18+
"bytes"
19+
"io"
20+
"net/http"
21+
22+
dto "github.com/prometheus/client_model/go"
23+
"github.com/prometheus/common/expfmt"
24+
)
25+
26+
func GetMetrics(metricsURL string) (map[string]*dto.MetricFamily, error) {
27+
httpClient := http.Client{Transport: &http.Transport{}}
28+
resp, err := httpClient.Get(metricsURL)
29+
if err != nil {
30+
return nil, err
31+
}
32+
defer resp.Body.Close()
33+
34+
data, err := io.ReadAll(resp.Body)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
var parser expfmt.TextParser
40+
return parser.TextToMetricFamilies(bytes.NewReader(data))
41+
}

0 commit comments

Comments
 (0)