-
Notifications
You must be signed in to change notification settings - Fork 10.1k
WIP: test if watch is sequential #18264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package robustness | |
|
||
import ( | ||
"context" | ||
"sort" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
@@ -141,3 +142,58 @@ external: | |
t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify) | ||
} | ||
} | ||
|
||
type timeLastRevision struct { | ||
time time.Duration | ||
lastRevision int64 | ||
} | ||
|
||
func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision { | ||
result := make(map[uint64][]timeLastRevision) | ||
for _, r := range reports { | ||
for _, op := range r.Watch { | ||
for _, resp := range op.Responses { | ||
if len(resp.Events) == 0 { | ||
continue | ||
} | ||
result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision}) | ||
} | ||
} | ||
} | ||
for memberId, structs := range result { | ||
sort.Slice(structs, func(i, j int) bool { | ||
return structs[i].time < structs[j].time | ||
}) | ||
result[memberId] = structs | ||
} | ||
return result | ||
} | ||
|
||
func validateWatchSequential(t *testing.T, reports []report.ClientReport) { | ||
combinedWatchResponses := combineWatchResponses(reports) | ||
for _, r := range reports { | ||
ah8ad3 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, op := range r.Watch { | ||
if op.Request.Revision != 0 { | ||
continue | ||
} | ||
for _, resp := range op.Responses { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if len(resp.Events) == 0 { if resp.Events[0].Revision < lastSeenRevision { |
||
if len(resp.Events) == 0 { | ||
continue | ||
} | ||
var lastMemberWatchRevision int64 | ||
for i, c := range combinedWatchResponses[resp.MemberId] { | ||
// Reports are sorted by time, find first greater or equal and use previous one. | ||
if resp.Time >= c.time { | ||
if i == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why skip the first one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for review, i'm using an logic here which is find the first equal or greater and use previous one. And there is nothing before 0. |
||
continue | ||
} | ||
lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision | ||
} | ||
} | ||
if resp.Events[0].Revision < lastMemberWatchRevision { | ||
t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId) | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to double check, but I think we could consider recording progress notify here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the responses are aggregated here we can do.