Skip to content

Commit 3280231

Browse files
committed
improvements
1 parent 32e85b6 commit 3280231

File tree

2 files changed

+25
-30
lines changed

2 files changed

+25
-30
lines changed

internal/controller/controller.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import (
1919
"github.com/castai/cluster-controller/internal/waitext"
2020
)
2121

22+
// gcCompletedActionAfterTimes specifies after how many GCs to remove the completed action from the store.
23+
const gcCompletedActionAfterTimes = 2
24+
2225
type Config struct {
2326
PollWaitInterval time.Duration // How long to wait unit next long polling request.
2427
PollTimeout time.Duration // hard timeout. Normally server should return empty result before this timeout.
@@ -45,7 +48,7 @@ func NewService(
4548
k8sVersion: k8sVersion,
4649
castAIClient: castaiClient,
4750
startedActions: make(map[string]struct{}),
48-
completedActions: make(map[string]time.Time),
51+
completedActions: make(map[string]int8),
4952
actionHandlers: actionHandlers,
5053
healthCheck: healthCheck,
5154
}
@@ -61,10 +64,11 @@ type Controller struct {
6164
actionHandlers actions.ActionHandlers
6265

6366
startedActionsWg sync.WaitGroup
64-
startedActions map[string]struct{}
65-
completedActions map[string]time.Time
66-
startedActionsMu sync.Mutex
67-
healthCheck *health.HealthzProvider
67+
actionsMu sync.Mutex
68+
startedActions map[string]struct{} // protected by actionsMu
69+
completedActions map[string]int8 // protected by actionsMu
70+
71+
healthCheck *health.HealthzProvider
6872
}
6973

7074
func (s *Controller) Run(ctx context.Context) {
@@ -170,21 +174,21 @@ func (s *Controller) handleActions(ctx context.Context, clusterActions []*castai
170174
}
171175

172176
func (s *Controller) finishProcessing(actionID string, ackErr error) {
173-
s.startedActionsMu.Lock()
174-
defer s.startedActionsMu.Unlock()
177+
s.actionsMu.Lock()
178+
defer s.actionsMu.Unlock()
175179

176180
s.startedActionsWg.Done()
177181
delete(s.startedActions, actionID)
178182

179183
if ackErr == nil {
180-
// only mark the action as completed if it was succesfully acknowledged so it can be retried quickly if not and still requested.
181-
s.completedActions[actionID] = time.Now()
184+
// only mark the action as completed if it was successfully acknowledged so it can be retried quickly if not and still requested.
185+
s.completedActions[actionID] = gcCompletedActionAfterTimes + 1
182186
}
183187
}
184188

185189
func (s *Controller) startProcessing(actionID string) bool {
186-
s.startedActionsMu.Lock()
187-
defer s.startedActionsMu.Unlock()
190+
s.actionsMu.Lock()
191+
defer s.actionsMu.Unlock()
188192

189193
if _, ok := s.startedActions[actionID]; ok {
190194
return false
@@ -261,17 +265,16 @@ func (s *Controller) ackAction(ctx context.Context, action *castai.ClusterAction
261265
}
262266

263267
func (s *Controller) gcCompletedActions() {
264-
expireDuration := (s.cfg.PollTimeout + s.cfg.PollWaitInterval) * 2
265-
now := time.Now()
266-
267-
s.startedActionsMu.Lock()
268-
defer s.startedActionsMu.Unlock()
268+
s.actionsMu.Lock()
269+
defer s.actionsMu.Unlock()
269270

270-
for actionID, completedAt := range s.completedActions {
271-
if now.Before(completedAt.Add(expireDuration)) {
271+
for actionID, timesVisited := range s.completedActions {
272+
timesVisited--
273+
if timesVisited <= 0 {
274+
delete(s.completedActions, actionID)
272275
continue
273276
}
274-
delete(s.completedActions, actionID)
277+
s.completedActions[actionID] = timesVisited
275278
}
276279
}
277280

internal/controller/controller_test.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func TestController_ParallelExecutionTest(t *testing.T) {
304304
})
305305
}
306306
actionsWithAckErr := map[string]struct{}{
307-
actions[2].ID: struct{}{},
307+
actions[2].ID: {},
308308
}
309309

310310
var (
@@ -366,18 +366,10 @@ func TestController_ParallelExecutionTest(t *testing.T) {
366366
<-ctx.Done()
367367
svc.startedActionsWg.Wait()
368368

369-
mu.Lock()
370-
observedMax := maxExecutingObserved
371-
counts := make(map[string]int)
372-
for k, v := range executionCounts {
373-
counts[k] = v
374-
}
375-
mu.Unlock()
376-
377-
require.LessOrEqual(t, observedMax, 2, "Expected no more than 2 actions to execute concurrently, but observed %d", observedMax)
369+
require.LessOrEqual(t, maxExecutingObserved, 2, "Expected no more than 2 actions to execute concurrently, but observed %d", maxExecutingObserved)
378370

379371
for _, action := range actions {
380-
count := counts[action.ID]
372+
count := executionCounts[action.ID]
381373
if _, ok := actionsWithAckErr[action.ID]; ok {
382374
assert.Equal(t, 3, count, "Expected action %s to be executed three times because of ack errors, but it was executed %d times", action.ID, count)
383375
continue

0 commit comments

Comments
 (0)