Skip to content

Commit b73a516

Browse files
committed
Implement Compaction support in robustness test
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 2f216dc commit b73a516

File tree

14 files changed

+179
-59
lines changed

14 files changed

+179
-59
lines changed

tests/robustness/client/client.go

+4
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
193193
// Compact sends a compaction request for revision rev through the wrapped
// client and appends the outcome to c.kvOperations.
//
// The call and return times are recorded as offsets from c.baseTime. The whole
// request runs while holding c.kvMux. The response and error from the wrapped
// client are returned unchanged.
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
194194
c.kvMux.Lock()
195195
defer c.kvMux.Unlock()
196+
callTime := time.Since(c.baseTime)
196197
resp, err := c.client.Compact(ctx, rev)
198+
returnTime := time.Since(c.baseTime)
199+
// Record both successful and failed compactions; AppendCompact receives err and decides how to store it.
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
197200
return resp, err
198201
}
202+
199203
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
200204
c.kvMux.Lock()
201205
defer c.kvMux.Unlock()

tests/robustness/failpoint/failpoint.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,24 @@ var (
3939
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
4040
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
4141
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
42-
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
43-
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
44-
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
42+
BackendAfterWritebackBufPanic,
43+
RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
4544
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
4645
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
4746
BeforeApplyOneConfChangeSleep,
48-
MemberReplace,
4947
DropPeerNetwork,
5048
RaftBeforeSaveSleep,
5149
RaftAfterSaveSleep,
5250
ApplyBeforeOpenSnapshot,
51+
MemberReplace,
52+
53+
// Disabled for now because of a problem with compaction failpoints:
54+
//CompactBeforeCommitScheduledCompactPanic,
55+
//CompactBeforeCommitBatchPanic,
56+
//CompactAfterCommitBatchPanic,
57+
//CompactAfterCommitScheduledCompactPanic,
58+
//CompactBeforeSetFinishedCompactPanic,
59+
//CompactAfterSetFinishedCompactPanic,
5360
}
5461
)
5562

tests/robustness/failpoint/trigger.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
8383
return []report.ClientReport{cc.Report()}, nil
8484
}
8585

86-
func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
86+
// Available reports whether this compaction trigger can be used with the given
// cluster configuration. When t.multiBatchCompaction is set, it requires
// ExperimentalCompactionBatchLimit to be at most 10; otherwise it always
// returns true. The process argument is ignored.
func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
87+
// For multiBatchCompaction we need to guarantee that there are enough revisions between two compaction requests.
88+
// With the addition of compaction requests to traffic, this might be hard if experimental-compaction-batch-limit is too high.
89+
if t.multiBatchCompaction {
90+
return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10
91+
}
8792
return true
8893
}

tests/robustness/model/describe.go

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
2525
if response.Error != "" {
2626
return fmt.Sprintf("err: %q", response.Error)
2727
}
28+
if response.ClientError != "" {
29+
return fmt.Sprintf("err: %q", response.ClientError)
30+
}
2831
if response.PartialResponse {
2932
return fmt.Sprintf("unknown, rev: %d", response.Revision)
3033
}
@@ -38,6 +41,8 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
3841
return "ok"
3942
}
4043
return fmt.Sprintf("ok, rev: %d", response.Revision)
44+
case Compact:
45+
return "ok"
4146
default:
4247
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
4348
}
@@ -67,6 +72,8 @@ func describeEtcdRequest(request EtcdRequest) string {
6772
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
6873
case Defragment:
6974
return fmt.Sprintf("defragment()")
75+
case Compact:
76+
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
7077
default:
7178
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
7279
}

tests/robustness/model/deterministic.go

+35-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"sort"
2424

2525
"github.com/anishathalye/porcupine"
26+
27+
"go.etcd.io/etcd/server/v3/storage/mvcc"
2628
)
2729

2830
// DeterministicModel assumes a deterministic execution of etcd requests. All
@@ -64,10 +66,11 @@ var DeterministicModel = porcupine.Model{
6466
}
6567

6668
// EtcdState is the state value that the Step and apply methods operate on.
type EtcdState struct {
67-
Revision int64
68-
KeyValues map[string]ValueRevision
69-
KeyLeases map[string]int64
70-
Leases map[int64]EtcdLease
69+
Revision int64
70+
// CompactRevision is the revision of the most recent Compact request accepted by Step.
CompactRevision int64
71+
KeyValues map[string]ValueRevision
72+
KeyLeases map[string]int64
73+
Leases map[int64]EtcdLease
7174
}
7275

7376
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
@@ -77,10 +80,12 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
7780

7881
// freshEtcdState returns the initial model state: Revision 1, CompactRevision -1
// and empty key, key-lease and lease maps.
func freshEtcdState() EtcdState {
7982
return EtcdState{
80-
Revision: 1,
81-
KeyValues: map[string]ValueRevision{},
82-
KeyLeases: map[string]int64{},
83-
Leases: map[int64]EtcdLease{},
83+
Revision: 1,
84+
// Start with CompactRevision set to -1 because etcd allows a client to compact revision 0.
// Step rejects a compaction whose revision is <= CompactRevision, so starting at 0 would reject compact(0).
85+
CompactRevision: -1,
86+
KeyValues: map[string]ValueRevision{},
87+
KeyLeases: map[string]int64{},
88+
Leases: map[int64]EtcdLease{},
8489
}
8590
}
8691

@@ -100,6 +105,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
100105
if request.Range.Revision > s.Revision {
101106
return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
102107
}
108+
if request.Range.Revision < s.CompactRevision {
109+
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
110+
}
103111
return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}}
104112
case Txn:
105113
failure := false
@@ -178,6 +186,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
178186
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
179187
case Defragment:
180188
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
189+
case Compact:
190+
if request.Compact.Revision <= s.CompactRevision {
191+
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
192+
}
193+
s.CompactRevision = request.Compact.Revision
194+
// Set fake revision as compaction returns non-linearizable revision.
195+
// TODO: Model non-linearizable response revision in model.
196+
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: -1}}
181197
default:
182198
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
183199
}
@@ -237,6 +253,7 @@ const (
237253
LeaseGrant RequestType = "leaseGrant"
238254
LeaseRevoke RequestType = "leaseRevoke"
239255
Defragment RequestType = "defragment"
256+
Compact RequestType = "compact"
240257
)
241258

242259
type EtcdRequest struct {
@@ -246,6 +263,7 @@ type EtcdRequest struct {
246263
Range *RangeRequest
247264
Txn *TxnRequest
248265
Defragment *DefragmentRequest
266+
Compact *CompactRequest
249267
}
250268

251269
func (r *EtcdRequest) IsRead() bool {
@@ -337,6 +355,8 @@ type EtcdResponse struct {
337355
LeaseGrant *LeaseGrantReponse
338356
LeaseRevoke *LeaseRevokeResponse
339357
Defragment *DefragmentResponse
358+
Compact *CompactResponse
359+
ClientError string
340360
Revision int64
341361
}
342362

@@ -398,3 +418,10 @@ func ToValueOrHash(value string) ValueOrHash {
398418
}
399419
return v
400420
}
421+
422+
// CompactResponse is the response recorded for a Compact request.
// It carries no fields; its presence in EtcdResponse.Compact marks the response type.
type CompactResponse struct {
423+
}
424+
425+
// CompactRequest holds the parameters of a Compact request.
type CompactRequest struct {
426+
// Revision is the revision passed to the compaction call.
Revision int64
427+
}

tests/robustness/model/history.go

+27
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ package model
1616

1717
import (
1818
"fmt"
19+
"strings"
1920
"time"
2021

2122
"github.com/anishathalye/porcupine"
2223

2324
"go.etcd.io/etcd/api/v3/etcdserverpb"
2425
"go.etcd.io/etcd/api/v3/mvccpb"
2526
clientv3 "go.etcd.io/etcd/client/v3"
27+
"go.etcd.io/etcd/server/v3/storage/mvcc"
2628
"go.etcd.io/etcd/tests/v3/robustness/identity"
2729
)
2830

@@ -259,6 +261,23 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
259261
h.appendSuccessful(request, start, end, defragmentResponse(revision))
260262
}
261263

264+
// AppendCompact records a compaction of revision rev that was called at start
// and returned at end.
//
// An error whose message contains mvcc.ErrCompacted's message is recorded as a
// successful operation whose response has ClientError set to that message.
// Any other error is recorded via appendFailed. A nil error is recorded as a
// successful compact response with the placeholder revision -1.
//
// resp is not read by this function.
func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
265+
request := compactRequest(rev)
266+
if err != nil {
267+
// Match on the error text rather than on the error value.
if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) {
268+
h.appendSuccessful(request, start, end, MaybeEtcdResponse{
269+
EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()},
270+
})
271+
return
272+
}
273+
h.appendFailed(request, start, end, err)
274+
return
275+
}
276+
// Set fake revision as compaction returns non-linearizable revision.
277+
// TODO: Model non-linearizable response revision in model.
278+
h.appendSuccessful(request, start, end, compactResponse(-1))
279+
}
280+
262281
func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
263282
op := porcupine.Operation{
264283
ClientId: h.streamID,
@@ -444,6 +463,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
444463
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}}
445464
}
446465

466+
// compactRequest builds an EtcdRequest of type Compact for revision rev.
func compactRequest(rev int64) EtcdRequest {
467+
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
468+
}
469+
470+
// compactResponse builds a MaybeEtcdResponse that carries an empty
// CompactResponse and the given revision.
func compactResponse(revision int64) MaybeEtcdResponse {
471+
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
472+
}
473+
447474
type History struct {
448475
operations []porcupine.Operation
449476
}

tests/robustness/options/server_config_options.go

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption {
2626
}
2727
}
2828

29+
// WithCompactionBatchLimit returns a cluster option that sets
// ServerConfig.ExperimentalCompactionBatchLimit to one of the input values.
// The value is chosen with internalRand.Intn each time the option is applied.
//
// NOTE(review): input is indexed without a length check; presumably callers
// always pass at least one value — confirm.
func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption {
30+
return func(c *e2e.EtcdProcessClusterConfig) {
31+
c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))]
32+
}
33+
}
34+
2935
func WithSnapshotCatchUpEntries(input ...uint64) e2e.EPClusterOption {
3036
return func(c *e2e.EtcdProcessClusterConfig) {
3137
c.ServerConfig.SnapshotCatchUpEntries = input[internalRand.Intn(len(input))]

tests/robustness/report/wal.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
184184
case raftReq.ClusterVersionSet != nil:
185185
return nil, nil
186186
case raftReq.Compaction != nil:
187-
return nil, nil
187+
request := model.EtcdRequest{
188+
Type: model.Compact,
189+
Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision},
190+
}
191+
return &request, nil
188192
case raftReq.Txn != nil:
189193
txn := model.TxnRequest{
190194
Conditions: []model.EtcdCondition{},

tests/robustness/scenarios.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
9191
options.WithSnapshotCount(50, 100, 1000),
9292
options.WithSubsetOptions(randomizableOptions...),
9393
e2e.WithGoFailEnabled(true),
94-
e2e.WithCompactionBatchLimit(100),
94+
// Set low minimal compaction batch limit to allow for triggering multi batch compaction failpoints.
95+
options.WithCompactionBatchLimit(10, 100, 1000),
9596
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
9697
}
9798

tests/robustness/traffic/etcd.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ var (
3838
{choice: List, weight: 15},
3939
{choice: StaleGet, weight: 10},
4040
{choice: StaleList, weight: 10},
41-
{choice: Put, weight: 23},
42-
{choice: LargePut, weight: 2},
4341
{choice: Delete, weight: 5},
4442
{choice: MultiOpTxn, weight: 5},
4543
{choice: PutWithLease, weight: 5},
4644
{choice: LeaseRevoke, weight: 5},
4745
{choice: CompareAndSet, weight: 5},
46+
{choice: Put, weight: 15},
47+
{choice: LargePut, weight: 5},
48+
{choice: Compact, weight: 5},
4849
},
4950
}
5051
EtcdPut = etcdTraffic{
@@ -56,9 +57,10 @@ var (
5657
{choice: List, weight: 15},
5758
{choice: StaleGet, weight: 10},
5859
{choice: StaleList, weight: 10},
59-
{choice: Put, weight: 40},
6060
{choice: MultiOpTxn, weight: 5},
6161
{choice: LargePut, weight: 5},
62+
{choice: Put, weight: 35},
63+
{choice: Compact, weight: 5},
6264
},
6365
}
6466
)
@@ -89,6 +91,7 @@ const (
8991
LeaseRevoke etcdRequestType = "leaseRevoke"
9092
CompareAndSet etcdRequestType = "compareAndSet"
9193
Defragment etcdRequestType = "defragment"
94+
Compact etcdRequestType = "compact"
9295
)
9396

9497
func (t etcdTraffic) Name() string {
@@ -264,6 +267,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
264267
if resp != nil {
265268
rev = resp.Header.Revision
266269
}
270+
case Compact:
271+
var resp *clientv3.CompactResponse
272+
resp, err = c.client.Compact(opCtx, lastRev)
273+
if resp != nil {
274+
rev = resp.Header.Revision
275+
}
267276
default:
268277
panic("invalid choice")
269278
}

tests/robustness/traffic/kubernetes.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ var (
3737
resource: "pods",
3838
namespace: "default",
3939
writeChoices: []choiceWeight[KubernetesRequestType]{
40-
{choice: KubernetesUpdate, weight: 90},
40+
{choice: KubernetesUpdate, weight: 85},
4141
{choice: KubernetesDelete, weight: 5},
4242
{choice: KubernetesCreate, weight: 5},
43+
{choice: KubernetesCompact, weight: 5},
4344
},
4445
}
4546
)
@@ -167,6 +168,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
167168
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
168169
case KubernetesCreate:
169170
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
171+
case KubernetesCompact:
172+
err = kc.Compact(writeCtx, rev)
170173
default:
171174
panic(fmt.Sprintf("invalid choice: %q", op))
172175
}
@@ -209,9 +212,10 @@ func (t kubernetesTraffic) generateKey() string {
209212
type KubernetesRequestType string
210213

211214
// Values of KubernetesRequestType that kubernetesTraffic.Write switches on.
const (
212-
KubernetesDelete KubernetesRequestType = "delete"
213-
KubernetesUpdate KubernetesRequestType = "update"
214-
KubernetesCreate KubernetesRequestType = "create"
215+
KubernetesDelete KubernetesRequestType = "delete"
216+
KubernetesUpdate KubernetesRequestType = "update"
217+
KubernetesCreate KubernetesRequestType = "create"
218+
// KubernetesCompact makes Write issue a compaction request via kubernetesClient.Compact.
KubernetesCompact KubernetesRequestType = "compact"
215219
)
216220

217221
type kubernetesClient struct {
@@ -250,6 +254,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
250254
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
251255
}
252256

257+
// Compact requests compaction of revision rev through the underlying client.
// The response is discarded; only the error is returned.
func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
258+
_, err := k.client.Compact(ctx, rev)
259+
return err
260+
}
261+
253262
// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
254263
// However, if the key's value has changed, it wants to read it immediately, hence the Get operation on failure.
255264
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {

tests/robustness/validate/patch_history.go

+2
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
192192
}
193193
}
194194
case model.LeaseGrant:
195+
case model.Compact:
195196
default:
196197
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
197198
}
@@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
216217
}
217218
case model.Range:
218219
case model.LeaseGrant:
220+
case model.Compact:
219221
default:
220222
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
221223
}

0 commit comments

Comments
 (0)