Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@
actionHandlers actions.ActionHandlers,
) *Controller {
return &Controller{
log: log,
cfg: cfg,
k8sVersion: k8sVersion,
castAIClient: castaiClient,
startedActions: map[string]struct{}{},
actionHandlers: actionHandlers,
healthCheck: healthCheck,
log: log,
cfg: cfg,
k8sVersion: k8sVersion,
castAIClient: castaiClient,
startedActions: make(map[string]struct{}),
completedActions: make(map[string]time.Time),
actionHandlers: actionHandlers,
healthCheck: healthCheck,
}
}

Expand All @@ -61,6 +62,7 @@

startedActionsWg sync.WaitGroup
startedActions map[string]struct{}
completedActions map[string]time.Time
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess it's time to create a map + RWMutex as a separate struct

startedActionsMu sync.Mutex
healthCheck *health.HealthzProvider
}
Expand Down Expand Up @@ -122,6 +124,7 @@

s.log.WithFields(logrus.Fields{"n": strconv.Itoa(len(actions))}).Infof("received in %s", pollDuration)
s.handleActions(ctx, actions)
s.gcCompletedActions()
return nil
}

Expand All @@ -132,7 +135,10 @@
}

go func(action *castai.ClusterAction) {
defer s.finishProcessing(action.ID)
var ackErr error
defer func() {
s.finishProcessing(action.ID, ackErr)
}()

var err error

Expand All @@ -146,7 +152,7 @@
}

handleDuration := time.Since(startTime)
ackErr := s.ackAction(ctx, action, handleErr, handleDuration)
ackErr = s.ackAction(ctx, action, handleErr, handleDuration)
if handleErr != nil {
err = handleErr
}
Expand All @@ -163,12 +169,17 @@
}
}

func (s *Controller) finishProcessing(actionID string) {
func (s *Controller) finishProcessing(actionID string, ackErr error) {
s.startedActionsMu.Lock()
defer s.startedActionsMu.Unlock()

s.startedActionsWg.Done()
delete(s.startedActions, actionID)

if ackErr == nil {
// only mark the action as completed if it was succesfully acknowledged so it can be retried quickly if not and still requested.

Check failure on line 180 in internal/controller/controller.go

View workflow job for this annotation

GitHub Actions / lint

`succesfully` is a misspelling of `successfully` (misspell)
s.completedActions[actionID] = time.Now()
}
}

func (s *Controller) startProcessing(actionID string) bool {
Expand All @@ -179,13 +190,19 @@
return false
}

if _, ok := s.completedActions[actionID]; ok {
s.log.WithField(actions.ActionIDLogField, actionID).Debug("action has been recently completed, not starting")
return false
}

if inProgress := len(s.startedActions); inProgress >= s.cfg.MaxActionsInProgress {
s.log.Warnf("too many actions in progress %d/%d", inProgress, s.cfg.MaxActionsInProgress)
return false
}

s.startedActionsWg.Add(1)
s.startedActions[actionID] = struct{}{}

return true
}

Expand Down Expand Up @@ -243,6 +260,21 @@
})
}

func (s *Controller) gcCompletedActions() {
expireDuration := (s.cfg.PollTimeout + s.cfg.PollWaitInterval) * 2
now := time.Now()

s.startedActionsMu.Lock()
defer s.startedActionsMu.Unlock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are u locking on startedActionsMu here? i think it's better to lock on separate mutex
once again, i think we need simple concurrent map as separate struct


for actionID, completedAt := range s.completedActions {
if now.Before(completedAt.Add(expireDuration)) {
continue
}
delete(s.completedActions, actionID)
}
}

func getHandlerError(err error) *string {
if err != nil {
str := err.Error()
Expand Down
123 changes: 123 additions & 0 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
"context"
"fmt"
"reflect"
"strconv"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

Expand Down Expand Up @@ -264,6 +268,125 @@
}
}

func TestController_ParallelExecutionTest(t *testing.T) {
t.Parallel()

synctest.Test(t, func(t *testing.T) {
cfg := Config{
PollWaitInterval: time.Second,
PollTimeout: 50 * time.Millisecond,
AckTimeout: time.Second,
AckRetriesCount: 2,
AckRetryWait: time.Millisecond,
ClusterID: uuid.New().String(),
MaxActionsInProgress: 2,
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

client := mock_castai.NewMockCastAIClient(ctrl)
handler := mock_actions.NewMockActionHandler(ctrl)

testActionHandlers := map[reflect.Type]actions.ActionHandler{
reflect.TypeFor[*castai.ActionCreateEvent](): handler,
}

const maxActions = 4
actions := make([]*castai.ClusterAction, 0, maxActions)
for i := range maxActions {
actions = append(actions, &castai.ClusterAction{
ID: "action-" + strconv.Itoa(i),
CreatedAt: time.Now(),
ActionCreateEvent: &castai.ActionCreateEvent{
EventType: "fake",
},
})
}
actionsWithAckErr := map[string]struct{}{
actions[2].ID: struct{}{},

Check failure on line 307 in internal/controller/controller_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
}

var (
mu sync.Mutex
currentlyExecuting int
maxExecutingObserved int
executionCounts = make(map[string]int)
)

handler.EXPECT().Handle(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, action *castai.ClusterAction) error {
mu.Lock()
currentlyExecuting++
executionCounts[action.ID]++
if currentlyExecuting > maxExecutingObserved {
maxExecutingObserved = currentlyExecuting
}
mu.Unlock()

time.Sleep(100 * time.Millisecond)

mu.Lock()
currentlyExecuting--
mu.Unlock()

return nil
},
).AnyTimes()

client.EXPECT().GetActions(gomock.Any(), gomock.Any()).Return(actions, nil).Times(1)
client.EXPECT().AckAction(gomock.Any(), gomock.Any(), &castai.AckClusterActionRequest{}).
DoAndReturn(func(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error {
if _, ok := actionsWithAckErr[actionID]; ok {
return assert.AnError
}
return nil
}).AnyTimes()

client.EXPECT().GetActions(gomock.Any(), gomock.Any()).Return(actions, nil).Times(3)

client.EXPECT().GetActions(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

logger := logrus.New()
svc := NewService(
logger,
cfg,
"v0",
client,
health.NewHealthzProvider(health.HealthzCfg{HealthyPollIntervalLimit: cfg.PollTimeout}, logger),
testActionHandlers,
)

ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()

go svc.Run(ctx)

synctest.Wait()
<-ctx.Done()
svc.startedActionsWg.Wait()

mu.Lock()
observedMax := maxExecutingObserved
counts := make(map[string]int)
for k, v := range executionCounts {
counts[k] = v

Check failure on line 373 in internal/controller/controller_test.go

View workflow job for this annotation

GitHub Actions / lint

mapsloop: Replace m[k]=v loop with maps.Copy (modernize)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, this is frustrating
image
i don't think linter should be that strict

}
mu.Unlock()

require.LessOrEqual(t, observedMax, 2, "Expected no more than 2 actions to execute concurrently, but observed %d", observedMax)

for _, action := range actions {
count := counts[action.ID]
if _, ok := actionsWithAckErr[action.ID]; ok {
assert.Equal(t, 3, count, "Expected action %s to be executed three times because of ack errors, but it was executed %d times", action.ID, count)
continue
}
assert.Equal(t, 1, count, "Expected action %s to be executed exactly once, but it was executed %d times", action.ID, count)
}
})
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"))
}
Loading