Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e6adf27
feat: implement parallel walking of the DAG
bschaatsbergen Nov 14, 2025
28c1d5f
feat: implement parallel walking of the DAG
bschaatsbergen Nov 14, 2025
c370876
Merge branch 'parallel-walk' of github.com:bschaatsbergen/kro into pa…
bschaatsbergen Nov 14, 2025
b497552
remove dangling comment
bschaatsbergen Nov 14, 2025
992c230
use assert package for tests
bschaatsbergen Nov 14, 2025
5879e7a
remove walker in favor of level based topological execution
bschaatsbergen Nov 14, 2025
83a235d
rewrite dag tests to use the assert package
bschaatsbergen Nov 14, 2025
8f06cc6
document mutex
bschaatsbergen Nov 14, 2025
d034b4a
add some additional concurrency safeguards
bschaatsbergen Nov 14, 2025
eae9be4
bit of clean up
bschaatsbergen Nov 14, 2025
fb8db99
maintain a single applyset
bschaatsbergen Nov 16, 2025
670f2ba
update comments on thread safe access
bschaatsbergen Nov 16, 2025
5ef58fe
revert cli changes
bschaatsbergen Nov 16, 2025
c90b429
remove topologicalorder and sort
bschaatsbergen Nov 16, 2025
a5d5b93
add applyset spec link
bschaatsbergen Nov 16, 2025
d0a17fd
use a log field instead
bschaatsbergen Nov 16, 2025
3503221
add a better comment to reflect the process
bschaatsbergen Nov 16, 2025
b30c1e6
Merge remote-tracking branch 'upstream/main' into parallel-walk
bschaatsbergen Nov 17, 2025
040ef27
remove topologicalorder status field
bschaatsbergen Nov 17, 2025
5458a87
add nolint: dupl directives
bschaatsbergen Nov 17, 2025
2fb490f
match requeue duration to production
bschaatsbergen Nov 17, 2025
676f9e5
add missing header
bschaatsbergen Nov 17, 2025
d14d536
Revert "match requeue duration to production"
bschaatsbergen Nov 17, 2025
aca0bde
revert changes related to removing status field
bschaatsbergen Nov 17, 2025
8a530d5
restore topologicalOrder in status
bschaatsbergen Nov 17, 2025
1dab5b0
Revert nolint changes
bschaatsbergen Nov 17, 2025
487d108
use stable sorted order
bschaatsbergen Nov 17, 2025
db1385f
revert deletion of comment
bschaatsbergen Nov 17, 2025
a43a296
protect mutable maps in runtime package with a mutex
bschaatsbergen Nov 17, 2025
d15545d
implement kahn's algorithm
bschaatsbergen Nov 17, 2025
2585557
adjust order of ackekscluster test
bschaatsbergen Nov 17, 2025
7622340
small nit improvements
bschaatsbergen Nov 17, 2025
cb4862a
Merge branch 'main' into parallel-walk
bschaatsbergen Nov 18, 2025
11289f3
revert minor refactor
bschaatsbergen Nov 18, 2025
ad96238
Merge branch 'parallel-walk' of github.com:bschaatsbergen/kro into pa…
bschaatsbergen Nov 18, 2025
4e3550a
protect all maps/sets using the mutex
bschaatsbergen Nov 18, 2025
16accff
use waitgroup instead of errgroup to avoid cancelling on a non-nil error
bschaatsbergen Nov 18, 2025
585907c
simplify cycle detection and optimize for O(V+E) performance
bschaatsbergen Nov 18, 2025
695f741
remove TopologicalSort
bschaatsbergen Nov 18, 2025
1c9c2bb
avoid doing another full graph traversal with DFS to detect a cycle
bschaatsbergen Nov 18, 2025
7da9c61
Merge branch 'main' into parallel-walk
bschaatsbergen Nov 19, 2025
33ba0fa
Merge branch 'main' into parallel-walk
bschaatsbergen Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions pkg/applyset/applyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,34 @@ type applySet struct {
// fieldManager is the name of the field manager that will be used to apply the resources.
fieldManager string

// toolLabels is a map of tool provided labels to be applied to the resources
// mu guards all maps and sets in applySet.
// These fields are accessed and mutated from multiple goroutines during
// reconciliation, so the lock must be held for every read or write to
// avoid race conditions and ensure consistent state.
mu sync.Mutex

// toolLabels is a map of tool provided labels to be applied to the resources.
// Protected by mu.
toolLabels map[string]string

// current labels and annotations of the parent before the apply operation
currentLabels map[string]string
// current labels and annotations of the parent before the apply operation.
// Protected by mu.
currentLabels map[string]string
// Protected by mu.
currentAnnotations map[string]string

// set of applyset object rest mappings
// set of applyset object rest mappings.
// Protected by mu.
desiredRESTMappings map[schema.GroupKind]*meta.RESTMapping
// set of applyset object namespaces
// set of applyset object namespaces.
// Protected by mu.
desiredNamespaces sets.Set[string]

// superset of desired and old namespaces
// superset of desired and old namespaces.
// Protected by mu.
supersetNamespaces sets.Set[string]
// superset of desired and old GKs
// superset of desired and old GKs.
// Protected by mu.
supersetGKs sets.Set[string]

desired *tracker
Expand All @@ -282,7 +295,9 @@ func (a *applySet) getAndRecordNamespace(obj ApplyableObject, restMapping *meta.
if namespace == "" {
namespace = a.parent.GetNamespace()
}
a.mu.Lock()
a.desiredNamespaces.Insert(namespace)
a.mu.Unlock()
case meta.RESTScopeNameRoot:
if obj.GetNamespace() != "" {
return fmt.Errorf("namespace was provided for cluster-scoped object %v %v", gvk, obj.GetName())
Expand All @@ -300,6 +315,10 @@ func (a *applySet) getAndRecordNamespace(obj ApplyableObject, restMapping *meta.
func (a *applySet) getRestMapping(obj ApplyableObject) (*meta.RESTMapping, error) {
gvk := obj.GroupVersionKind()
gk := gvk.GroupKind()

a.mu.Lock()
defer a.mu.Unlock()

// Ensure a rest mapping exists for the object
_, found := a.desiredRESTMappings[gk]
if !found {
Expand All @@ -317,7 +336,10 @@ func (a *applySet) getRestMapping(obj ApplyableObject) (*meta.RESTMapping, error
}

func (a *applySet) resourceClient(obj Applyable) (dynamic.ResourceInterface, error) {
a.mu.Lock()
restMapping, ok := a.desiredRESTMappings[obj.GroupVersionKind().GroupKind()]
a.mu.Unlock()

if !ok {
// This should never happen, but if it does, we want to know about it.
return nil, fmt.Errorf("FATAL: rest mapping not found for %v", obj.GroupVersionKind())
Expand All @@ -336,6 +358,7 @@ func (a *applySet) resourceClient(obj Applyable) (dynamic.ResourceInterface, err
return dynResource, nil
}

// Add adds an object to the applyset for later application.
func (a *applySet) Add(ctx context.Context, obj ApplyableObject) (*unstructured.Unstructured, error) {
restMapping, err := a.getRestMapping(obj)
if err != nil {
Expand Down Expand Up @@ -400,6 +423,8 @@ func (a *applySet) injectToolLabels(labels map[string]string) map[string]string
if labels == nil {
labels = make(map[string]string)
}
a.mu.Lock()
defer a.mu.Unlock()
if a.toolLabels != nil {
for k, v := range a.toolLabels {
labels[k] = v
Expand Down Expand Up @@ -477,6 +502,9 @@ func (a *applySet) desiredParentLabels() map[string]string {
func (a *applySet) desiredParentAnnotations(
includeCurrent bool,
) (map[string]string, sets.Set[string], sets.Set[string]) {
a.mu.Lock()
defer a.mu.Unlock()

annotations := make(map[string]string)
annotations[ApplySetToolingAnnotation] = a.toolingID.String()

Expand Down Expand Up @@ -523,14 +551,20 @@ func (a *applySet) apply(ctx context.Context, dryRun bool) (*ApplyResult, error)
return results, fmt.Errorf("unable to get parent: %w", err)
}
// Record the current labels and annotations
a.mu.Lock()
a.currentLabels = parent.GetLabels()
a.currentAnnotations = parent.GetAnnotations()
a.mu.Unlock()

// We will ensure the parent is updated with the latest applyset before applying the resources.
a.supersetNamespaces, a.supersetGKs, err = a.updateParentLabelsAndAnnotations(ctx, updateToSuperset)
supersetNamespaces, supersetGKs, err := a.updateParentLabelsAndAnnotations(ctx, updateToSuperset)
if err != nil {
return results, fmt.Errorf("unable to update Parent: %w", err)
}
a.mu.Lock()
a.supersetNamespaces = supersetNamespaces
a.supersetGKs = supersetGKs
a.mu.Unlock()
}

options := a.applyOptions
Expand All @@ -540,16 +574,13 @@ func (a *applySet) apply(ctx context.Context, dryRun bool) (*ApplyResult, error)

concurrency := a.concurrency
if concurrency <= 0 {
concurrency = len(a.desired.objects)
concurrency = a.desired.Len()
}

eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)

// protect concurrent access to write the apply results
var mu sync.Mutex

for _, obj := range a.desired.objects {
for _, obj := range a.desired.Objects() {
dynResource, err := a.resourceClient(obj)
if err != nil {
return results, err
Expand All @@ -558,8 +589,6 @@ func (a *applySet) apply(ctx context.Context, dryRun bool) (*ApplyResult, error)
// Apply resources using server-side apply
eg.Go(func() error {
lastApplied, err := a.applyResource(egctx, dynResource, obj, options)
mu.Lock()
defer mu.Unlock()
results.recordApplied(obj, lastApplied, err)
return nil
})
Expand Down
15 changes: 10 additions & 5 deletions pkg/applyset/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package applyset
import (
"context"
"fmt"
"maps"

"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -69,13 +70,17 @@ func (a *applySet) findAllObjectsToPrune(

tasks := []*task{}

// Take a snapshot of the protected fields so we
// can release the lock before making network calls.
a.mu.Lock()
restMappings := map[schema.GroupKind]*meta.RESTMapping{}
for gk, restMapping := range a.desiredRESTMappings {
restMappings[gk] = restMapping
}
maps.Copy(restMappings, a.desiredRESTMappings)
supersetGKs := a.supersetGKs.Clone()
supersetNamespaces := a.supersetNamespaces.Clone()
a.mu.Unlock()

// add restmapping for older GKs
for _, entry := range a.supersetGKs.UnsortedList() {
for _, entry := range supersetGKs.UnsortedList() {
if entry == "" {
continue
}
Expand All @@ -96,7 +101,7 @@ func (a *applySet) findAllObjectsToPrune(
for _, restMapping := range restMappings {
switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace:
for _, namespace := range a.supersetNamespaces.UnsortedList() {
for _, namespace := range supersetNamespaces.UnsortedList() {
if namespace == "" {
namespace = metav1.NamespaceDefault
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/applyset/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package applyset
import (
"errors"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -61,6 +62,12 @@ type PrunedObject struct {
// Any errors returned by the apply call is also recorded in it.
// PrunedObject records any error returned by the delete call.
type ApplyResult struct {
// mu guards AppliedObjects and PrunedObjects.
// These fields are accessed and mutated from multiple goroutines during
// parallel apply operations, so the lock must be held for every read or write to
// avoid race conditions and ensure consistent state.
mu sync.Mutex

DesiredCount int
AppliedObjects []AppliedObject
PrunedObjects []PrunedObject
Expand Down Expand Up @@ -116,6 +123,9 @@ func (a *ApplyResult) recordApplied(
lastApplied *unstructured.Unstructured,
err error,
) {
a.mu.Lock()
defer a.mu.Unlock()

ao := AppliedObject{
ApplyableObject: obj,
LastApplied: lastApplied,
Expand All @@ -128,6 +138,9 @@ func (a *ApplyResult) recordPruned(
obj PruneObject,
err error,
) PrunedObject {
a.mu.Lock()
defer a.mu.Unlock()

po := PrunedObject{
PruneObject: obj,
Error: err,
Expand Down
28 changes: 28 additions & 0 deletions pkg/applyset/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package applyset
import (
"encoding/json"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -63,14 +64,24 @@ type k8sObjectKey struct {
types.NamespacedName
}

// tracker manages a collection of resources to be applied.
type tracker struct {
// mu guards all maps and sets in tracker.
// These fields are accessed and mutated from multiple goroutines during
// reconciliation, so the lock must be held for every read or write to
// avoid race conditions and ensure consistent state.
mu sync.Mutex

// objects is a list of objects we are applying.
// Protected by mu.
objects []ApplyableObject

// serverIDs is a map of object key to object.
// Protected by mu.
serverIDs map[k8sObjectKey]bool

// clientIDs is a map of object key to object.
// Protected by mu.
clientIDs map[string]bool
}

Expand All @@ -82,6 +93,9 @@ func NewTracker() *tracker {
}

func (t *tracker) Add(obj ApplyableObject) error {
t.mu.Lock()
defer t.mu.Unlock()

gvk := obj.GroupVersionKind()

// Server side uniqueness check
Expand Down Expand Up @@ -120,5 +134,19 @@ func (t *tracker) Add(obj ApplyableObject) error {
}

func (t *tracker) Len() int {
t.mu.Lock()
defer t.mu.Unlock()
return len(t.objects)
}

// Objects returns a thread-safe snapshot of all tracked objects.
// The returned slice is a copy, so callers can safely iterate or modify it
// without worrying about concurrent changes to the underlying tracker.
func (t *tracker) Objects() []ApplyableObject {
t.mu.Lock()
defer t.mu.Unlock()
// Return a copy to prevent concurrent modification
result := make([]ApplyableObject, len(t.objects))
copy(result, t.objects)
return result
}
Loading