@@ -18,13 +18,12 @@ package main
18
18
19
19
import (
20
20
"context"
21
- "log "
21
+ "fmt "
22
22
"os"
23
23
"sync"
24
24
"time"
25
25
26
26
"github.com/antithesishq/antithesis-sdk-go/assert"
27
- "github.com/antithesishq/antithesis-sdk-go/random"
28
27
"golang.org/x/time/rate"
29
28
30
29
"go.etcd.io/etcd/tests/v3/robustness/client"
@@ -43,45 +42,37 @@ var (
43
42
ClientCount : 3 ,
44
43
MaxNonUniqueRequestConcurrency : 3 ,
45
44
}
46
- IDProvider = identity .NewIDProvider ()
47
- LeaseIDStorage = identity .NewLeaseIDStorage ()
48
- ConcurrencyLimiter = traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
49
45
)
50
46
51
- // Connect returns a client connection to an etcd node
52
- func Connect () * client.RecordingClient {
53
- hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
54
- cli , err := client .NewRecordingClient (hosts , IDProvider , time .Now ())
55
- if err != nil {
56
- log .Fatalf ("Failed to connect to etcd: %v" , err )
57
- // Antithesis Assertion: client should always be able to connect to an etcd host
58
- host := random .RandomChoice (hosts )
59
- assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : host , "error" : err })
60
- os .Exit (1 )
61
- }
62
- return cli
47
+ func main () {
48
+ ctx := context .Background ()
49
+ baseTime := time .Now ()
50
+ duration := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
51
+ testRobustness (ctx , baseTime , duration )
63
52
}
64
53
65
- func testRobustness () {
66
- ctx := context .Background ()
67
- var wg sync.WaitGroup
68
- var mux sync.Mutex
69
- runfor := time .Duration (robustnessrand .RandRange (5 , 60 ) * int64 (time .Second ))
54
+ func testRobustness (ctx context.Context , baseTime time.Time , duration time.Duration ) {
70
55
limiter := rate .NewLimiter (rate .Limit (profile .MaximalQPS ), profile .BurstableQPS )
71
- finish := wrap (time .After (runfor ))
72
- reports := []report.ClientReport {}
56
+ finish := closeAfter (ctx , duration )
57
+ ids := identity .NewIDProvider ()
58
+ storage := identity .NewLeaseIDStorage ()
59
+ concurrencyLimiter := traffic .NewConcurrencyLimiter (profile .MaxNonUniqueRequestConcurrency )
60
+ hosts := []string {"etcd0:2379" , "etcd1:2379" , "etcd2:2379" }
73
61
74
- for range profile .ClientCount {
62
+ reports := []report.ClientReport {}
63
+ var mux sync.Mutex
64
+ var wg sync.WaitGroup
65
+ for i := 0 ; i < profile .ClientCount ; i ++ {
66
+ c := connect (hosts [i % len (hosts )], ids , baseTime )
75
67
wg .Add (1 )
76
- c := Connect ()
77
68
go func (c * client.RecordingClient ) {
78
69
defer wg .Done ()
79
70
defer c .Close ()
80
71
81
72
traffic .EtcdAntithesis .RunTrafficLoop (ctx , c , limiter ,
82
- IDProvider ,
83
- LeaseIDStorage ,
84
- ConcurrencyLimiter ,
73
+ ids ,
74
+ storage ,
75
+ concurrencyLimiter ,
85
76
finish ,
86
77
)
87
78
mux .Lock ()
@@ -90,21 +81,30 @@ func testRobustness() {
90
81
}(c )
91
82
}
92
83
wg .Wait ()
93
- assert .Reachable ("Completion robustness traffic generation" , nil )
84
+ fmt .Println ("Completed robustness traffic generation" )
85
+ assert .Reachable ("Completed robustness traffic generation" , nil )
94
86
}
95
87
96
- // wrap converts a receive-only channel to receive-only struct{} channel
97
- func wrap [T any ](from <- chan T ) <- chan struct {} {
88
+ func connect (endpoint string , ids identity.Provider , baseTime time.Time ) * client.RecordingClient {
89
+ cli , err := client .NewRecordingClient ([]string {endpoint }, ids , baseTime )
90
+ if err != nil {
91
+ // Antithesis Assertion: client should always be able to connect to an etcd host
92
+ assert .Unreachable ("Client failed to connect to an etcd host" , map [string ]any {"host" : endpoint , "error" : err })
93
+ os .Exit (1 )
94
+ }
95
+ return cli
96
+ }
97
+
98
+ func closeAfter (ctx context.Context , t time.Duration ) <- chan struct {} {
98
99
out := make (chan struct {})
99
100
go func () {
100
101
for {
101
- <- from
102
- out <- struct {}{}
102
+ select {
103
+ case <- time .After (t ):
104
+ case <- ctx .Done ():
105
+ }
106
+ close (out )
103
107
}
104
108
}()
105
109
return out
106
110
}
107
-
108
- func main () {
109
- testRobustness ()
110
- }
0 commit comments