From ab6921bbce1af7b43808dc420ac3ae2ce1857a19 Mon Sep 17 00:00:00 2001 From: ah8ad3 Date: Tue, 2 Jul 2024 09:54:16 +0330 Subject: [PATCH 1/2] WIP: add member id to client watch reponses, add validateWatchSequential to check memberID in events Signed-off-by: ah8ad3 --- tests/robustness/client/client.go | 1 + tests/robustness/main_test.go | 1 + tests/robustness/model/watch.go | 1 + tests/robustness/watch.go | 24 ++++++++++++++++++++++++ 4 files changed, 27 insertions(+) diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index 516e5d5df8d..6ca9b76ec83 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -316,6 +316,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) model.WatchRe } resp.IsProgressNotify = r.IsProgressNotify() resp.Revision = r.Header.Revision + resp.MemberId = r.Header.MemberId err := r.Err() if err != nil { resp.Error = r.Err().Error() diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index 63ba6b37763..b792007dcb0 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -101,6 +101,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce watchProgressNotifyEnabled := c.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) } + validateWatchSequential(t, r.Client) validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute) diff --git a/tests/robustness/model/watch.go b/tests/robustness/model/watch.go index fc880e30ede..5d8bd38b920 100644 --- a/tests/robustness/model/watch.go +++ b/tests/robustness/model/watch.go @@ -26,5 +26,6 @@ type WatchResponse struct { IsProgressNotify bool Revision int64 Time time.Duration + MemberId uint64 Error string } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 3da853f0078..1453b71f6c0 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -141,3 +141,27 @@ external: t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify) } } + +func validateWatchSequential(t *testing.T, reports []report.ClientReport) { + for _, r := range reports { + for _, op := range r.Watch { + if op.Request.Revision != 0 { + continue + } + lastEventRevision := make(map[uint64]int64) + for _, resp := range op.Responses { + if len(resp.Events) == 0 { + continue + } + if _, ok := lastEventRevision[resp.MemberId]; !ok { + lastEventRevision[resp.MemberId] = 1 + } + firstEventRevision := resp.Events[0].Revision + if firstEventRevision < lastEventRevision[resp.MemberId] { + t.Errorf("Error watch sequential, expect: %v or higher, got: %v, member id: %v", lastEventRevision[resp.MemberId], firstEventRevision, resp.MemberId) + } + lastEventRevision[resp.MemberId] = resp.Events[len(resp.Events)-1].Revision + } + } + } +} From f7c6c33b19eb4dca60599e9a01e95ab833c57bb0 Mon Sep 17 00:00:00 2001 From: ah8ad3 Date: Wed, 3 Jul 2024 10:07:24 +0330 Subject: [PATCH 2/2] Combine watch responses for concurrent client requests and find last revision based one time of response Signed-off-by: ah8ad3 --- tests/robustness/watch.go | 46 +++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 1453b71f6c0..254d8116476 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "sort" "sync" "testing" "time" @@ -142,25 +143,56 @@ external: } } +type timeLastRevision struct { + time time.Duration + lastRevision int64 +} + +func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision { + result := make(map[uint64][]timeLastRevision) + for _, r := range reports { + for _, op := range r.Watch { + for _, resp := range op.Responses { + if len(resp.Events) == 0 { + continue + } + result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision}) + } + } + } + for memberId, structs := range result { + sort.Slice(structs, func(i, j int) bool { + return structs[i].time < structs[j].time + }) + result[memberId] = structs + } + return result +} + func validateWatchSequential(t *testing.T, reports []report.ClientReport) { + combinedWatchResponses := combineWatchResponses(reports) for _, r := range reports { for _, op := range r.Watch { if op.Request.Revision != 0 { continue } - lastEventRevision := make(map[uint64]int64) for _, resp := range op.Responses { if len(resp.Events) == 0 { continue } - if _, ok := lastEventRevision[resp.MemberId]; !ok { - lastEventRevision[resp.MemberId] = 1 + var lastMemberWatchRevision int64 + for i, c := range combinedWatchResponses[resp.MemberId] { + // Reports are sorted by time, find first greater or equal and use previous one. + if resp.Time >= c.time { + if i == 0 { + continue + } + lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision + } } - firstEventRevision := resp.Events[0].Revision - if firstEventRevision < lastEventRevision[resp.MemberId] { - t.Errorf("Error watch sequential, expect: %v or higher, got: %v, member id: %v", lastEventRevision[resp.MemberId], firstEventRevision, resp.MemberId) + if resp.Events[0].Revision < lastMemberWatchRevision { + t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId) } - lastEventRevision[resp.MemberId] = resp.Events[len(resp.Events)-1].Revision } } }