Skip to content

Commit 1f3be25

Browse files
author
Nont
committed
Use robustness traffic generation
Signed-off-by: Nont <[email protected]>
1 parent d610b04 commit 1f3be25

File tree

2 files changed

+105
-114
lines changed

2 files changed

+105
-114
lines changed

tests/antithesis/test-template/go-delete-keys/serial_driver_delete_keys.go

+57-66
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@ import (
2020
"context"
2121
"log"
2222
"os"
23+
"sync"
2324
"time"
2425

2526
"github.com/antithesishq/antithesis-sdk-go/assert"
2627
"github.com/antithesishq/antithesis-sdk-go/random"
28+
"golang.org/x/time/rate"
2729

28-
clientv3 "go.etcd.io/etcd/client/v3"
2930
"go.etcd.io/etcd/tests/v3/robustness/client"
3031
"go.etcd.io/etcd/tests/v3/robustness/identity"
32+
robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random"
33+
"go.etcd.io/etcd/tests/v3/robustness/report"
34+
"go.etcd.io/etcd/tests/v3/robustness/traffic"
3135
)
3236

3337
func Connect() *client.RecordingClient {
@@ -45,77 +49,64 @@ func Connect() *client.RecordingClient {
4549
return cli
4650
}
4751

48-
func DeleteKeys() {
49-
// This function will:
50-
// 1. Get all keys
51-
// 2. Select half of the keys received
52-
// 3. Attempt to delete the keys selected
53-
// 4. Check that the keys were deleted
54-
55-
ctx := context.Background()
56-
57-
// Connect to an etcd node
58-
cli := Connect()
59-
60-
// Get all keys
61-
resp, err := cli.Get(ctx, "", clientv3.WithPrefix())
62-
63-
// Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen.
64-
assert.Sometimes(err == nil, "Client can make successful get all requests", map[string]any{"error": err})
65-
cli.Close()
66-
67-
if err != nil {
68-
log.Printf("Client failed to get all keys: %v", err)
69-
os.Exit(0)
70-
}
71-
72-
// Choose half of the keys
73-
var keys []string
74-
for _, k := range resp.Kvs {
75-
keys = append(keys, string(k.Key))
76-
}
77-
half := len(keys) / 2
78-
halfKeys := keys[:half]
52+
var etcdTraffic traffic.Traffic = traffic.EtcdTraffic{
53+
KeyCount: 10,
54+
// Please keep the sum of weights equal 100.
55+
Requests: []robustnessrand.ChoiceWeight[traffic.EtcdRequestType]{
56+
{Choice: traffic.Get, Weight: 50},
57+
{Choice: traffic.Put, Weight: 50},
58+
},
59+
}
7960

80-
// Connect to a new etcd node
81-
cli = Connect()
61+
var profile = traffic.Profile{
62+
MinimalQPS: 100,
63+
MaximalQPS: 1000,
64+
BurstableQPS: 1000,
65+
ClientCount: 3,
66+
MaxNonUniqueRequestConcurrency: 3,
67+
}
8268

83-
// Delete half of the keys chosen
84-
var deletedKeys []string
85-
for _, k := range halfKeys {
86-
_, err := cli.Delete(ctx, k)
87-
// Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen.
88-
assert.Sometimes(err == nil, "Client can make successful delete requests", map[string]any{"error": err})
89-
if err != nil {
90-
log.Printf("Failed to delete key %s: %v", k, err)
91-
} else {
92-
log.Printf("Successfully deleted key %v", k)
93-
deletedKeys = append(deletedKeys, k)
69+
// wrap converts a receive-only channel to receive-only struct{} channel
70+
func wrap[T any](from <-chan T) <-chan struct{} {
71+
out := make(chan struct{})
72+
go func() {
73+
for {
74+
<-from
75+
out <- struct{}{}
9476
}
95-
}
96-
cli.Close()
97-
98-
// Connect to a new etcd node
99-
cli = Connect()
77+
}()
78+
return out
79+
}
10080

101-
// Check to see if those keys were deleted / exist
102-
for _, k := range deletedKeys {
103-
resp, err := cli.Get(ctx, k)
104-
// Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen.
105-
assert.Sometimes(err == nil, "Client can make successful get requests", map[string]any{"error": err})
106-
if err != nil {
107-
log.Printf("Client failed to get key %s: %v", k, err)
108-
continue
109-
}
110-
// Antithesis Assertion: if we deleted a key, we should not get a value
111-
assert.Always(resp.Count == 0, "Key was deleted correctly", map[string]any{"key": k})
81+
func testRobustness() {
82+
ctx := context.Background()
83+
var wg sync.WaitGroup
84+
var mux sync.Mutex
85+
runfor := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second))
86+
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), profile.BurstableQPS)
87+
finish := wrap(time.After(runfor))
88+
reports := []report.ClientReport{}
89+
90+
for range profile.ClientCount {
91+
wg.Add(1)
92+
c := Connect()
93+
go func(c *client.RecordingClient) {
94+
defer wg.Done()
95+
defer c.Close()
96+
97+
etcdTraffic.RunTrafficLoop(ctx, c, limiter,
98+
identity.NewIDProvider(),
99+
identity.NewLeaseIDStorage(),
100+
traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency),
101+
finish,
102+
)
103+
mux.Lock()
104+
reports = append(reports, c.Report())
105+
mux.Unlock()
106+
}(c)
112107
}
113-
cli.Close()
114-
115-
assert.Reachable("Completion of a key deleting check", nil)
116-
log.Printf("Completion of a key deleting check")
117108
}
118109

119110
func main() {
120-
DeleteKeys()
111+
testRobustness()
121112
}

tests/robustness/traffic/etcd.go

+48-48
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ import (
3131
)
3232

3333
var (
34-
EtcdPutDeleteLease Traffic = etcdTraffic{
35-
keyCount: 10,
36-
leaseTTL: DefaultLeaseTTL,
37-
largePutSize: 32769,
34+
EtcdPutDeleteLease Traffic = EtcdTraffic{
35+
KeyCount: 10,
36+
LeaseTTL: DefaultLeaseTTL,
37+
LargePutSize: 32769,
3838
// Please keep the sum of weights equal 100.
39-
requests: []random.ChoiceWeight[etcdRequestType]{
39+
Requests: []random.ChoiceWeight[EtcdRequestType]{
4040
{Choice: Get, Weight: 15},
4141
{Choice: List, Weight: 15},
4242
{Choice: StaleGet, Weight: 10},
@@ -50,12 +50,12 @@ var (
5050
{Choice: LargePut, Weight: 5},
5151
},
5252
}
53-
EtcdPut Traffic = etcdTraffic{
54-
keyCount: 10,
55-
largePutSize: 32769,
56-
leaseTTL: DefaultLeaseTTL,
53+
EtcdPut Traffic = EtcdTraffic{
54+
KeyCount: 10,
55+
LargePutSize: 32769,
56+
LeaseTTL: DefaultLeaseTTL,
5757
// Please keep the sum of weights equal 100.
58-
requests: []random.ChoiceWeight[etcdRequestType]{
58+
Requests: []random.ChoiceWeight[EtcdRequestType]{
5959
{Choice: Get, Weight: 15},
6060
{Choice: List, Weight: 15},
6161
{Choice: StaleGet, Weight: 10},
@@ -65,56 +65,56 @@ var (
6565
{Choice: Put, Weight: 40},
6666
},
6767
}
68-
EtcdDelete Traffic = etcdTraffic{
69-
keyCount: 10,
70-
largePutSize: 32769,
71-
leaseTTL: DefaultLeaseTTL,
68+
EtcdDelete Traffic = EtcdTraffic{
69+
KeyCount: 10,
70+
LargePutSize: 32769,
71+
LeaseTTL: DefaultLeaseTTL,
7272
// Please keep the sum of weights equal 100.
73-
requests: []random.ChoiceWeight[etcdRequestType]{
73+
Requests: []random.ChoiceWeight[EtcdRequestType]{
7474
{Choice: Put, Weight: 50},
7575
{Choice: Delete, Weight: 50},
7676
},
7777
}
7878
)
7979

80-
type etcdTraffic struct {
81-
keyCount int
82-
requests []random.ChoiceWeight[etcdRequestType]
83-
leaseTTL int64
84-
largePutSize int
80+
type EtcdTraffic struct {
81+
KeyCount int
82+
Requests []random.ChoiceWeight[EtcdRequestType]
83+
LeaseTTL int64
84+
LargePutSize int
8585
}
8686

87-
func (t etcdTraffic) ExpectUniqueRevision() bool {
87+
func (t EtcdTraffic) ExpectUniqueRevision() bool {
8888
return false
8989
}
9090

91-
type etcdRequestType string
91+
type EtcdRequestType string
9292

9393
const (
94-
Get etcdRequestType = "get"
95-
StaleGet etcdRequestType = "staleGet"
96-
List etcdRequestType = "list"
97-
StaleList etcdRequestType = "staleList"
98-
Put etcdRequestType = "put"
99-
LargePut etcdRequestType = "largePut"
100-
Delete etcdRequestType = "delete"
101-
MultiOpTxn etcdRequestType = "multiOpTxn"
102-
PutWithLease etcdRequestType = "putWithLease"
103-
LeaseRevoke etcdRequestType = "leaseRevoke"
104-
CompareAndSet etcdRequestType = "compareAndSet"
105-
Defragment etcdRequestType = "defragment"
94+
Get EtcdRequestType = "get"
95+
StaleGet EtcdRequestType = "staleGet"
96+
List EtcdRequestType = "list"
97+
StaleList EtcdRequestType = "staleList"
98+
Put EtcdRequestType = "put"
99+
LargePut EtcdRequestType = "largePut"
100+
Delete EtcdRequestType = "delete"
101+
MultiOpTxn EtcdRequestType = "multiOpTxn"
102+
PutWithLease EtcdRequestType = "putWithLease"
103+
LeaseRevoke EtcdRequestType = "leaseRevoke"
104+
CompareAndSet EtcdRequestType = "compareAndSet"
105+
Defragment EtcdRequestType = "defragment"
106106
)
107107

108-
func (t etcdTraffic) Name() string {
108+
func (t EtcdTraffic) Name() string {
109109
return "Etcd"
110110
}
111111

112-
func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
112+
func (t EtcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
113113
lastOperationSucceeded := true
114114
var lastRev int64
115-
var requestType etcdRequestType
115+
var requestType EtcdRequestType
116116
client := etcdTrafficClient{
117-
etcdTraffic: t,
117+
EtcdTraffic: t,
118118
keyPrefix: "key",
119119
client: c,
120120
limiter: limiter,
@@ -133,7 +133,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
133133

134134
// Avoid multiple failed writes in a row
135135
if lastOperationSucceeded {
136-
choices := t.requests
136+
choices := t.Requests
137137
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
138138
choices = filterOutNonUniqueEtcdWrites(choices)
139139
}
@@ -156,7 +156,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
156156
}
157157
}
158158

159-
func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
159+
func (t EtcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
160160
var lastRev int64 = 2
161161
ticker := time.NewTicker(period)
162162
defer ticker.Stop()
@@ -185,7 +185,7 @@ func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClie
185185
}
186186
}
187187

188-
func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]) (resp []random.ChoiceWeight[etcdRequestType]) {
188+
func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[EtcdRequestType]) (resp []random.ChoiceWeight[EtcdRequestType]) {
189189
for _, choice := range choices {
190190
if choice.Choice != Delete && choice.Choice != LeaseRevoke {
191191
resp = append(resp, choice)
@@ -195,15 +195,15 @@ func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]
195195
}
196196

197197
type etcdTrafficClient struct {
198-
etcdTraffic
198+
EtcdTraffic
199199
keyPrefix string
200200
client *client.RecordingClient
201201
limiter *rate.Limiter
202202
idProvider identity.Provider
203203
leaseStorage identity.LeaseIDStorage
204204
}
205205

206-
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, lastRev int64) (rev int64, err error) {
206+
func (c etcdTrafficClient) Request(ctx context.Context, request EtcdRequestType, lastRev int64) (rev int64, err error) {
207207
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
208208
defer cancel()
209209

@@ -241,7 +241,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
241241
}
242242
case LargePut:
243243
var resp *clientv3.PutResponse
244-
resp, err = c.client.Put(opCtx, c.randomKey(), random.RandString(c.largePutSize))
244+
resp, err = c.client.Put(opCtx, c.randomKey(), random.RandString(c.LargePutSize))
245245
if resp != nil {
246246
rev = resp.Header.Revision
247247
}
@@ -290,7 +290,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
290290
leaseID := c.leaseStorage.LeaseID(c.client.ID)
291291
if leaseID == 0 {
292292
var resp *clientv3.LeaseGrantResponse
293-
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
293+
resp, err = c.client.LeaseGrant(opCtx, c.LeaseTTL)
294294
if resp != nil {
295295
leaseID = int64(resp.ID)
296296
rev = resp.ResponseHeader.Revision
@@ -335,7 +335,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
335335
}
336336

337337
func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
338-
keys := rand.Perm(c.keyCount)
338+
keys := rand.Perm(c.KeyCount)
339339
opTypes := make([]model.OperationType, 4)
340340

341341
atLeastOnePut := false
@@ -372,10 +372,10 @@ func (c etcdTrafficClient) randomKey() string {
372372
}
373373

374374
func (c etcdTrafficClient) key(i int) string {
375-
return fmt.Sprintf("%s%d", c.keyPrefix, i%c.keyCount)
375+
return fmt.Sprintf("%s%d", c.keyPrefix, i%c.KeyCount)
376376
}
377377

378-
func (t etcdTraffic) pickOperationType() model.OperationType {
378+
func (t EtcdTraffic) pickOperationType() model.OperationType {
379379
roll := rand.Int() % 100
380380
if roll < 10 {
381381
return model.DeleteOperation

0 commit comments

Comments
 (0)