diff --git a/tests/antithesis/test-template/robustness/main.go b/tests/antithesis/test-template/robustness/main.go index f2895b0d1cc..49be36dfed2 100644 --- a/tests/antithesis/test-template/robustness/main.go +++ b/tests/antithesis/test-template/robustness/main.go @@ -18,104 +18,90 @@ package main import ( "context" - "log" + "fmt" "os" + "sync" "time" "github.com/antithesishq/antithesis-sdk-go/assert" - "github.com/antithesishq/antithesis-sdk-go/random" + "golang.org/x/time/rate" - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" + robustnessrand "go.etcd.io/etcd/tests/v3/robustness/random" + "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func Connect() *client.RecordingClient { - // This function returns a client connection to an etcd node - - hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"} - cli, err := client.NewRecordingClient(hosts, identity.NewIDProvider(), time.Now()) - if err != nil { - log.Fatalf("Failed to connect to etcd: %v", err) - // Antithesis Assertion: client should always be able to connect to an etcd host - host := random.RandomChoice(hosts) - assert.Unreachable("Client failed to connect to an etcd host", map[string]any{"host": host, "error": err}) - os.Exit(1) - } - return cli +var profile = traffic.Profile{ + MinimalQPS: 100, + MaximalQPS: 1000, + BurstableQPS: 1000, + ClientCount: 3, + MaxNonUniqueRequestConcurrency: 3, } -func DeleteKeys() { - // This function will: - // 1. Get all keys - // 2. Select half of the keys received - // 3. Attempt to delete the keys selected - // 4. Check that the keys were deleted - +func main() { ctx := context.Background() + baseTime := time.Now() + duration := time.Duration(robustnessrand.RandRange(5, 60) * int64(time.Second)) + testRobustness(ctx, baseTime, duration) +} - // Connect to an etcd node - cli := Connect() - - // Get all keys - resp, err := cli.Get(ctx, "", clientv3.WithPrefix()) - - // Antithesis Assertion: sometimes get with prefix requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful get all requests", map[string]any{"error": err}) - cli.Close() - - if err != nil { - log.Printf("Client failed to get all keys: %v", err) - os.Exit(0) - } - - // Choose half of the keys - var keys []string - for _, k := range resp.Kvs { - keys = append(keys, string(k.Key)) - } - half := len(keys) / 2 - halfKeys := keys[:half] - - // Connect to a new etcd node - cli = Connect() +func testRobustness(ctx context.Context, baseTime time.Time, duration time.Duration) { + limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), profile.BurstableQPS) + finish := closeAfter(ctx, duration) + ids := identity.NewIDProvider() + storage := identity.NewLeaseIDStorage() + concurrencyLimiter := traffic.NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) + hosts := []string{"etcd0:2379", "etcd1:2379", "etcd2:2379"} - // Delete half of the keys chosen - var deletedKeys []string - for _, k := range halfKeys { - _, err := cli.Delete(ctx, k) - // Antithesis Assertion: sometimes delete requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful delete requests", map[string]any{"error": err}) - if err != nil { - log.Printf("Failed to delete key %s: %v", k, err) - } else { - log.Printf("Successfully deleted key %v", k) - deletedKeys = append(deletedKeys, k) - } + reports := []report.ClientReport{} + var mux sync.Mutex + var wg sync.WaitGroup + for i := 0; i < profile.ClientCount; i++ { + c := connect(hosts[i%len(hosts)], ids, baseTime) + wg.Add(1) + go func(c *client.RecordingClient) { + defer wg.Done() + defer c.Close() + + traffic.EtcdAntithesis.RunTrafficLoop(ctx, c, limiter, + ids, + storage, + concurrencyLimiter, + finish, + ) + mux.Lock() + reports = append(reports, c.Report()) + mux.Unlock() + }(c) } - cli.Close() - - // Connect to a new etcd node - cli = Connect() + wg.Wait() + fmt.Println("Completed robustness traffic generation") + assert.Reachable("Completed robustness traffic generation", nil) +} - // Check to see if those keys were deleted / exist - for _, k := range deletedKeys { - resp, err := cli.Get(ctx, k) - // Antithesis Assertion: sometimes get requests are successful. A failed request is OK since we expect them to happen. - assert.Sometimes(err == nil, "Client can make successful get requests", map[string]any{"error": err}) - if err != nil { - log.Printf("Client failed to get key %s: %v", k, err) - continue - } - // Antithesis Assertion: if we deleted a key, we should not get a value - assert.Always(resp.Count == 0, "Key was deleted correctly", map[string]any{"key": k}) +func connect(endpoint string, ids identity.Provider, baseTime time.Time) *client.RecordingClient { + cli, err := client.NewRecordingClient([]string{endpoint}, ids, baseTime) + if err != nil { + // Antithesis Assertion: client should always be able to connect to an etcd host + assert.Unreachable("Client failed to connect to an etcd host", map[string]any{"host": endpoint, "error": err}) + os.Exit(1) } - cli.Close() - - assert.Reachable("Completion of a key deleting check", nil) - log.Printf("Completion of a key deleting check") + return cli } -func main() { - DeleteKeys() +func closeAfter(ctx context.Context, t time.Duration) <-chan struct{} { + out := make(chan struct{}) + go func() { + for { + select { + case <-time.After(t): + case <-ctx.Done(): + } + close(out) + } + }() + return out } diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 906f3d2cb46..5e631f7ff9f 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -75,6 +75,16 @@ var ( {Choice: Delete, Weight: 50}, }, } + EtcdAntithesis Traffic = etcdTraffic{ + keyCount: 10, + largePutSize: 32769, + leaseTTL: DefaultLeaseTTL, + // Please keep the sum of weights equal 100. + requests: []random.ChoiceWeight[etcdRequestType]{ + {Choice: Get, Weight: 50}, + {Choice: Put, Weight: 50}, + }, + } ) type etcdTraffic struct {