-
Notifications
You must be signed in to change notification settings - Fork 166
/
Copy pathcontroller_reconcile.go
172 lines (146 loc) · 6.48 KB
/
controller_reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright 2025 The Kube Resource Orchestrator Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package resourcegraphdefinition
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/kro-run/kro/api/v1alpha1"
instancectrl "github.com/kro-run/kro/pkg/controller/instance"
"github.com/kro-run/kro/pkg/dynamiccontroller"
"github.com/kro-run/kro/pkg/graph"
"github.com/kro-run/kro/pkg/metadata"
)
// reconcileResourceGraphDefinition orchestrates the reconciliation of a ResourceGraphDefinition by:
// 1. Processing the resource graph
// 2. Ensuring CRDs are present
// 3. Setting up and starting the microcontroller
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx context.Context, rgd *v1alpha1.ResourceGraphDefinition) ([]string, []v1alpha1.ResourceInformation, error) {
log, _ := logr.FromContext(ctx)
// Process resource graph definition graph first to validate structure
log.V(1).Info("reconciling resource graph definition graph")
processedRGD, resourcesInfo, err := r.reconcileResourceGraphDefinitionGraph(ctx, rgd)
if err != nil {
return nil, nil, err
}
// Setup metadata labeling
graphExecLabeler, err := r.setupLabeler(rgd)
if err != nil {
return nil, nil, fmt.Errorf("failed to setup labeler: %w", err)
}
crd := processedRGD.Instance.GetCRD()
graphExecLabeler.ApplyLabels(&crd.ObjectMeta)
// Ensure CRD exists and is up to date
log.V(1).Info("reconciling resource graph definition CRD")
if err := r.reconcileResourceGraphDefinitionCRD(ctx, crd); err != nil {
return processedRGD.TopologicalOrder, resourcesInfo, err
}
// Setup and start microcontroller
gvr := processedRGD.Instance.GetGroupVersionResource()
controller := r.setupMicroController(gvr, processedRGD, rgd.Spec.DefaultServiceAccounts, graphExecLabeler)
log.V(1).Info("reconciling resource graph definition micro controller")
if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile, rgd.Spec.Weight); err != nil {
return processedRGD.TopologicalOrder, resourcesInfo, err
}
return processedRGD.TopologicalOrder, resourcesInfo, nil
}
// setupLabeler creates and merges the required labelers for the resource graph definition
func (r *ResourceGraphDefinitionReconciler) setupLabeler(rgd *v1alpha1.ResourceGraphDefinition) (metadata.Labeler, error) {
rgLabeler := metadata.NewResourceGraphDefinitionLabeler(rgd)
return r.metadataLabeler.Merge(rgLabeler)
}
// setupMicroController creates a new controller instance with the required configuration
func (r *ResourceGraphDefinitionReconciler) setupMicroController(
gvr schema.GroupVersionResource,
processedRGD *graph.Graph,
defaultSVCs map[string]string,
labeler metadata.Labeler,
) *instancectrl.Controller {
instanceLogger := r.rootLogger.WithName("controller." + gvr.Resource)
return instancectrl.NewController(
instanceLogger,
instancectrl.ReconcileConfig{
DefaultRequeueDuration: 3 * time.Second,
DeletionGraceTimeDuration: 30 * time.Second,
DeletionPolicy: "Delete",
},
gvr,
processedRGD,
r.clientSet,
defaultSVCs,
labeler,
)
}
// reconcileResourceGraphDefinitionGraph processes the resource graph definition to build a dependency graph
// and extract resource information
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionGraph(_ context.Context, rgd *v1alpha1.ResourceGraphDefinition) (*graph.Graph, []v1alpha1.ResourceInformation, error) {
processedRGD, err := r.rgBuilder.NewResourceGraphDefinition(rgd)
if err != nil {
return nil, nil, newGraphError(err)
}
resourcesInfo := make([]v1alpha1.ResourceInformation, 0, len(processedRGD.Resources))
for name, resource := range processedRGD.Resources {
deps := resource.GetDependencies()
if len(deps) > 0 {
resourcesInfo = append(resourcesInfo, buildResourceInfo(name, deps))
}
}
return processedRGD, resourcesInfo, nil
}
// buildResourceInfo creates a ResourceInformation struct from name and dependencies
func buildResourceInfo(name string, deps []string) v1alpha1.ResourceInformation {
dependencies := make([]v1alpha1.Dependency, 0, len(deps))
for _, dep := range deps {
dependencies = append(dependencies, v1alpha1.Dependency{ID: dep})
}
return v1alpha1.ResourceInformation{
ID: name,
Dependencies: dependencies,
}
}
// reconcileResourceGraphDefinitionCRD ensures the CRD is present and up to date in the cluster
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD(ctx context.Context, crd *v1.CustomResourceDefinition) error {
if err := r.crdManager.Ensure(ctx, *crd); err != nil {
return newCRDError(err)
}
return nil
}
// reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler, weight int) error {
err := r.dynamicController.StartServingGVK(ctx, *gvr, handler, weight)
if err != nil {
return newMicroControllerError(err)
}
return nil
}
// Error types for the resourcegraphdefinition controller
type (
graphError struct{ err error }
crdError struct{ err error }
microControllerError struct{ err error }
)
// Error interface implementation
func (e *graphError) Error() string { return e.err.Error() }
func (e *crdError) Error() string { return e.err.Error() }
func (e *microControllerError) Error() string { return e.err.Error() }
// Unwrap interface implementation
func (e *graphError) Unwrap() error { return e.err }
func (e *crdError) Unwrap() error { return e.err }
func (e *microControllerError) Unwrap() error { return e.err }
// Error constructors
func newGraphError(err error) error { return &graphError{err} }
func newCRDError(err error) error { return &crdError{err} }
func newMicroControllerError(err error) error { return µControllerError{err} }