Skip to content

Commit 3c5c7c7

Browse files
author
Nont
committed
Use robustness traffic generation
Signed-off-by: Nont <[email protected]>
1 parent eae9a91 commit 3c5c7c7

File tree

2 files changed

+121
-118
lines changed
  • tests

2 files changed

+121
-118
lines changed

tests/antithesis/test-template/robustness/main.go

+64-70
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,43 @@ import (
2020
"context"
2121
"log"
2222
"os"
23+
"sync"
2324
"time"
2425

2526
"github.com/antithesishq/antithesis-sdk-go/assert"
2627
"github.com/antithesishq/antithesis-sdk-go/random"
28+
"golang.org/x/time/rate"
2729

28-
clientv3 "go.etcd.io/etcd/client/v3"
2930
"go.etcd.io/etcd/tests/v3/robustness/client"
3031
"go.etcd.io/etcd/tests/v3/robustness/identity"
32+
robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random"
33+
"go.etcd.io/etcd/tests/v3/robustness/report"
34+
"go.etcd.io/etcd/tests/v3/robustness/traffic"
3135
)
3236

33-
func Connect() *client.RecordingClient {
34-
// This function returns a client connection to an etcd node
37+
var (
38+
// Please keep the sum of weights equal 100.
39+
requests = []robustnessrand.ChoiceWeight[traffic.EtcdRequestType]{
40+
{Choice: traffic.Get, Weight: 50},
41+
{Choice: traffic.Put, Weight: 50},
42+
}
43+
etcdTraffic traffic.Traffic = traffic.NewEtcdTraffic(10, 0, 0, requests)
44+
profile = traffic.Profile{
45+
MinimalQPS: 100,
46+
MaximalQPS: 1000,
47+
BurstableQPS: 1000,
48+
ClientCount: 3,
49+
MaxNonUniqueRequestConcurrency: 3,
50+
}
51+
IDProvider = identity.NewIDProvider()
52+
LeaseIDStorage = identity.NewLeaseIDStorage()
53+
ConcurrencyLimiter = traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
54+
)
3555

56+
// Connect returns a client connection to an etcd node
57+
func Connect() *client.RecordingClient {
3658
hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"}
37-
cli, err := client.NewRecordingClient(hosts, identity.NewIDProvider(), time.Now())
59+
cli, err := client.NewRecordingClient(hosts, IDProvider, time.Now())
3860
if err != nil {
3961
log.Fatalf("Failed to connect to etcd: %v", err)
4062
// Antithesis Assertion: client should always be able to connect to an etcd host
@@ -45,77 +67,49 @@ func Connect() *client.RecordingClient {
4567
return cli
4668
}
4769

48-
func DeleteKeys() {
49-
// This function will:
50-
// 1. Get all keys
51-
// 2. Select half of the keys received
52-
// 3. Attempt to delete the keys selected
53-
// 4. Check that the keys were deleted
54-
70+
func testRobustness() {
5571
ctx := context.Background()
56-
57-
// Connect to an etcd node
58-
cli := Connect()
59-
60-
// Get all keys
61-
resp, err := cli.Get(ctx, "", clientv3.WithPrefix())
62-
63-
// Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen.
64-
assert.Sometimes(err == nil, "Client can make successful get all requests", map[string]any{"error": err})
65-
cli.Close()
66-
67-
if err != nil {
68-
log.Printf("Client failed to get all keys: %v", err)
69-
os.Exit(0)
72+
var wg sync.WaitGroup
73+
var mux sync.Mutex
74+
runfor := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second))
75+
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), profile.BurstableQPS)
76+
finish := wrap(time.After(runfor))
77+
reports := []report.ClientReport{}
78+
79+
for range profile.ClientCount {
80+
wg.Add(1)
81+
c := Connect()
82+
go func(c *client.RecordingClient) {
83+
defer wg.Done()
84+
defer c.Close()
85+
86+
etcdTraffic.RunTrafficLoop(ctx, c, limiter,
87+
IDProvider,
88+
LeaseIDStorage,
89+
ConcurrencyLimiter,
90+
finish,
91+
)
92+
mux.Lock()
93+
reports = append(reports, c.Report())
94+
mux.Unlock()
95+
}(c)
7096
}
97+
wg.Wait()
98+
assert.Reachable("Completion robustness traffic generation", nil)
99+
}
71100

72-
// Choose half of the keys
73-
var keys []string
74-
for _, k := range resp.Kvs {
75-
keys = append(keys, string(k.Key))
76-
}
77-
half := len(keys) / 2
78-
halfKeys := keys[:half]
79-
80-
// Connect to a new etcd node
81-
cli = Connect()
82-
83-
// Delete half of the keys chosen
84-
var deletedKeys []string
85-
for _, k := range halfKeys {
86-
_, err := cli.Delete(ctx, k)
87-
// Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen.
88-
assert.Sometimes(err == nil, "Client can make successful delete requests", map[string]any{"error": err})
89-
if err != nil {
90-
log.Printf("Failed to delete key %s: %v", k, err)
91-
} else {
92-
log.Printf("Successfully deleted key %v", k)
93-
deletedKeys = append(deletedKeys, k)
94-
}
95-
}
96-
cli.Close()
97-
98-
// Connect to a new etcd node
99-
cli = Connect()
100-
101-
// Check to see if those keys were deleted / exist
102-
for _, k := range deletedKeys {
103-
resp, err := cli.Get(ctx, k)
104-
// Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen.
105-
assert.Sometimes(err == nil, "Client can make successful get requests", map[string]any{"error": err})
106-
if err != nil {
107-
log.Printf("Client failed to get key %s: %v", k, err)
108-
continue
101+
// wrap converts a receive-only channel to receive-only struct{} channel
102+
func wrap[T any](from <-chan T) <-chan struct{} {
103+
out := make(chan struct{})
104+
go func() {
105+
for {
106+
<-from
107+
out <- struct{}{}
109108
}
110-
// Antithesis Assertion: if we deleted a key, we should not get a value
111-
assert.Always(resp.Count == 0, "Key was deleted correctly", map[string]any{"key": k})
112-
}
113-
cli.Close()
114-
115-
assert.Reachable("Completion of a key deleting check", nil)
116-
log.Printf("Completion of a key deleting check")
109+
}()
110+
return out
117111
}
118112

119113
func main() {
120-
DeleteKeys()
114+
testRobustness()
121115
}

tests/robustness/traffic/etcd.go

+57-48
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ import (
3131
)
3232

3333
var (
34-
EtcdPutDeleteLease Traffic = etcdTraffic{
35-
keyCount: 10,
36-
leaseTTL: DefaultLeaseTTL,
37-
largePutSize: 32769,
34+
EtcdPutDeleteLease Traffic = EtcdTraffic{
35+
KeyCount: 10,
36+
LeaseTTL: DefaultLeaseTTL,
37+
LargePutSize: 32769,
3838
// Please keep the sum of weights equal 100.
39-
requests: []random.ChoiceWeight[etcdRequestType]{
39+
Requests: []random.ChoiceWeight[EtcdRequestType]{
4040
{Choice: Get, Weight: 15},
4141
{Choice: List, Weight: 15},
4242
{Choice: StaleGet, Weight: 10},
@@ -50,12 +50,12 @@ var (
5050
{Choice: LargePut, Weight: 5},
5151
},
5252
}
53-
EtcdPut Traffic = etcdTraffic{
54-
keyCount: 10,
55-
largePutSize: 32769,
56-
leaseTTL: DefaultLeaseTTL,
53+
EtcdPut Traffic = EtcdTraffic{
54+
KeyCount: 10,
55+
LargePutSize: 32769,
56+
LeaseTTL: DefaultLeaseTTL,
5757
// Please keep the sum of weights equal 100.
58-
requests: []random.ChoiceWeight[etcdRequestType]{
58+
Requests: []random.ChoiceWeight[EtcdRequestType]{
5959
{Choice: Get, Weight: 15},
6060
{Choice: List, Weight: 15},
6161
{Choice: StaleGet, Weight: 10},
@@ -65,56 +65,65 @@ var (
6565
{Choice: Put, Weight: 40},
6666
},
6767
}
68-
EtcdDelete Traffic = etcdTraffic{
69-
keyCount: 10,
70-
largePutSize: 32769,
71-
leaseTTL: DefaultLeaseTTL,
68+
EtcdDelete Traffic = EtcdTraffic{
69+
KeyCount: 10,
70+
LargePutSize: 32769,
71+
LeaseTTL: DefaultLeaseTTL,
7272
// Please keep the sum of weights equal 100.
73-
requests: []random.ChoiceWeight[etcdRequestType]{
73+
Requests: []random.ChoiceWeight[EtcdRequestType]{
7474
{Choice: Put, Weight: 50},
7575
{Choice: Delete, Weight: 50},
7676
},
7777
}
7878
)
7979

80-
type etcdTraffic struct {
81-
keyCount int
82-
requests []random.ChoiceWeight[etcdRequestType]
83-
leaseTTL int64
84-
largePutSize int
80+
type EtcdTraffic struct {
81+
KeyCount int
82+
Requests []random.ChoiceWeight[EtcdRequestType]
83+
LeaseTTL int64
84+
LargePutSize int
8585
}
8686

87-
func (t etcdTraffic) ExpectUniqueRevision() bool {
87+
func NewEtcdTraffic(keycount, largePutsize int, leaseTTL int64, requests []random.ChoiceWeight[EtcdRequestType]) EtcdTraffic {
88+
return EtcdTraffic{
89+
KeyCount: keycount,
90+
Requests: requests,
91+
LeaseTTL: leaseTTL,
92+
LargePutSize: largePutsize,
93+
}
94+
}
95+
96+
func (t EtcdTraffic) ExpectUniqueRevision() bool {
8897
return false
8998
}
9099

91-
type etcdRequestType string
100+
type EtcdRequestType string
92101

93102
const (
94-
Get etcdRequestType = "get"
95-
StaleGet etcdRequestType = "staleGet"
96-
List etcdRequestType = "list"
97-
StaleList etcdRequestType = "staleList"
98-
Put etcdRequestType = "put"
99-
LargePut etcdRequestType = "largePut"
100-
Delete etcdRequestType = "delete"
101-
MultiOpTxn etcdRequestType = "multiOpTxn"
102-
PutWithLease etcdRequestType = "putWithLease"
103-
LeaseRevoke etcdRequestType = "leaseRevoke"
104-
CompareAndSet etcdRequestType = "compareAndSet"
105-
Defragment etcdRequestType = "defragment"
103+
Get EtcdRequestType = "get"
104+
StaleGet EtcdRequestType = "staleGet"
105+
List EtcdRequestType = "list"
106+
StaleList EtcdRequestType = "staleList"
107+
Put EtcdRequestType = "put"
108+
LargePut EtcdRequestType = "largePut"
109+
Delete EtcdRequestType = "delete"
110+
MultiOpTxn EtcdRequestType = "multiOpTxn"
111+
PutWithLease EtcdRequestType = "putWithLease"
112+
LeaseRevoke EtcdRequestType = "leaseRevoke"
113+
CompareAndSet EtcdRequestType = "compareAndSet"
114+
Defragment EtcdRequestType = "defragment"
106115
)
107116

108-
func (t etcdTraffic) Name() string {
117+
func (t EtcdTraffic) Name() string {
109118
return "Etcd"
110119
}
111120

112-
func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
121+
func (t EtcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
113122
lastOperationSucceeded := true
114123
var lastRev int64
115-
var requestType etcdRequestType
124+
var requestType EtcdRequestType
116125
client := etcdTrafficClient{
117-
etcdTraffic: t,
126+
EtcdTraffic: t,
118127
keyPrefix: "key",
119128
client: c,
120129
limiter: limiter,
@@ -133,7 +142,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
133142

134143
// Avoid multiple failed writes in a row
135144
if lastOperationSucceeded {
136-
choices := t.requests
145+
choices := t.Requests
137146
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
138147
choices = filterOutNonUniqueEtcdWrites(choices)
139148
}
@@ -156,7 +165,7 @@ func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClie
156165
}
157166
}
158167

159-
func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
168+
func (t EtcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
160169
var lastRev int64 = 2
161170
ticker := time.NewTicker(period)
162171
defer ticker.Stop()
@@ -185,7 +194,7 @@ func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClie
185194
}
186195
}
187196

188-
func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]) (resp []random.ChoiceWeight[etcdRequestType]) {
197+
func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[EtcdRequestType]) (resp []random.ChoiceWeight[EtcdRequestType]) {
189198
for _, choice := range choices {
190199
if choice.Choice != Delete && choice.Choice != LeaseRevoke {
191200
resp = append(resp, choice)
@@ -195,15 +204,15 @@ func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]
195204
}
196205

197206
type etcdTrafficClient struct {
198-
etcdTraffic
207+
EtcdTraffic
199208
keyPrefix string
200209
client *client.RecordingClient
201210
limiter *rate.Limiter
202211
idProvider identity.Provider
203212
leaseStorage identity.LeaseIDStorage
204213
}
205214

206-
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, lastRev int64) (rev int64, err error) {
215+
func (c etcdTrafficClient) Request(ctx context.Context, request EtcdRequestType, lastRev int64) (rev int64, err error) {
207216
opCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
208217
defer cancel()
209218

@@ -241,7 +250,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
241250
}
242251
case LargePut:
243252
var resp *clientv3.PutResponse
244-
resp, err = c.client.Put(opCtx, c.randomKey(), random.RandString(c.largePutSize))
253+
resp, err = c.client.Put(opCtx, c.randomKey(), random.RandString(c.LargePutSize))
245254
if resp != nil {
246255
rev = resp.Header.Revision
247256
}
@@ -290,7 +299,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
290299
leaseID := c.leaseStorage.LeaseID(c.client.ID)
291300
if leaseID == 0 {
292301
var resp *clientv3.LeaseGrantResponse
293-
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
302+
resp, err = c.client.LeaseGrant(opCtx, c.LeaseTTL)
294303
if resp != nil {
295304
leaseID = int64(resp.ID)
296305
rev = resp.ResponseHeader.Revision
@@ -335,7 +344,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
335344
}
336345

337346
func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
338-
keys := rand.Perm(c.keyCount)
347+
keys := rand.Perm(c.KeyCount)
339348
opTypes := make([]model.OperationType, 4)
340349

341350
atLeastOnePut := false
@@ -372,10 +381,10 @@ func (c etcdTrafficClient) randomKey() string {
372381
}
373382

374383
func (c etcdTrafficClient) key(i int) string {
375-
return fmt.Sprintf("%s%d", c.keyPrefix, i%c.keyCount)
384+
return fmt.Sprintf("%s%d", c.keyPrefix, i%c.KeyCount)
376385
}
377386

378-
func (t etcdTraffic) pickOperationType() model.OperationType {
387+
func (t EtcdTraffic) pickOperationType() model.OperationType {
379388
roll := rand.Int() % 100
380389
if roll < 10 {
381390
return model.DeleteOperation

0 commit comments

Comments
 (0)