Skip to content

Commit 61aaf40

Browse files
committed
NVIDIA-472: Add CSR approval for DPU worker nodes
- Dedicated controller: CSRApprovalReconciler polls hosted clusters every 30s
- CSR validation: hostname matching, DPU ownership verification, certificate content checks
- Client caching: ClientManager caches kubeconfig clients per hosted cluster
- Independent operation: runs alongside the main controller
1 parent 01869f4 commit 61aaf40

15 files changed

Lines changed: 2003 additions & 18 deletions

File tree

api/v1alpha1/dpfhcpprovisioner_types.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ const (
179179
// HostedClusterCleanup indicates the status of HostedCluster deletion during finalizer cleanup.
180180
HostedClusterCleanup string = "HostedClusterCleanup"
181181

182+
// CSRAutoApprovalActive indicates whether CSR auto-approval is active and watching for CSRs
183+
CSRAutoApprovalActive string = "CSRAutoApprovalActive"
184+
182185
// Validation conditions.
183186

184187
// SecretsValid indicates whether required secrets (pull secret, SSH key) are valid.
@@ -229,6 +232,19 @@ const (
229232
ReasonKubeConfigInjectionFailed string = "InjectionFailed"
230233
)
231234

235+
// Condition reasons for DPFHCPProvisioner CSRAutoApprovalActive status.
236+
// These are used as the Reason field in the CSRAutoApprovalActive condition.
237+
const (
238+
// ReasonCSRApprovalActive indicates CSR auto-approval is actively processing CSRs
239+
ReasonCSRApprovalActive string = "Active"
240+
241+
// ReasonKubeconfigNotAvailable indicates the kubeconfig is not available
242+
ReasonKubeconfigNotAvailable string = "KubeconfigNotAvailable"
243+
244+
// ReasonHostedClusterNotReachable indicates the hosted cluster is not reachable
245+
ReasonHostedClusterNotReachable string = "HostedClusterNotReachable"
246+
)
247+
232248
// DPFHCPProvisionerStatus defines the observed state of DPFHCPProvisioner
233249
type DPFHCPProvisionerStatus struct {
234250
// Phase represents the current lifecycle phase

cmd/main.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/common"
4444
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller"
4545
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/bluefield"
46+
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/csrapproval"
4647
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/dpucluster"
4748
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/finalizer"
4849
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/hostedcluster"
@@ -217,17 +218,20 @@ func main() {
217218
}
218219

219220
client := mgr.GetClient()
220-
recorder := mgr.GetEventRecorderFor(common.ControllerName)
221221
scheme := mgr.GetScheme()
222222

223+
// Create event recorders for each controller/component
224+
provisionerRecorder := mgr.GetEventRecorderFor(common.ProvisionerControllerName)
225+
csrApprovalRecorder := mgr.GetEventRecorderFor(common.CSRApprovalControllerName)
226+
223227
// Initialize BlueField Image Resolver
224-
imageResolver := bluefield.NewImageResolver(client, recorder)
228+
imageResolver := bluefield.NewImageResolver(client, provisionerRecorder)
225229

226230
// Initialize DPUCluster Validator
227-
dpuClusterValidator := dpucluster.NewValidator(client, recorder)
231+
dpuClusterValidator := dpucluster.NewValidator(client, provisionerRecorder)
228232

229233
// Initialize Secrets Validator
230-
secretsValidator := secrets.NewValidator(client, recorder)
234+
secretsValidator := secrets.NewValidator(client, provisionerRecorder)
231235

232236
// Initialize Secret Manager for HostedCluster lifecycle
233237
secretManager := hostedcluster.NewSecretManager(client, scheme)
@@ -239,32 +243,36 @@ func main() {
239243
nodePoolManager := hostedcluster.NewNodePoolManager(client, scheme)
240244

241245
// Initialize Kubeconfig Injector
242-
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, recorder)
246+
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, provisionerRecorder)
243247

244248
// Initialize MetalLB Manager
245-
metalLBManager := metallb.NewMetalLBManager(client, recorder)
249+
metalLBManager := metallb.NewMetalLBManager(client, provisionerRecorder)
250+
251+
// Initialize CSR Approver
252+
csrApprover := csrapproval.NewCSRApprover(client, csrApprovalRecorder)
246253

247254
// Initialize Finalizer Manager with pluggable cleanup handlers
248255
// Handlers are executed in registration order
249-
finalizerManager := finalizer.NewManager(client, recorder)
256+
finalizerManager := finalizer.NewManager(client, provisionerRecorder)
250257

251258
// Register cleanup handlers in order (dependent resources first, dependencies last)
252259
// 1. Kubeconfig injection cleanup (removes kubeconfig from DPUCluster namespace)
253-
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, recorder))
260+
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, provisionerRecorder))
254261
// 2. HostedCluster cleanup (removes HostedCluster, NodePool, services, and secrets)
255262
// Must run before MetalLB cleanup because LoadBalancer services depend on IPAddressPool
256-
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, recorder))
263+
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, provisionerRecorder))
257264
// 3. MetalLB cleanup (removes IPAddressPool and L2Advertisement)
258265
// Must run after HostedCluster cleanup to avoid deleting IPs while services still exist
259-
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, recorder))
266+
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, provisionerRecorder))
260267

261268
// Initialize Status Syncer for HostedCluster status mirroring
262269
statusSyncer := hostedcluster.NewStatusSyncer(client)
263270

271+
// Setup main DPFHCPProvisioner controller
264272
if err := (&controller.DPFHCPProvisionerReconciler{
265273
Client: client,
266274
Scheme: scheme,
267-
Recorder: recorder,
275+
Recorder: provisionerRecorder,
268276
ImageResolver: imageResolver,
269277
DPUClusterValidator: dpuClusterValidator,
270278
SecretsValidator: secretsValidator,
@@ -279,6 +287,16 @@ func main() {
279287
setupLog.Error(err, "unable to create controller", "controller", "DPFHCPProvisioner")
280288
os.Exit(1)
281289
}
290+
291+
// Setup CSR Approval controller (separate from main controller)
292+
if err := (&csrapproval.CSRApprovalReconciler{
293+
Client: client,
294+
Scheme: scheme,
295+
Approver: csrApprover,
296+
}).SetupWithManager(mgr); err != nil {
297+
setupLog.Error(err, "unable to create controller", "controller", "CSRApproval")
298+
os.Exit(1)
299+
}
282300
// +kubebuilder:scaffold:builder
283301

284302
if metricsCertWatcher != nil {

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,11 @@ rules:
122122
- patch
123123
- update
124124
- watch
125+
- apiGroups:
126+
- provisioning.dpu.nvidia.com
127+
resources:
128+
- dpus
129+
verbs:
130+
- get
131+
- list
132+
- watch

helm/dpf-hcp-provisioner-operator/templates/clusterrole.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ rules:
113113
- update
114114
- watch
115115

116+
# DPU permissions (for CSR validation against DPU objects)
117+
- apiGroups:
118+
- provisioning.dpu.nvidia.com
119+
resources:
120+
- dpus
121+
verbs:
122+
- get
123+
- list
124+
- watch
125+
116126
# HyperShift HostedCluster and NodePool permissions
117127
- apiGroups:
118128
- hypershift.openshift.io

internal/common/constants.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ const (
2222
// If the CR is renamed, update this constant once and it propagates everywhere.
2323
DPFHCPProvisionerName = "dpfhcpprovisioner"
2424

25-
// ControllerName is the name used for event recorders
26-
ControllerName = "dpfhcpprovisioner-controller"
25+
// ProvisionerControllerName is the name for the main provisioner controller event recorder
26+
ProvisionerControllerName = "dpfhcpprovisioner-controller"
27+
28+
// CSRApprovalControllerName is the name for the CSR approval controller event recorder
29+
CSRApprovalControllerName = "csr-approval-controller"
2730
)
2831

2932
// Label keys for cross-namespace resource ownership tracking
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
Copyright 2025.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package csrapproval
18+
19+
import (
	"context"
	"fmt"
	"sync"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
	"sigs.k8s.io/controller-runtime/pkg/client"
)
30+
31+
// ClientManager manages hosted cluster client lifecycle
32+
type ClientManager struct {
33+
mgmtClient client.Client
34+
// hcClients caches Kubernetes clientsets for hosted clusters to avoid recreating them on every reconciliation.
35+
// Each DPFHCPProvisioner creates a hosted cluster with its own API server. This map stores one clientset
36+
// per hosted cluster (keyed by "namespace/name") to reuse connections and avoid expensive client creation
37+
// (parsing kubeconfig, establishing TCP connections) every 30 seconds during CSR polling.
38+
// Without this cache, we would create 120+ clients per hour per hosted cluster.
39+
hcClients map[string]*kubernetes.Clientset
40+
}
41+
42+
// NewClientManager creates a new client manager
43+
func NewClientManager(mgmtClient client.Client) *ClientManager {
44+
return &ClientManager{
45+
mgmtClient: mgmtClient,
46+
hcClients: make(map[string]*kubernetes.Clientset),
47+
}
48+
}
49+
50+
// GetHostedClusterClient retrieves or creates a client for the hosted cluster
51+
func (cm *ClientManager) GetHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
52+
key := namespace + "/" + name
53+
54+
// Return cached client if it exists
55+
if clientset, ok := cm.hcClients[key]; ok {
56+
return clientset, nil
57+
}
58+
59+
// Create new client
60+
clientset, err := cm.createHostedClusterClient(ctx, namespace, name)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
// Cache the client
66+
cm.hcClients[key] = clientset
67+
68+
return clientset, nil
69+
}
70+
71+
// InvalidateClient removes a cached client (useful when kubeconfig rotates)
72+
func (cm *ClientManager) InvalidateClient(namespace, name string) {
73+
key := namespace + "/" + name
74+
delete(cm.hcClients, key)
75+
}
76+
77+
// createHostedClusterClient creates a Kubernetes client for the hosted cluster
78+
func (cm *ClientManager) createHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
79+
// Fetch kubeconfig secret
80+
kubeconfigData, err := cm.getKubeconfigData(ctx, namespace, name)
81+
if err != nil {
82+
return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
83+
}
84+
85+
// Parse kubeconfig
86+
kubeconfig, err := clientcmd.Load(kubeconfigData)
87+
if err != nil {
88+
return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
89+
}
90+
91+
// Replace external endpoint with internal service DNS name
92+
// The HyperShift admin-kubeconfig uses external endpoints (LoadBalancer IP or NodePort)
93+
// which are not accessible from inside the operator pod's network.
94+
// We need to use the internal service endpoint for in-cluster access.
95+
if err := replaceServerWithInternalEndpoint(ctx, cm.mgmtClient, kubeconfig, namespace, name); err != nil {
96+
return nil, fmt.Errorf("failed to replace server endpoint: %w", err)
97+
}
98+
99+
// Create REST config from modified kubeconfig
100+
config, err := clientcmd.NewDefaultClientConfig(*kubeconfig, &clientcmd.ConfigOverrides{}).ClientConfig()
101+
if err != nil {
102+
return nil, fmt.Errorf("failed to create rest config from kubeconfig: %w", err)
103+
}
104+
105+
// Set reasonable timeouts
106+
config.Timeout = 0 // No timeout for long-lived connections (watches)
107+
config.QPS = 5
108+
config.Burst = 10
109+
110+
// Create clientset
111+
clientset, err := kubernetes.NewForConfig(config)
112+
if err != nil {
113+
return nil, fmt.Errorf("failed to create clientset: %w", err)
114+
}
115+
116+
return clientset, nil
117+
}
118+
119+
// getKubeconfigData retrieves the kubeconfig data from the admin secret
120+
func (cm *ClientManager) getKubeconfigData(ctx context.Context, namespace, name string) ([]byte, error) {
121+
// The kubeconfig secret name follows HyperShift convention: <hostedcluster-name>-admin-kubeconfig
122+
secretName := name + "-admin-kubeconfig"
123+
124+
secret := &corev1.Secret{}
125+
secretKey := types.NamespacedName{
126+
Namespace: namespace,
127+
Name: secretName,
128+
}
129+
130+
if err := cm.mgmtClient.Get(ctx, secretKey, secret); err != nil {
131+
return nil, fmt.Errorf("failed to get kubeconfig secret %s: %w", secretKey, err)
132+
}
133+
134+
kubeconfigData, ok := secret.Data["kubeconfig"]
135+
if !ok {
136+
return nil, fmt.Errorf("kubeconfig key not found in secret %s", secretKey)
137+
}
138+
139+
if len(kubeconfigData) == 0 {
140+
return nil, fmt.Errorf("kubeconfig data is empty in secret %s", secretKey)
141+
}
142+
143+
return kubeconfigData, nil
144+
}
145+
146+
// replaceServerWithInternalEndpoint modifies the kubeconfig to use internal service DNS name
147+
// instead of the external LoadBalancer IP or NodePort. This allows the operator pod (running inside the cluster)
148+
// to reach the hosted cluster API server via the internal Kubernetes service.
149+
//
150+
// HyperShift creates admin-kubeconfig with external endpoints:
151+
// - LoadBalancer: https://10.6.135.42:6443 (example external IP, not accessible from operator pod)
152+
// - NodePort: https://<node-ip>:31039 (example NodePort, dynamically allocated per cluster)
153+
//
154+
// This function replaces it with the internal service DNS name:
155+
// https://kube-apiserver.<namespace>-<name>.svc.cluster.local:6443
156+
//
157+
// Port 6443 is hardcoded to match HyperShift's implementation. HyperShift itself hardcodes
158+
// the kube-apiserver port as a constant (KASSVCPort = 6443) in their codebase.
159+
func replaceServerWithInternalEndpoint(ctx context.Context, mgmtClient client.Client, kubeconfig *clientcmdapi.Config, hostedClusterNamespace, hostedClusterName string) error {
160+
if kubeconfig == nil {
161+
return fmt.Errorf("kubeconfig is nil")
162+
}
163+
164+
// Find the current context
165+
currentContext := kubeconfig.CurrentContext
166+
if currentContext == "" {
167+
return fmt.Errorf("kubeconfig has no current context")
168+
}
169+
170+
context, ok := kubeconfig.Contexts[currentContext]
171+
if !ok {
172+
return fmt.Errorf("context %s not found in kubeconfig", currentContext)
173+
}
174+
175+
// Find the cluster referenced by the context
176+
clusterName := context.Cluster
177+
cluster, ok := kubeconfig.Clusters[clusterName]
178+
if !ok {
179+
return fmt.Errorf("cluster %s not found in kubeconfig", clusterName)
180+
}
181+
182+
// Construct the service namespace following HyperShift convention
183+
serviceNamespace := fmt.Sprintf("%s-%s", hostedClusterNamespace, hostedClusterName)
184+
185+
// Construct internal service DNS name with hardcoded port 6443 (matching HyperShift's approach)
186+
internalServer := fmt.Sprintf("https://kube-apiserver.%s.svc.cluster.local:6443", serviceNamespace)
187+
188+
// Replace the server URL
189+
cluster.Server = internalServer
190+
191+
return nil
192+
}
193+
194+
// TestConnection verifies the hosted cluster client can connect to the API server
195+
func TestConnection(ctx context.Context, clientset *kubernetes.Clientset) error {
196+
_, err := clientset.Discovery().ServerVersion()
197+
if err != nil {
198+
return fmt.Errorf("failed to connect to hosted cluster API server: %w", err)
199+
}
200+
return nil
201+
}

0 commit comments

Comments
 (0)