-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🌱 inmemory: fix watch to continue serving based on resourceVersion parameter #11695
base: main
Are you sure you want to change the base?
🌱 inmemory: fix watch to continue serving based on resourceVersion parameter #11695
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
54e7e43
to
1b644a2
Compare
Kudos to @sbueringer for helping analysing and brainstorming to find the root cause. |
/test help |
@chrischdi: The specified target(s) for
The following commands are available to trigger optional jobs:
Use
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
/test pull-cluster-api-e2e-main |
1b644a2
to
3bac618
Compare
/test pull-cluster-api-e2e-main |
@@ -139,6 +198,16 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R | |||
} | |||
w.Header().Set("Transfer-Encoding", "chunked") | |||
w.WriteHeader(http.StatusOK) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it okay that we don't flush after this like before?
// Write all initial events. | ||
for _, event := range initialEvents { | ||
if err := resp.WriteEntity(event); err != nil { | ||
log.Error(err, "Writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.Error(err, "Writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) | |
log.Error(err, "Error writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about using "initial event" instead of "old event"?
if err := resp.WriteEntity(event); err != nil { | ||
log.Error(err, "Writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.Error(err, "Writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) | |
log.Error(err, "Error writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job in investigating and fixing this issue, kudos!
@@ -145,6 +145,9 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien | |||
if err := meta.SetList(list, items); err != nil { | |||
return apierrors.NewInternalError(err) | |||
} | |||
|
|||
list.SetResourceVersion(fmt.Sprintf("%d", tracker.lastResourceVersion)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: is it correct that list ResourceVersion in the system and not the last resource version in the list of items?
Q: is is correct set set this value non matter of it is a "plain" list or a lis watch?
(from a quick check with kubectl yes to both, but I like a confirmation)
} | ||
} | ||
|
||
func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be
func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) { | |
func (h *apiServerHandler) v1List(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) { |
(easier to distinguish from apiV1List with upper L)
// Write all initial events. | ||
for _, event := range initialEvents { | ||
if err := resp.WriteEntity(event); err != nil { | ||
log.Error(err, "Writing old event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about using "initial event" instead of "old event"?
// Determine the highest written resourceVersion so we can filter out duplicated events from the channel. | ||
minResourceVersion := uint64(0) | ||
if len(initialEvents) > 0 { | ||
minResourceVersion, err = strconv.ParseUint(initialEvents[len(initialEvents)-1].Object.GetResourceVersion(), 10, 64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of assuming an ordering of events by resource version (which I'm not sure we are enforcing somewhere), what about computing minResourceVersion when we go through initialEvents in the for loop above
What this PR does / why we need it:
This PR has the following changes:
resourceVersion
parameter on watches (if given) to enqueue events for all objects which have a higher resourceVersionWhich issue(s) this PR fixes (optional, in
fixes #<issue number>(, fixes #<issue_number>, ...)
format, will close the issue(s) when PR gets merged):Fixes #
/area provider/infrastructure-in-memory