Skip to content

Commit 295dc31

Browse files
committed
Debug
1 parent 62a64bc commit 295dc31

File tree

7 files changed

+86
-12
lines changed

7 files changed

+86
-12
lines changed

.github/workflows/benchmark.yaml

+4-4
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ jobs:
3636
with:
3737
go-version: ${{ env.GO_VERSION }}
3838

39-
- name: Test Benchmark Hack
39+
- name: Test Benchmark
4040
shell: bash
4141
run: |
42-
./hack/e2e-test.sh e2e/kwokctl/benchmark-hack
42+
./hack/e2e-test.sh e2e/kwokctl/benchmark
4343
44-
- name: Test Benchmark
44+
- name: Test Benchmark Hack
4545
shell: bash
4646
run: |
47-
./hack/e2e-test.sh e2e/kwokctl/benchmark
47+
./hack/e2e-test.sh e2e/kwokctl/benchmark-hack
4848
4949
- name: Upload logs
5050
uses: actions/upload-artifact@v4

pkg/kwok/controllers/controller.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ type Controller struct {
9595

9696
podOnNodeManageQueue queue.Queue[string]
9797
nodeManageQueue queue.Queue[string]
98+
99+
podOnNodeManageQueueParallelism *queue.AdaptiveQueue[string]
100+
nodeManageQueueParallelism *queue.AdaptiveQueue[string]
98101
}
99102

100103
// Config is the configuration for the controller
@@ -295,6 +298,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
295298
c.nodeLeases.ReleaseHold(nodeName)
296299
}
297300

301+
c.nodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.nodeManageQueue, c.nodeLeaseSyncWorker)
298302
go c.nodeLeaseSyncWorker(ctx)
299303

300304
err = c.nodeLeases.Start(ctx)
@@ -307,7 +311,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
307311
func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) {
308312
logger := log.FromContext(ctx)
309313
for ctx.Err() == nil {
310-
nodeName, ok := c.nodeManageQueue.GetOrWaitWithDone(ctx.Done())
314+
nodeName, ok := c.nodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
311315
if !ok {
312316
return
313317
}
@@ -339,6 +343,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
339343
return fmt.Errorf("failed to init pod controller: %w", err)
340344
}
341345

346+
c.podOnNodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.podOnNodeManageQueue, c.podsOnNodeSyncWorker)
342347
go c.podsOnNodeSyncWorker(ctx)
343348

344349
case nodeRef:
@@ -559,7 +564,7 @@ func (c *Controller) Start(ctx context.Context) error {
559564
func (c *Controller) podsOnNodeSyncWorker(ctx context.Context) {
560565
logger := log.FromContext(ctx)
561566
for ctx.Err() == nil {
562-
nodeName, ok := c.podOnNodeManageQueue.GetOrWaitWithDone(ctx.Done())
567+
nodeName, ok := c.podOnNodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
563568
if !ok {
564569
return
565570
}

pkg/kwok/controllers/node_controller.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type NodeController struct {
6060
lifecycle resources.Getter[lifecycle.Lifecycle]
6161
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Node]]
6262
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]]
63+
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Node]]
6364
backoff wait.Backoff
6465
recorder record.EventRecorder
6566
readOnlyFunc func(nodeName string) bool
@@ -143,6 +144,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
143144
// if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed
144145
func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error {
145146
go c.preprocessWorker(ctx)
147+
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
146148
for i := uint(0); i < c.playStageParallelism; i++ {
147149
go c.playStageWorker(ctx)
148150
}
@@ -323,7 +325,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
323325
logger := log.FromContext(ctx)
324326

325327
for ctx.Err() == nil {
326-
node, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
328+
node, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
327329
if !ok {
328330
return
329331
}

pkg/kwok/controllers/node_lease_controller.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ type NodeLeaseController struct {
4949
// mutateLeaseFunc allows customizing a lease object
5050
mutateLeaseFunc func(*coordinationv1.Lease) error
5151

52-
delayQueue queue.WeightDelayingQueue[string]
53-
holdLeaseSet maps.SyncMap[string, bool]
52+
delayQueue queue.WeightDelayingQueue[string]
53+
holdLeaseSet maps.SyncMap[string, bool]
54+
delayQueueParallelism *queue.AdaptiveQueue[string]
5455

5556
holderIdentity string
5657
onNodeManagedFunc func(nodeName string)
@@ -99,6 +100,7 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle
99100

100101
// Start starts the NodeLeaseController
101102
func (c *NodeLeaseController) Start(ctx context.Context) error {
103+
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.syncWorker)
102104
for i := uint(0); i < c.leaseParallelism; i++ {
103105
go c.syncWorker(ctx)
104106
}
@@ -108,7 +110,7 @@ func (c *NodeLeaseController) Start(ctx context.Context) error {
108110
func (c *NodeLeaseController) syncWorker(ctx context.Context) {
109111
logger := log.FromContext(ctx)
110112
for ctx.Err() == nil {
111-
nodeName, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
113+
nodeName, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
112114
if !ok {
113115
return
114116
}

pkg/kwok/controllers/pod_controller.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type PodController struct {
6464
playStageParallelism uint
6565
lifecycle resources.Getter[lifecycle.Lifecycle]
6666
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Pod]]
67+
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Pod]]
6768
backoff wait.Backoff
6869
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]]
6970
recorder record.EventRecorder
@@ -148,6 +149,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
148149
// It will modify the pods' status to what we want
149150
func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error {
150151
go c.preprocessWorker(ctx)
152+
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
151153
for i := uint(0); i < c.playStageParallelism; i++ {
152154
go c.playStageWorker(ctx)
153155
}
@@ -258,7 +260,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
258260
logger := log.FromContext(ctx)
259261

260262
for ctx.Err() == nil {
261-
pod, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
263+
pod, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
262264
if !ok {
263265
return
264266
}

pkg/kwok/controllers/stage_controller.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type StageController struct {
5959
playStageParallelism uint
6060
lifecycle resources.Getter[lifecycle.Lifecycle]
6161
delayQueue queue.WeightDelayingQueue[resourceStageJob[*unstructured.Unstructured]]
62+
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*unstructured.Unstructured]]
6263
backoff wait.Backoff
6364
delayQueueMapping maps.SyncMap[string, resourceStageJob[*unstructured.Unstructured]]
6465
recorder record.EventRecorder
@@ -123,6 +124,7 @@ func NewStageController(conf StageControllerConfig) (*StageController, error) {
123124
// It will modify the resources' status to what we want
124125
func (c *StageController) Start(ctx context.Context, events <-chan informer.Event[*unstructured.Unstructured]) error {
125126
go c.preprocessWorker(ctx)
127+
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
126128
for i := uint(0); i < c.playStageParallelism; i++ {
127129
go c.playStageWorker(ctx)
128130
}
@@ -236,7 +238,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
236238
logger := log.FromContext(ctx)
237239

238240
for ctx.Err() == nil {
239-
resource, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
241+
resource, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
240242
if !ok {
241243
return
242244
}

pkg/utils/queue/adaptive_queue.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queue
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
)
24+
25+
type AdaptiveQueue[T any] struct {
26+
ctx context.Context
27+
startFunc func(ctx context.Context)
28+
latestStart time.Time
29+
mut sync.Mutex
30+
queue Queue[T]
31+
}
32+
33+
func NewAdaptiveQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *AdaptiveQueue[T] {
34+
return &AdaptiveQueue[T]{
35+
ctx: ctx,
36+
startFunc: startFunc,
37+
latestStart: time.Now(),
38+
queue: q,
39+
}
40+
}
41+
42+
func (p *AdaptiveQueue[T]) GetOrWaitWithDone(done <-chan struct{}) (T, bool) {
43+
t, ok := p.queue.GetOrWaitWithDone(done)
44+
if !ok {
45+
return t, false
46+
}
47+
48+
length := p.queue.Len()
49+
if length > 3 {
50+
p.mut.Lock()
51+
defer p.mut.Unlock()
52+
now := time.Now()
53+
sub := now.Sub(p.latestStart)
54+
55+
if sub >= time.Second/10 {
56+
go p.startFunc(p.ctx)
57+
p.latestStart = now
58+
}
59+
}
60+
return t, true
61+
}

0 commit comments

Comments
 (0)