@@ -18,13 +18,12 @@ package main
18
18
19
19
import (
20
20
"context"
21
- "log "
21
+ "fmt "
22
22
"os"
23
23
"sync"
24
24
"time"
25
25
26
26
"github.com/antithesishq/antithesis-sdk-go/assert"
27
- "github.com/antithesishq/antithesis-sdk-go/random"
28
27
"golang.org/x/time/rate"
29
28
30
29
"go.etcd.io/etcd/tests/v3/robustness/client"
@@ -34,54 +33,43 @@ import (
34
33
"go.etcd.io/etcd/tests/v3/robustness/traffic"
35
34
)
36
35
37
- var (
38
- // Please keep the sum of weights equal 100.
39
- profile = traffic.Profile {
40
- MinimalQPS : 100 ,
41
- MaximalQPS : 1000 ,
42
- BurstableQPS : 1000 ,
43
- ClientCount : 3 ,
44
- MaxNonUniqueRequestConcurrency : 3 ,
45
- }
46
- IDProvider = identity .NewIDProvider ()
47
- LeaseIDStorage = identity .NewLeaseIDStorage ()
48
- ConcurrencyLimiter = traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
49
- )
50
-
51
- // Connect returns a client connection to an etcd node
52
- func Connect () * client.RecordingClient {
53
- hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
54
- cli , err := client .NewRecordingClient (hosts , IDProvider , time .Now ())
55
- if err != nil {
56
- log .Fatalf ("Failed to connect to etcd: %v" , err )
57
- // Antithesis Assertion: client should always be able to connect to an etcd host
58
- host := random .RandomChoice (hosts )
59
- assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : host , "error" : err })
60
- os .Exit (1 )
61
- }
62
- return cli
36
+ var profile = traffic.Profile {
37
+ MinimalQPS : 100 ,
38
+ MaximalQPS : 1000 ,
39
+ BurstableQPS : 1000 ,
40
+ ClientCount : 3 ,
41
+ MaxNonUniqueRequestConcurrency : 3 ,
63
42
}
64
43
65
- func testRobustness () {
44
+ func main () {
66
45
ctx := context .Background ()
67
- var wg sync.WaitGroup
68
- var mux sync.Mutex
69
- runfor := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
46
+ baseTime := time .Now ()
47
+ duration := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
48
+ testRobustness (ctx , baseTime , duration )
49
+ }
50
+
51
+ func testRobustness (ctx context.Context , baseTime time.Time , duration time.Duration ) {
70
52
limiter := rate .NewLimiter (rate .Limit (profile .MaximalQPS ), profile .BurstableQPS )
71
- finish := wrap (time .After (runfor ))
72
- reports := []report.ClientReport {}
53
+ finish := closeAfter (ctx , duration )
54
+ ids := identity .NewIDProvider ()
55
+ storage := identity .NewLeaseIDStorage ()
56
+ concurrencyLimiter := traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
57
+ hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
73
58
74
- for range profile .ClientCount {
59
+ reports := []report.ClientReport {}
60
+ var mux sync.Mutex
61
+ var wg sync.WaitGroup
62
+ for i := 0 ; i < profile .ClientCount ; i ++ {
63
+ c := connect (hosts [i % len (hosts )], ids , baseTime )
75
64
wg .Add (1 )
76
- c := Connect ()
77
65
go func (c * client.RecordingClient ) {
78
66
defer wg .Done ()
79
67
defer c .Close ()
80
68
81
69
traffic .EtcdAntithesis .RunTrafficLoop (ctx , c , limiter ,
82
- IDProvider ,
83
- LeaseIDStorage ,
84
- ConcurrencyLimiter ,
70
+ ids ,
71
+ storage ,
72
+ concurrencyLimiter ,
85
73
finish ,
86
74
)
87
75
mux .Lock ()
@@ -90,21 +78,30 @@ func testRobustness() {
90
78
}(c )
91
79
}
92
80
wg .Wait ()
93
- assert .Reachable ("Completion robustness traffic generation" , nil )
81
+ fmt .Println ("Completed robustness traffic generation" )
82
+ assert .Reachable ("Completed robustness traffic generation" , nil )
83
+ }
84
+
85
+ func connect (endpoint string , ids identity.Provider , baseTime time.Time ) * client.RecordingClient {
86
+ cli , err := client .NewRecordingClient ([]string {endpoint }, ids , baseTime )
87
+ if err != nil {
88
+ // Antithesis Assertion: client should always be able to connect to an etcd host
89
+ assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : endpoint , "error" : err })
90
+ os .Exit (1 )
91
+ }
92
+ return cli
94
93
}
95
94
96
- // wrap converts a receive-only channel to receive-only struct{} channel
97
- func wrap [T any ](from <- chan T ) <- chan struct {} {
95
+ func closeAfter (ctx context.Context , t time.Duration ) <- chan struct {} {
98
96
out := make (chan struct {})
99
97
go func () {
100
98
for {
101
- <- from
102
- out <- struct {}{}
99
+ select {
100
+ case <- time .After (t ):
101
+ case <- ctx .Done ():
102
+ }
103
+ close (out )
103
104
}
104
105
}()
105
106
return out
106
107
}
107
-
108
- func main () {
109
- testRobustness ()
110
- }
0 commit comments