From 15850236b38cac8a836ca56dc46d38f59acf546a Mon Sep 17 00:00:00 2001 From: Nont Date: Wed, 30 Apr 2025 22:33:31 -0500 Subject: [PATCH 1/2] Use robustness traffic generation Signed-off-by: Nont --- .../test-template/robustness/main.go | 129 ++++++++---------- tests/robustness/traffic/etcd.go | 10 ++ 2 files changed, 69 insertions(+), 70 deletions(-) diff --git a/tests/antithesis/test-template/robustness/main.go b/tests/antithesis/test-template/robustness/main.go index f2895b0d1cc..3abdceed154 100644 --- a/tests/antithesis/test-template/robustness/main.go +++ b/tests/antithesis/test-template/robustness/main.go @@ -20,21 +20,38 @@ import ( "context" "log" "os" + "sync" "time" "github.com/antithesishq/antithesis-sdk-go/assert" "github.com/antithesishq/antithesis-sdk-go/random" + "golang.org/x/time/rate" - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" + robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random" + "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func Connect() *client.RecordingClient { - // This function returns a client connection to an etcd node +var ( + // Please keep the sum of weights equal 100. + profile = traffic.Profile{ + MinimalQPS: 100, + MaximalQPS: 1000, + BurstableQPS: 1000, + ClientCount: 3, + MaxNonUniqueRequestConcurrency: 3, + } + IDProvider = identity.NewIDProvider() + LeaseIDStorage = identity.NewLeaseIDStorage() + ConcurrencyLimiter = traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) +) +// Connect returns a client connection to an etcd node +func Connect() *client.RecordingClient { hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"} - cli, err := client.NewRecordingClient(hosts, identity.NewIDProvider(), time.Now()) + cli, err := client.NewRecordingClient(hosts, IDProvider, time.Now()) if err != nil { log.Fatalf("Failed to connect to etcd: %v", err) // Antithesis Assertion: client should always be able to connect to an etcd host @@ -45,77 +62,49 @@ func Connect() *client.RecordingClient { return cli } -func DeleteKeys() { - // This function will: - // 1. Get all keys - // 2. Select half of the keys received - // 3. Attempt to delete the keys selected - // 4. Check that the keys were deleted - +func testRobustness() { ctx := context.Background() - - // Connect to an etcd node - cli := Connect() - - // Get all keys - resp, err := cli.Get(ctx, "", clientv3.WithPrefix()) - - // Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful get all requests", map[string]any{"error": err}) - cli.Close() - - if err != nil { - log.Printf("Client failed to get all keys: %v", err) - os.Exit(0) + var wg sync.WaitGroup + var mux sync.Mutex + runfor := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second)) + limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), profile.BurstableQPS) + finish := wrap(time.After(runfor)) + reports := []report.ClientReport{} + + for range profile.ClientCount { + wg.Add(1) + c := Connect() + go func(c *client.RecordingClient) { + defer wg.Done() + defer c.Close() + + traffic.EtcdAntithesis.RunTrafficLoop(ctx, c, limiter, + IDProvider, + LeaseIDStorage, + ConcurrencyLimiter, + finish, + ) + mux.Lock() + reports = append(reports, c.Report()) + mux.Unlock() + }(c) } + wg.Wait() + assert.Reachable("Completion robustness traffic generation", nil) +} - // Choose half of the keys - var keys []string - for _, k := range resp.Kvs { - keys = append(keys, string(k.Key)) - } - half := len(keys) / 2 - halfKeys := keys[:half] - - // Connect to a new etcd node - cli = Connect() - - // Delete half of the keys chosen - var deletedKeys []string - for _, k := range halfKeys { - _, err := cli.Delete(ctx, k) - // Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful delete requests", map[string]any{"error": err}) - if err != nil { - log.Printf("Failed to delete key %s: %v", k, err) - } else { - log.Printf("Successfully deleted key %v", k) - deletedKeys = append(deletedKeys, k) - } - } - cli.Close() - - // Connect to a new etcd node - cli = Connect() - - // Check to see if those keys were deleted / exist - for _, k := range deletedKeys { - resp, err := cli.Get(ctx, k) - // Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful get requests", map[string]any{"error": err}) - if err != nil { - log.Printf("Client failed to get key %s: %v", k, err) - continue +// wrap converts a receive-only channel to receive-only struct{} channel +func wrap[T any](from <-chan T) <-chan struct{} { + out := make(chan struct{}) + go func() { + for { + <-from + out <- struct{}{} } - // Antithesis Assertion: if we deleted a key, we should not get a value - assert.Always(resp.Count == 0, "Key was deleted correctly", map[string]any{"key": k}) - } - cli.Close() - - assert.Reachable("Completion of a key deleting check", nil) - log.Printf("Completion of a key deleting check") + }() + return out } func main() { - DeleteKeys() + testRobustness() } diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 906f3d2cb46..5e631f7ff9f 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -75,6 +75,16 @@ var ( {Choice: Delete, Weight: 50}, }, } + EtcdAntithesis Traffic = etcdTraffic{ + keyCount: 10, + largePutSize: 32769, + leaseTTL: DefaultLeaseTTL, + // Please keep the sum of weights equal 100. + requests: []random.ChoiceWeight[etcdRequestType]{ + {Choice: Get, Weight: 50}, + {Choice: Put, Weight: 50}, + }, + } ) type etcdTraffic struct { From 3860239a9b430b48ed792cb57ec948b03f8f95b7 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 5 May 2025 10:38:36 +0200 Subject: [PATCH 2/2] Refactor antithesis robustness main Signed-off-by: Marek Siarkowicz --- .../test-template/robustness/main.go | 93 +++++++++---------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/tests/antithesis/test-template/robustness/main.go b/tests/antithesis/test-template/robustness/main.go index 3abdceed154..49be36dfed2 100644 --- a/tests/antithesis/test-template/robustness/main.go +++ b/tests/antithesis/test-template/robustness/main.go @@ -18,13 +18,12 @@ package main import ( "context" - "log" + "fmt" "os" "sync" "time" "github.com/antithesishq/antithesis-sdk-go/assert" - "github.com/antithesishq/antithesis-sdk-go/random" "golang.org/x/time/rate" "go.etcd.io/etcd/tests/v3/robustness/client" @@ -34,54 +33,43 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -var ( - // Please keep the sum of weights equal 100. - profile = traffic.Profile{ - MinimalQPS: 100, - MaximalQPS: 1000, - BurstableQPS: 1000, - ClientCount: 3, - MaxNonUniqueRequestConcurrency: 3, - } - IDProvider = identity.NewIDProvider() - LeaseIDStorage = identity.NewLeaseIDStorage() - ConcurrencyLimiter = traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) -) - -// Connect returns a client connection to an etcd node -func Connect() *client.RecordingClient { - hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"} - cli, err := client.NewRecordingClient(hosts, IDProvider, time.Now()) - if err != nil { - log.Fatalf("Failed to connect to etcd: %v", err) - // Antithesis Assertion: client should always be able to connect to an etcd host - host := random.RandomChoice(hosts) - assert.Unreachable("Client failed to connect to an etcd host", map[string]any{"host": host, "error": err}) - os.Exit(1) - } - return cli +var profile = traffic.Profile{ + MinimalQPS: 100, + MaximalQPS: 1000, + BurstableQPS: 1000, + ClientCount: 3, + MaxNonUniqueRequestConcurrency: 3, } -func testRobustness() { +func main() { ctx := context.Background() - var wg sync.WaitGroup - var mux sync.Mutex - runfor := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second)) + baseTime := time.Now() + duration := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second)) + testRobustness(ctx, baseTime, duration) +} + +func testRobustness(ctx context.Context, baseTime time.Time, duration time.Duration) { limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), profile.BurstableQPS) - finish := wrap(time.After(runfor)) - reports := []report.ClientReport{} + finish := closeAfter(ctx, duration) + ids := identity.NewIDProvider() + storage := identity.NewLeaseIDStorage() + concurrencyLimiter := traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) + hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"} - for range profile.ClientCount { + reports := []report.ClientReport{} + var mux sync.Mutex + var wg sync.WaitGroup + for i := 0; i < profile.ClientCount; i++ { + c := connect(hosts[i%len(hosts)], ids, baseTime) wg.Add(1) - c := Connect() go func(c *client.RecordingClient) { defer wg.Done() defer c.Close() traffic.EtcdAntithesis.RunTrafficLoop(ctx, c, limiter, - IDProvider, - LeaseIDStorage, - ConcurrencyLimiter, + ids, + storage, + concurrencyLimiter, finish, ) mux.Lock() @@ -90,21 +78,30 @@ func testRobustness() { }(c) } wg.Wait() - assert.Reachable("Completion robustness traffic generation", nil) + fmt.Println("Completed robustness traffic generation") + assert.Reachable("Completed robustness traffic generation", nil) +} + +func connect(endpoint string, ids identity.Provider, baseTime time.Time) *client.RecordingClient { + cli, err := client.NewRecordingClient([]string{endpoint}, ids, baseTime) + if err != nil { + // Antithesis Assertion: client should always be able to connect to an etcd host + assert.Unreachable("Client failed to connect to an etcd host", map[string]any{"host": endpoint, "error": err}) + os.Exit(1) + } + return cli } -// wrap converts a receive-only channel to receive-only struct{} channel -func wrap[T any](from <-chan T) <-chan struct{} { +func closeAfter(ctx context.Context, t time.Duration) <-chan struct{} { out := make(chan struct{}) go func() { for { - <-from - out <- struct{}{} + select { + case <-time.After(t): + case <-ctx.Done(): + } + close(out) } }() return out } - -func main() { - testRobustness() -}