@@ -20,21 +20,38 @@ import (
20
20
"context"
21
21
"log"
22
22
"os"
23
+ "sync"
23
24
"time"
24
25
25
26
"github.com/antithesishq/antithesis-sdk-go/assert"
26
27
"github.com/antithesishq/antithesis-sdk-go/random"
28
+ "golang.org/x/time/rate"
27
29
28
- clientv3 "go.etcd.io/etcd/client/v3"
29
30
"go.etcd.io/etcd/tests/v3/robustness/client"
30
31
"go.etcd.io/etcd/tests/v3/robustness/identity"
32
+ robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random"
33
+ "go.etcd.io/etcd/tests/v3/robustness/report"
34
+ "go.etcd.io/etcd/tests/v3/robustness/traffic"
31
35
)
32
36
33
- func Connect () * client.RecordingClient {
34
- // This function returns a client connection to an etcd node
37
+ var (
38
+ // Please keep the sum of weights equal 100.
39
+ profile = traffic.Profile {
40
+ MinimalQPS : 100 ,
41
+ MaximalQPS : 1000 ,
42
+ BurstableQPS : 1000 ,
43
+ ClientCount : 3 ,
44
+ MaxNonUniqueRequestConcurrency : 3 ,
45
+ }
46
+ IDProvider = identity .NewIDProvider ()
47
+ LeaseIDStorage = identity .NewLeaseIDStorage ()
48
+ ConcurrencyLimiter = traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
49
+ )
35
50
51
+ // Connect returns a client connection to an etcd node
52
+ func Connect () * client.RecordingClient {
36
53
hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
37
- cli , err := client .NewRecordingClient (hosts , identity . NewIDProvider () , time .Now ())
54
+ cli , err := client .NewRecordingClient (hosts , IDProvider , time .Now ())
38
55
if err != nil {
39
56
log .Fatalf ("Failed to connect to etcd: %v" , err )
40
57
// Antithesis Assertion: client should always be able to connect to an etcd host
@@ -45,77 +62,49 @@ func Connect() *client.RecordingClient {
45
62
return cli
46
63
}
47
64
48
- func DeleteKeys () {
49
- // This function will:
50
- // 1. Get all keys
51
- // 2. Select half of the keys received
52
- // 3. Attempt to delete the keys selected
53
- // 4. Check that the keys were deleted
54
-
65
+ func testRobustness () {
55
66
ctx := context .Background ()
56
-
57
- // Connect to an etcd node
58
- cli := Connect ()
59
-
60
- // Get all keys
61
- resp , err := cli .Get (ctx , "" , clientv3 .WithPrefix ())
62
-
63
- // Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen.
64
- assert .Sometimes (err == nil , "Client can make successful get all requests" , map [string ]any {"error" : err })
65
- cli .Close ()
66
-
67
- if err != nil {
68
- log .Printf ("Client failed to get all keys: %v" , err )
69
- os .Exit (0 )
67
+ var wg sync.WaitGroup
68
+ var mux sync.Mutex
69
+ runfor := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
70
+ limiter := rate .NewLimiter (rate .Limit (profile .MaximalQPS ), profile .BurstableQPS )
71
+ finish := wrap (time .After (runfor ))
72
+ reports := []report.ClientReport {}
73
+
74
+ for range profile .ClientCount {
75
+ wg .Add (1 )
76
+ c := Connect ()
77
+ go func (c * client.RecordingClient ) {
78
+ defer wg .Done ()
79
+ defer c .Close ()
80
+
81
+ traffic .EtcdAntithesis .RunTrafficLoop (ctx , c , limiter ,
82
+ IDProvider ,
83
+ LeaseIDStorage ,
84
+ ConcurrencyLimiter ,
85
+ finish ,
86
+ )
87
+ mux .Lock ()
88
+ reports = append (reports , c .Report ())
89
+ mux .Unlock ()
90
+ }(c )
70
91
}
92
+ wg .Wait ()
93
+ assert .Reachable ("Completion robustness traffic generation" , nil )
94
+ }
71
95
72
- // Choose half of the keys
73
- var keys []string
74
- for _ , k := range resp .Kvs {
75
- keys = append (keys , string (k .Key ))
76
- }
77
- half := len (keys ) / 2
78
- halfKeys := keys [:half ]
79
-
80
- // Connect to a new etcd node
81
- cli = Connect ()
82
-
83
- // Delete half of the keys chosen
84
- var deletedKeys []string
85
- for _ , k := range halfKeys {
86
- _ , err := cli .Delete (ctx , k )
87
- // Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen.
88
- assert .Sometimes (err == nil , "Client can make successful delete requests" , map [string ]any {"error" : err })
89
- if err != nil {
90
- log .Printf ("Failed to delete key %s: %v" , k , err )
91
- } else {
92
- log .Printf ("Successfully deleted key %v" , k )
93
- deletedKeys = append (deletedKeys , k )
94
- }
95
- }
96
- cli .Close ()
97
-
98
- // Connect to a new etcd node
99
- cli = Connect ()
100
-
101
- // Check to see if those keys were deleted / exist
102
- for _ , k := range deletedKeys {
103
- resp , err := cli .Get (ctx , k )
104
- // Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen.
105
- assert .Sometimes (err == nil , "Client can make successful get requests" , map [string ]any {"error" : err })
106
- if err != nil {
107
- log .Printf ("Client failed to get key %s: %v" , k , err )
108
- continue
96
+ // wrap converts a receive-only channel to receive-only struct{} channel
97
+ func wrap [T any ](from <- chan T ) <- chan struct {} {
98
+ out := make (chan struct {})
99
+ go func () {
100
+ for {
101
+ <- from
102
+ out <- struct {}{}
109
103
}
110
- // Antithesis Assertion: if we deleted a key, we should not get a value
111
- assert .Always (resp .Count == 0 , "Key was deleted correctly" , map [string ]any {"key" : k })
112
- }
113
- cli .Close ()
114
-
115
- assert .Reachable ("Completion of a key deleting check" , nil )
116
- log .Printf ("Completion of a key deleting check" )
104
+ }()
105
+ return out
117
106
}
118
107
119
108
func main () {
120
- DeleteKeys ()
109
+ testRobustness ()
121
110
}
0 commit comments