Asynchronous Inadmissible Workload Requeueing#9232
gabesaba wants to merge 3 commits into kubernetes-sigs:main
Conversation
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: gabesaba. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Details: Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
@sohankunkerkar, can you drive this review please?
✅ Deploy Preview for kubernetes-sigs-kueue canceled.
mwielgus
left a comment
How often are the inadmissible workloads requeued after this change?
sohankunkerkar
left a comment
Thanks for working on this @gabesaba. The approach of decoupling notification from processing via TypedController[requeueRequest] with AddAfter directly targets the QueueInadmissibleWorkloads churn identified in #8095.
As @mwielgus mentioned, it would really help if you could provide some numbers showing the performance improvement.
A note on testing the batching behavior: the integration tests use a 1ms batch period, which effectively makes them synchronous and doesn't exercise the actual batching/deduplication.
In client-go's delaying queue implementation, deduplication only happens while the item is in the waiting heap. With a 1ms window, items mature to the active queue almost instantly, meaning a stream of events spaced more than 1ms apart will result in multiple reconciles instead of one. A unit test with a controlled clock, or a longer batch period, is necessary to prove this scenario.
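For illustration, a rough sketch of such a fake-clock unit test. The requeueRequest shape and the queue construction are assumptions rather than the PR's code; it only demonstrates the delaying-queue semantics being discussed, not the controller itself:

```go
package queue_test

import (
	"testing"
	"time"

	"k8s.io/client-go/util/workqueue"
	clocktesting "k8s.io/utils/clock/testing"
)

// requeueRequest stands in for the PR's keyed request type; the field layout
// is an assumption.
type requeueRequest struct {
	clusterQueue string
}

func TestBatchWindowDeduplication(t *testing.T) {
	fakeClock := clocktesting.NewFakeClock(time.Now())
	q := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[requeueRequest]{
		Clock: fakeClock,
	})
	defer q.ShutDown()

	batchPeriod := time.Second
	req := requeueRequest{clusterQueue: "cq-a"}

	// Two notifications for the same target inside the batch window.
	q.AddAfter(req, batchPeriod)
	q.AddAfter(req, batchPeriod)

	// The fake clock is frozen, so nothing can mature into the active queue yet.
	if q.Len() != 0 {
		t.Fatalf("expected no items before the batch window elapsed, got %d", q.Len())
	}

	// Let the batch window elapse; exactly one item should surface.
	fakeClock.Step(batchPeriod)
	got, _ := q.Get()
	if got != req {
		t.Fatalf("unexpected item %v", got)
	}
	if q.Len() != 0 {
		t.Errorf("expected the duplicate notification to be collapsed, %d items still pending", q.Len())
	}
}
```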
func (r *inadmissibleWorkloadRequeuer) setupWithManager(mgr ctrl.Manager) error {
	return builder.TypedControllerManagedBy[requeueRequest](mgr).
		Named("inadmissible_workload_requeue_controller").
		WatchesRawSource(source.TypedChannel(r.eventCh, &inadmissibleWorkloadRequeuer{})).
inadmissibleWorkloadRequeuer{} is a zero-value instance and its batchPeriod is 0. The Generic handler calls q.AddAfter(e.Object, r.batchPeriod), and client-go's AddAfter with duration <= 0 falls through to q.Add(item) immediately with no delay.
This means the 1s batch period defined here is never applied. Should this be r instead?
Good catch, should definitely be r. I'll add an integration test too, to make sure we have this code under test.
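For illustration, a sketch of the corrected wiring, assuming (as in the diff above) that the requeuer itself is the typed event handler whose Generic method calls q.AddAfter with its batchPeriod; the trailing Complete call is an assumption:

```go
func (r *inadmissibleWorkloadRequeuer) setupWithManager(mgr ctrl.Manager) error {
	return builder.TypedControllerManagedBy[requeueRequest](mgr).
		Named("inadmissible_workload_requeue_controller").
		// Pass the configured requeuer r, not a zero-value
		// &inadmissibleWorkloadRequeuer{}, so the non-zero batchPeriod is the
		// one its AddAfter call sees and the batching window actually applies.
		WatchesRawSource(source.TypedChannel(r.eventCh, r)).
		Complete(r)
}
```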
	// q.AddAfter will process this so fast that it is not necessary.
	// LLM review suggested this to derisk deadlock (during startup?), but I don't
	// see this risk.
	eventCh: make(chan event.TypedGenericEvent[requeueRequest]),
I looked at controller-runtime's source.TypedChannel internals: syncLoop starts a goroutine that continuously reads from the user channel and writes to an internal 1024-buffered dst channel, and all callers (NotifyRetryInadmissible, notifyRetryInadmissibleWithoutLock, AddOrUpdateCohort) are reconciler goroutines that only run after mgr.Start() has started the source.
So even though an unbuffered channel looks safe with the current TypedChannel startup ordering, for future DRA/WAS/TAS growth should we prefer a small bounded buffer here as the default resilience pattern for internal event fan-in? We would still keep the dedup semantics of AddAfter with the keyed requeueRequest.
Thanks for the analysis, and this sgtm. How does 128 sound to you?
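A minimal sketch of what that could look like; the constant name, field names, and constructor shape are assumptions based on the diff, not the PR's actual code:

```go
const requeueEventChannelBufferSize = 128 // value agreed above; name is made up

func newInadmissibleWorkloadRequeuer(qManager *Manager) *inadmissibleWorkloadRequeuer {
	return &inadmissibleWorkloadRequeuer{
		qManager: qManager,
		// Small bounded buffer so notifiers never block, even if
		// controller-runtime's channel source has not started draining yet.
		eventCh:     make(chan event.TypedGenericEvent[requeueRequest], requeueEventChannelBufferSize),
		batchPeriod: requeueBatchPeriod,
	}
}
```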
}

// inadmissibleWorkloadRequeuer is responsible for receiving notifications,
// and requeuering workloads as a result of these notifications.
s/requeuering/requeuing/g
	reportMetrics(m, cqImpl.name)

-	if queued || addedWorkloads {
+	if addedWorkloads {
I assume this is the intentional tradeoff for batching?
This is fine if intentional, but it's worth explicitly documenting as a throughput-over-latency tradeoff.
Related to #9232 (comment). Broadcast will be a no-op until the workloads are moved. I added a broadcast call in requeueWorkloadsCQ and requeueWorkloadsCohort.
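Roughly, the change being described; a sketch only, and the Broadcast helper name is an assumption:

```go
// Only broadcast once workloads were actually moved back to the active
// queues; before that, waking up waiting goroutines would be a no-op.
if queueInadmissibleWorkloads(ctx, clusterQueue, m.client) {
	reportMetrics(m, clusterQueue.name)
	m.Broadcast() // assumed helper that wakes goroutines waiting on queue heads
}
```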
// or otherwise risk encountering an infinite loop if a Cohort
// cycle is introduced.
func requeueWorkloadsCohort(ctx context.Context, m *Manager, cohort *cohort) bool {
	// RequeueCohort moves all inadmissibleWorkloads in
It looks like we can remove this information.
Updated the comment, combining information from both parts
	batchPeriod time.Duration
}

func newInadmissibleWorkloadReconciler(qManager *Manager) *inadmissibleWorkloadRequeuer {
The constructor says "Reconciler", type says "Requeuer". Pick one. Since the type is inadmissibleWorkloadRequeuer, the constructor should be newInadmissibleWorkloadRequeuer.
// NewManagerForIntegrationTests is a factory for Integration Tests, setting the
// batch period to a much lower value (requeueBatchPeriodIntegrationTests).
func NewManagerForIntegrationTests(client client.Client, checker StatusChecker, options ...Option) *Manager {
Do we really need the functions? I would argue against introducing such functions for testing.
It is a cleaner pattern to parametrize the value by passing the Options. Here you could expose a param like BatchInterval.
For example this is how controller-runtime exposes configuration for manager's RetryPeriod and other options.
How about the options for NewManager, and then some factory that resides in the integration tests, that sets up integration tests with certain options (such as a smaller batching period)?
+1, yes generic NewManager in the production code, and NewManagerForIntegrationTests for the test packages sgtm
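A sketch of the Option-based parametrization being discussed; the option name and the internal options struct are illustrative only (the Manager already has its own Option plumbing), not the PR's actual API:

```go
type managerOptions struct {
	requeueBatchPeriod time.Duration
}

type Option func(*managerOptions)

// WithRequeueBatchPeriod overrides how long requeue notifications are batched
// before inadmissible workloads are actually moved back to the active queues.
func WithRequeueBatchPeriod(d time.Duration) Option {
	return func(o *managerOptions) { o.requeueBatchPeriod = d }
}
```

An integration-test helper could then call NewManager(client, checker, WithRequeueBatchPeriod(time.Millisecond)) from the test packages, without a dedicated production constructor.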
}

// NewManagerForUnitTestsWithRequeuer creates a new Manager for testing purposes, pre-configured with a testInadmissibleWorkloadRequeuer.
func NewManagerForUnitTestsWithRequeuer(client client.Client, checker StatusChecker, options ...Option) (*Manager, *testInadmissibleWorkloadRequeuer) {
For unit tests it is slightly more acceptable, but also I would argue against it. If we need to have a dedicated parametrization in unit tests we can use Options to the standard NewManager.
For example, I'm wondering if we could pass clock when constructing the manager (might be optional parameter, real clock by default), and just move the clock by one second in unit tests. This aligns more with the current approach to time-based testing.
For the new controller to work, it needs to be wired with ctrl.Manager, and these requeues will be processed in a different go-routine. I don't think this is the right choice for unit tests.
> For the new controller to work, it needs to be wired with ctrl.Manager, and these requeues will be processed in a different go-routine. I don't think this is the right choice for unit tests.
Hm, let me double-check why you consider this not to be the right choice. Actually, for unit tests it is great to be able to control the passage of time; otherwise we risk flakes, which is why we tend to use fake clocks for most of our unit testing.
I remember this is what we do in core k8s when using workqueues: just advance the time, and this triggers the goroutine inside the workqueue mechanics, which triggers the reconcile. So I think what is needed is to call NewControllerManagedBy and customize the clock.
Let me know if there are some issues that make this an invalid approach. It is not a blocker for me; it just feels more natural, safer (to avoid flakes), and consistent with the rest of the codebase.
This still requires setting up a controller-runtime ctrl.Manager in the unit tests in order to call NewControllerManagedBy (or TypedControllerManagedBy). That seems to me deep in the territory of integration tests, especially since the existing unit tests don't have a ctrl.Manager. Even if this were not required, we would be relying on processing in goroutines and would require some synchronization to ensure the tests are not flaky.
I think that this lightweight test object makes more sense for unit tests, and covering the behavior of the async controller should be handled in integration tests (to be expanded as per #9232 (comment))
I can see this is a matter of taste. Indeed, spawning a goroutine is what we do in unit tests in the core k8s controllers, for example here. However, there it is a bit more lightweight, as the basic workqueue is used rather than the entire controller-runtime manager.
> I think that this lightweight test object makes more sense for unit tests, and covering the behavior of the async controller should be handled in integration tests (to be expanded as per #9232 (comment))
Yes, but the drawback is leaking test-only functions into prod code (at least in the initial implementation), maybe this is solvable by other means.
In any case, I'm ok with the pattern as is, it is not a blocker, just something I wanted to explore as we go into the territory of using controller-runtime reconcilers for internal code.
I think taking a pause to figure out how to do this well is time well spent. I'm still a bit hesitant: even if you wire this all up now, there will be a learning curve for how to wire things in the future, and it might introduce a complexity barrier for new developers in the community. I basically thought that reusing the existing controller-runtime machinery would make the unit tests easier to write, but I'm not stubborn here.
If you have indeed considered the consequences and think this is the better decision, I'm ok with that.
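For reference, a sketch of what such a lightweight test double might look like. The listener interface and method signature are assumptions inferred from the PR description (requeueInadmissibleListener) and the NotifyRetryInadmissible calls in the diff, not copied from the actual code:

```go
// requeueInadmissibleListener is the assumed notification interface.
type requeueInadmissibleListener interface {
	NotifyRetryInadmissible(cqNames []string)
}

// recordingRequeuer is a synchronous stand-in for the async requeuer, so unit
// tests can assert on notifications without a ctrl.Manager or extra goroutines.
type recordingRequeuer struct {
	notified []string
}

func (r *recordingRequeuer) NotifyRetryInadmissible(cqNames []string) {
	r.notified = append(r.notified, cqNames...)
}
```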
	utiltestingapi.MakeWorkload("a", "moon").Queue("foo").Obj(),
)
-	manager := NewManager(kClient, nil)
+	manager := NewManagerForUnitTests(kClient, nil)
I don't like how the diff is bloated by this change. Rather than impacting all tests, can we introduce the new batch period behind a dedicated feature gate, and only add a set of dedicated tests?
I created a prep PR, to reduce the diff: #9224
In the feature flag case, are you suggesting maintaining both branches - before and after decoupling request requeueing from processing requeueing?
> In the feature flag case, are you suggesting maintaining both branches - before and after decoupling request requeueing from processing requeueing?
Yeah, good question. I think the amount of change looks too scary for the PR to be safe without a FG. If we can bring it down to a reviewable state with prep PRs, maybe we can drop the FG.
If we go with the FG approach, I imagine it goes to Beta directly (enabled by default), and we only have a small set of tests exercising the FG disabled. Then we graduate the FG after 2 releases. Basically, we keep FG=false just as a bailout option when something goes wrong.
		continue
	}
	processedRoots.Insert(rootName)
}
Is it a drive-by cleanup, or something necessary for the PR?
Drive-by cleanup. I think I can revert this, but later on this simplification makes sense, as we collapse keys in the requeuer and adds are cheap.
Ack. I will submit as a follow-up PR
	for _, clusterQueue := range cohort.ChildCQs() {
-		queued = queueInadmissibleWorkloads(ctx, clusterQueue, m.client) || queued
+		if queueInadmissibleWorkloads(ctx, clusterQueue, m.client) {
+			reportMetrics(m, clusterQueue.name)
It doesn't seem like we called reportMetrics here before, so the change looks unrelated. Was it a bug before, or is this some non-obvious drive-by cleanup, or something really needed here now?
Previously we were calling requeue, and then reporting metrics if anything was moved. Now that we decouple notifying requeue from the actual requeueing, we need to update metrics in the latter step.
sgtm, would you mind introducing reportMetrics in a prep PR?
Renamed as reportPendingWorkloads in that PR
Let me propose rephrasing this with a clear prefix and a clear clarification of what the batch period is. Feel free to adjust, but I wanted to emphasize the direction of communicating this to the end user.
LGTM overall, the main comments from me:
@gabesaba: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:
Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR.
Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
PR needs rebase.
Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
	// we should inform clusterQueue controller to broadcast the event.
	if cqNames := r.cache.AddOrUpdateResourceFlavor(log, e.Object.DeepCopy()); len(cqNames) > 0 {
-		qcache.QueueInadmissibleWorkloads(context.Background(), r.qManager, cqNames)
+		qcache.NotifyRetryInadmissible(r.qManager, cqNames)
Nice to see these uses of Background context eliminated 👍
What type of PR is this?
/kind bug
What this PR does / why we need it:
We decouple requesting that inadmissible workloads be reprocessed
from the processing of these requests, where we actually move the workloads.
This has a few main advantages.
We accomplish this by defining the inadmissibleWorkloadRequeuer, which implements the
requeueInadmissibleListener interface. Any requests to requeue inadmissible workloads must go through this interface.
Then, the requeuer will process these requests in batches. The requeuer
deduplicates requests to the same ClusterQueue/Root Cohort, further reducing
duplicate reprocessing.
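For context, a sketch of the keyed request this refers to; the field layout is an assumption, and only the idea (a comparable key per ClusterQueue or root Cohort, so duplicate notifications collapse while waiting in the batch window) reflects the description above:

```go
// requeueRequest identifies a single requeue target. Because the value is
// comparable, duplicate notifications for the same ClusterQueue or root Cohort
// collapse into a single reconcile while waiting in the batching window.
type requeueRequest struct {
	clusterQueue string // non-empty when a single ClusterQueue should be reprocessed
	rootCohort   string // non-empty when a whole root Cohort should be reprocessed
}
```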
Which issue(s) this PR fixes:
Fixes #8095
Special notes for your reviewer:
Does this PR introduce a user-facing change?