@@ -18,104 +18,90 @@ package main
18
18
19
19
import (
20
20
"context"
21
- "log "
21
+ "fmt "
22
22
"os"
23
+ "sync"
23
24
"time"
24
25
25
26
"github.com/antithesishq/antithesis-sdk-go/assert"
26
- "github.com/antithesishq/antithesis-sdk-go/random "
27
+ "golang.org/x/time/rate "
27
28
28
- clientv3 "go.etcd.io/etcd/client/v3"
29
29
"go.etcd.io/etcd/tests/v3/robustness/client"
30
30
"go.etcd.io/etcd/tests/v3/robustness/identity"
31
+ robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random"
32
+ "go.etcd.io/etcd/tests/v3/robustness/report"
33
+ "go.etcd.io/etcd/tests/v3/robustness/traffic"
31
34
)
32
35
33
- func Connect () * client.RecordingClient {
34
- // This function returns a client connection to an etcd node
35
-
36
- hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
37
- cli , err := client .NewRecordingClient (hosts , identity .NewIDProvider (), time .Now ())
38
- if err != nil {
39
- log .Fatalf ("Failed to connect to etcd: %v" , err )
40
- // Antithesis Assertion: client should always be able to connect to an etcd host
41
- host := random .RandomChoice (hosts )
42
- assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : host , "error" : err })
43
- os .Exit (1 )
44
- }
45
- return cli
36
+ var profile = traffic.Profile {
37
+ MinimalQPS : 100 ,
38
+ MaximalQPS : 1000 ,
39
+ BurstableQPS : 1000 ,
40
+ ClientCount : 3 ,
41
+ MaxNonUniqueRequestConcurrency : 3 ,
46
42
}
47
43
48
- func DeleteKeys () {
49
- // This function will:
50
- // 1. Get all keys
51
- // 2. Select half of the keys received
52
- // 3. Attempt to delete the keys selected
53
- // 4. Check that the keys were deleted
54
-
44
+ func main () {
55
45
ctx := context .Background ()
46
+ baseTime := time .Now ()
47
+ duration := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
48
+ testRobustness (ctx , baseTime , duration )
49
+ }
56
50
57
- // Connect to an etcd node
58
- cli := Connect ()
59
-
60
- // Get all keys
61
- resp , err := cli .Get (ctx , "" , clientv3 .WithPrefix ())
62
-
63
- // Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen.
64
- assert .Sometimes (err == nil , "Client can make successful get all requests" , map [string ]any {"error" : err })
65
- cli .Close ()
66
-
67
- if err != nil {
68
- log .Printf ("Client failed to get all keys: %v" , err )
69
- os .Exit (0 )
70
- }
71
-
72
- // Choose half of the keys
73
- var keys []string
74
- for _ , k := range resp .Kvs {
75
- keys = append (keys , string (k .Key ))
76
- }
77
- half := len (keys ) / 2
78
- halfKeys := keys [:half ]
79
-
80
- // Connect to a new etcd node
81
- cli = Connect ()
51
+ func testRobustness (ctx context.Context , baseTime time.Time , duration time.Duration ) {
52
+ limiter := rate .NewLimiter (rate .Limit (profile .MaximalQPS ), profile .BurstableQPS )
53
+ finish := closeAfter (ctx , duration )
54
+ ids := identity .NewIDProvider ()
55
+ storage := identity .NewLeaseIDStorage ()
56
+ concurrencyLimiter := traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
57
+ hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
82
58
83
- // Delete half of the keys chosen
84
- var deletedKeys []string
85
- for _ , k := range halfKeys {
86
- _ , err := cli .Delete (ctx , k )
87
- // Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen.
88
- assert .Sometimes (err == nil , "Client can make successful delete requests" , map [string ]any {"error" : err })
89
- if err != nil {
90
- log .Printf ("Failed to delete key %s: %v" , k , err )
91
- } else {
92
- log .Printf ("Successfully deleted key %v" , k )
93
- deletedKeys = append (deletedKeys , k )
94
- }
59
+ reports := []report.ClientReport {}
60
+ var mux sync.Mutex
61
+ var wg sync.WaitGroup
62
+ for i := 0 ; i < profile .ClientCount ; i ++ {
63
+ c := connect (hosts [i % len (hosts )], ids , baseTime )
64
+ wg .Add (1 )
65
+ go func (c * client.RecordingClient ) {
66
+ defer wg .Done ()
67
+ defer c .Close ()
68
+
69
+ traffic .EtcdAntithesis .RunTrafficLoop (ctx , c , limiter ,
70
+ ids ,
71
+ storage ,
72
+ concurrencyLimiter ,
73
+ finish ,
74
+ )
75
+ mux .Lock ()
76
+ reports = append (reports , c .Report ())
77
+ mux .Unlock ()
78
+ }(c )
95
79
}
96
- cli . Close ()
97
-
98
- // Connect to a new etcd node
99
- cli = Connect ()
80
+ wg . Wait ()
81
+ fmt . Println ( "Completed robustness traffic generation" )
82
+ assert . Reachable ( "Completed robustness traffic generation" , nil )
83
+ }
100
84
101
- // Check to see if those keys were deleted / exist
102
- for _ , k := range deletedKeys {
103
- resp , err := cli .Get (ctx , k )
104
- // Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen.
105
- assert .Sometimes (err == nil , "Client can make successful get requests" , map [string ]any {"error" : err })
106
- if err != nil {
107
- log .Printf ("Client failed to get key %s: %v" , k , err )
108
- continue
109
- }
110
- // Antithesis Assertion: if we deleted a key, we should not get a value
111
- assert .Always (resp .Count == 0 , "Key was deleted correctly" , map [string ]any {"key" : k })
85
+ func connect (endpoint string , ids identity.Provider , baseTime time.Time ) * client.RecordingClient {
86
+ cli , err := client .NewRecordingClient ([]string {endpoint }, ids , baseTime )
87
+ if err != nil {
88
+ // Antithesis Assertion: client should always be able to connect to an etcd host
89
+ assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : endpoint , "error" : err })
90
+ os .Exit (1 )
112
91
}
113
- cli .Close ()
114
-
115
- assert .Reachable ("Completion of a key deleting check" , nil )
116
- log .Printf ("Completion of a key deleting check" )
92
+ return cli
117
93
}
118
94
119
- func main () {
120
- DeleteKeys ()
95
+ func closeAfter (ctx context.Context , t time.Duration ) <- chan struct {} {
96
+ out := make (chan struct {})
97
+ go func () {
98
+ for {
99
+ select {
100
+ case <- time .After (t ):
101
+ case <- ctx .Done ():
102
+ }
103
+ close (out )
104
+ }
105
+ }()
106
+ return out
121
107
}
0 commit comments