Skip to content

Commit 004de50

Browse files
committed
NVIDIA-472: Add CSR approval for DPU worker nodes
- Dedicated controller: CSRApprovalReconciler polls hosted clusters every 30s
- CSR validation: hostname matching, DPU ownership verification, certificate content checks
- Client caching: ClientManager caches kubeconfig clients per hosted cluster
- Independent operation: runs alongside the main controller
1 parent a96a619 commit 004de50

16 files changed

Lines changed: 2224 additions & 18 deletions

File tree

api/v1alpha1/dpfhcpprovisioner_types.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ const (
190190
// HostedClusterCleanup indicates the status of HostedCluster deletion during finalizer cleanup.
191191
HostedClusterCleanup string = "HostedClusterCleanup"
192192

193+
// CSRAutoApprovalActive indicates whether CSR auto-approval is active and watching for CSRs
194+
CSRAutoApprovalActive string = "CSRAutoApprovalActive"
195+
193196
// Validation conditions.
194197

195198
// SecretsValid indicates whether required secrets (pull secret, SSH key) are valid.
@@ -240,6 +243,19 @@ const (
240243
ReasonKubeConfigInjectionFailed string = "InjectionFailed"
241244
)
242245

246+
// Condition reasons for DPFHCPProvisioner CSRAutoApprovalActive status.
247+
// These are used as the Reason field in the CSRAutoApprovalActive condition.
248+
const (
249+
// ReasonCSRApprovalActive indicates CSR auto-approval is actively processing CSRs
250+
ReasonCSRApprovalActive string = "Active"
251+
252+
// ReasonKubeconfigNotAvailable indicates the kubeconfig is not available
253+
ReasonKubeconfigNotAvailable string = "KubeconfigNotAvailable"
254+
255+
// ReasonHostedClusterNotReachable indicates the hosted cluster is not reachable
256+
ReasonHostedClusterNotReachable string = "HostedClusterNotReachable"
257+
)
258+
243259
// DPFHCPProvisionerStatus defines the observed state of DPFHCPProvisioner
244260
type DPFHCPProvisionerStatus struct {
245261
// Phase represents the current lifecycle phase

cmd/main.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/common"
4444
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller"
4545
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/bluefield"
46+
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/csrapproval"
4647
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/dpucluster"
4748
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/finalizer"
4849
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/hostedcluster"
@@ -217,17 +218,20 @@ func main() {
217218
}
218219

219220
client := mgr.GetClient()
220-
recorder := mgr.GetEventRecorderFor(common.ControllerName)
221221
scheme := mgr.GetScheme()
222222

223+
// Create event recorders for each controller/component
224+
provisionerRecorder := mgr.GetEventRecorderFor(common.ProvisionerControllerName)
225+
csrApprovalRecorder := mgr.GetEventRecorderFor(common.CSRApprovalControllerName)
226+
223227
// Initialize BlueField Image Resolver
224-
imageResolver := bluefield.NewImageResolver(client, recorder)
228+
imageResolver := bluefield.NewImageResolver(client, provisionerRecorder)
225229

226230
// Initialize DPUCluster Validator
227-
dpuClusterValidator := dpucluster.NewValidator(client, recorder)
231+
dpuClusterValidator := dpucluster.NewValidator(client, provisionerRecorder)
228232

229233
// Initialize Secrets Validator
230-
secretsValidator := secrets.NewValidator(client, recorder)
234+
secretsValidator := secrets.NewValidator(client, provisionerRecorder)
231235

232236
// Initialize Secret Manager for HostedCluster lifecycle
233237
secretManager := hostedcluster.NewSecretManager(client, scheme)
@@ -239,32 +243,36 @@ func main() {
239243
nodePoolManager := hostedcluster.NewNodePoolManager(client, scheme)
240244

241245
// Initialize Kubeconfig Injector
242-
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, recorder)
246+
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, provisionerRecorder)
243247

244248
// Initialize MetalLB Manager
245-
metalLBManager := metallb.NewMetalLBManager(client, recorder)
249+
metalLBManager := metallb.NewMetalLBManager(client, provisionerRecorder)
250+
251+
// Initialize CSR Approver
252+
csrApprover := csrapproval.NewCSRApprover(client, csrApprovalRecorder)
246253

247254
// Initialize Finalizer Manager with pluggable cleanup handlers
248255
// Handlers are executed in registration order
249-
finalizerManager := finalizer.NewManager(client, recorder)
256+
finalizerManager := finalizer.NewManager(client, provisionerRecorder)
250257

251258
// Register cleanup handlers in order (dependent resources first, dependencies last)
252259
// 1. Kubeconfig injection cleanup (removes kubeconfig from DPUCluster namespace)
253-
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, recorder))
260+
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, provisionerRecorder))
254261
// 2. HostedCluster cleanup (removes HostedCluster, NodePool, services, and secrets)
255262
// Must run before MetalLB cleanup because LoadBalancer services depend on IPAddressPool
256-
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, recorder))
263+
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, provisionerRecorder))
257264
// 3. MetalLB cleanup (removes IPAddressPool and L2Advertisement)
258265
// Must run after HostedCluster cleanup to avoid deleting IPs while services still exist
259-
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, recorder))
266+
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, provisionerRecorder))
260267

261268
// Initialize Status Syncer for HostedCluster status mirroring
262269
statusSyncer := hostedcluster.NewStatusSyncer(client)
263270

271+
// Setup main DPFHCPProvisioner controller
264272
if err := (&controller.DPFHCPProvisionerReconciler{
265273
Client: client,
266274
Scheme: scheme,
267-
Recorder: recorder,
275+
Recorder: provisionerRecorder,
268276
ImageResolver: imageResolver,
269277
DPUClusterValidator: dpuClusterValidator,
270278
SecretsValidator: secretsValidator,
@@ -279,6 +287,16 @@ func main() {
279287
setupLog.Error(err, "unable to create controller", "controller", "DPFHCPProvisioner")
280288
os.Exit(1)
281289
}
290+
291+
// Setup CSR Approval controller (separate from main controller)
292+
if err := (&csrapproval.CSRApprovalReconciler{
293+
Client: client,
294+
Scheme: scheme,
295+
Approver: csrApprover,
296+
}).SetupWithManager(mgr); err != nil {
297+
setupLog.Error(err, "unable to create controller", "controller", "CSRApproval")
298+
os.Exit(1)
299+
}
282300
// +kubebuilder:scaffold:builder
283301

284302
if metricsCertWatcher != nil {

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,11 @@ rules:
122122
- patch
123123
- update
124124
- watch
125+
- apiGroups:
126+
- provisioning.dpu.nvidia.com
127+
resources:
128+
- dpus
129+
verbs:
130+
- get
131+
- list
132+
- watch

helm/dpf-hcp-provisioner-operator/templates/clusterrole.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ rules:
113113
- update
114114
- watch
115115

116+
# DPU permissions (for CSR validation against DPU objects)
117+
- apiGroups:
118+
- provisioning.dpu.nvidia.com
119+
resources:
120+
- dpus
121+
verbs:
122+
- get
123+
- list
124+
- watch
125+
116126
# HyperShift HostedCluster and NodePool permissions
117127
- apiGroups:
118128
- hypershift.openshift.io

internal/common/constants.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ const (
2222
// If the CR is renamed, update this constant once and it propagates everywhere.
2323
DPFHCPProvisionerName = "dpfhcpprovisioner"
2424

25-
// ControllerName is the name used for event recorders
26-
ControllerName = "dpfhcpprovisioner-controller"
25+
// ProvisionerControllerName is the name for the main provisioner controller event recorder
26+
ProvisionerControllerName = "dpfhcpprovisioner-controller"
27+
28+
// CSRApprovalControllerName is the name for the CSR approval controller event recorder
29+
CSRApprovalControllerName = "csr-approval-controller"
2730
)
2831

2932
// Label keys for cross-namespace resource ownership tracking
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
Copyright 2025.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package csrapproval
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"time"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/tools/clientcmd"
29+
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
31+
)
32+
33+
// ClientManager manages hosted cluster client lifecycle.
//
// A ClientManager must not be copied after first use (it embeds a
// sync.RWMutex); always pass it by pointer, as NewClientManager returns.
type ClientManager struct {
	// mgmtClient is the management-cluster client used to read the hosted
	// cluster admin-kubeconfig Secrets.
	mgmtClient client.Client
	// mu protects concurrent access to hcClients map
	// Multiple reconciliations can run concurrently, so we need to protect map access
	mu sync.RWMutex
	// hcClients caches Kubernetes clientsets for hosted clusters to avoid recreating them on every reconciliation.
	// Each DPFHCPProvisioner creates a hosted cluster with its own API server. This map stores one clientset
	// per hosted cluster (keyed by "namespace/name") to reuse connections and avoid expensive client creation
	// (parsing kubeconfig, establishing TCP connections) every 30 seconds during CSR polling.
	// Without this cache, we would create 120+ clients per hour per hosted cluster.
	hcClients map[string]*kubernetes.Clientset
}
46+
47+
// NewClientManager creates a new client manager
48+
func NewClientManager(mgmtClient client.Client) *ClientManager {
49+
return &ClientManager{
50+
mgmtClient: mgmtClient,
51+
hcClients: make(map[string]*kubernetes.Clientset),
52+
}
53+
}
54+
55+
// GetHostedClusterClient retrieves or creates a client for the hosted cluster
56+
func (cm *ClientManager) GetHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
57+
key := namespace + "/" + name
58+
59+
// Check cache with read lock
60+
cm.mu.RLock()
61+
if clientset, ok := cm.hcClients[key]; ok {
62+
cm.mu.RUnlock()
63+
return clientset, nil
64+
}
65+
cm.mu.RUnlock()
66+
67+
// Create new client (outside lock to avoid holding lock during slow operation)
68+
clientset, err := cm.createHostedClusterClient(ctx, namespace, name)
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
// Cache the client with write lock
74+
cm.mu.Lock()
75+
cm.hcClients[key] = clientset
76+
cm.mu.Unlock()
77+
78+
return clientset, nil
79+
}
80+
81+
// InvalidateClient removes a cached client (useful when kubeconfig rotates)
82+
func (cm *ClientManager) InvalidateClient(namespace, name string) {
83+
key := namespace + "/" + name
84+
cm.mu.Lock()
85+
delete(cm.hcClients, key)
86+
cm.mu.Unlock()
87+
}
88+
89+
// createHostedClusterClient creates a Kubernetes client for the hosted cluster
90+
func (cm *ClientManager) createHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
91+
// Fetch kubeconfig secret
92+
kubeconfigData, err := cm.getKubeconfigData(ctx, namespace, name)
93+
if err != nil {
94+
return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
95+
}
96+
97+
// Parse kubeconfig
98+
kubeconfig, err := clientcmd.Load(kubeconfigData)
99+
if err != nil {
100+
return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
101+
}
102+
103+
// Replace external endpoint with internal service DNS name
104+
// The HyperShift admin-kubeconfig uses external endpoints (LoadBalancer IP or NodePort)
105+
// which are not accessible from inside the operator pod's network.
106+
// We need to use the internal service endpoint for in-cluster access.
107+
if err := replaceServerWithInternalEndpoint(ctx, cm.mgmtClient, kubeconfig, namespace, name); err != nil {
108+
return nil, fmt.Errorf("failed to replace server endpoint: %w", err)
109+
}
110+
111+
// Create REST config from modified kubeconfig
112+
config, err := clientcmd.NewDefaultClientConfig(*kubeconfig, &clientcmd.ConfigOverrides{}).ClientConfig()
113+
if err != nil {
114+
return nil, fmt.Errorf("failed to create rest config from kubeconfig: %w", err)
115+
}
116+
117+
// Set reasonable timeouts for CSR API operations
118+
// We use List/Get/UpdateApproval operations (not watches), so a 30s timeout is appropriate
119+
config.Timeout = 30 * time.Second
120+
config.QPS = 5
121+
config.Burst = 10
122+
123+
// Create clientset
124+
clientset, err := kubernetes.NewForConfig(config)
125+
if err != nil {
126+
return nil, fmt.Errorf("failed to create clientset: %w", err)
127+
}
128+
129+
return clientset, nil
130+
}
131+
132+
// getKubeconfigData retrieves the kubeconfig data from the admin secret
133+
func (cm *ClientManager) getKubeconfigData(ctx context.Context, namespace, name string) ([]byte, error) {
134+
// The kubeconfig secret name follows HyperShift convention: <hostedcluster-name>-admin-kubeconfig
135+
secretName := name + "-admin-kubeconfig"
136+
137+
secret := &corev1.Secret{}
138+
secretKey := types.NamespacedName{
139+
Namespace: namespace,
140+
Name: secretName,
141+
}
142+
143+
if err := cm.mgmtClient.Get(ctx, secretKey, secret); err != nil {
144+
return nil, fmt.Errorf("failed to get kubeconfig secret %s: %w", secretKey, err)
145+
}
146+
147+
kubeconfigData, ok := secret.Data["kubeconfig"]
148+
if !ok {
149+
return nil, fmt.Errorf("kubeconfig key not found in secret %s", secretKey)
150+
}
151+
152+
if len(kubeconfigData) == 0 {
153+
return nil, fmt.Errorf("kubeconfig data is empty in secret %s", secretKey)
154+
}
155+
156+
return kubeconfigData, nil
157+
}
158+
159+
// replaceServerWithInternalEndpoint modifies the kubeconfig to use internal service DNS name
160+
// instead of the external LoadBalancer IP or NodePort. This allows the operator pod (running inside the cluster)
161+
// to reach the hosted cluster API server via the internal Kubernetes service.
162+
//
163+
// HyperShift creates admin-kubeconfig with external endpoints:
164+
// - LoadBalancer: https://10.6.135.42:6443 (example external IP, not accessible from operator pod)
165+
// - NodePort: https://<node-ip>:31039 (example NodePort, dynamically allocated per cluster)
166+
//
167+
// This function replaces it with the internal service DNS name:
168+
// https://kube-apiserver.<namespace>-<name>.svc.cluster.local:6443
169+
//
170+
// Port 6443 is hardcoded to match HyperShift's implementation. HyperShift itself hardcodes
171+
// the kube-apiserver port as a constant (KASSVCPort = 6443) in their codebase.
172+
func replaceServerWithInternalEndpoint(ctx context.Context, mgmtClient client.Client, kubeconfig *clientcmdapi.Config, hostedClusterNamespace, hostedClusterName string) error {
173+
if kubeconfig == nil {
174+
return fmt.Errorf("kubeconfig is nil")
175+
}
176+
177+
// Find the current context
178+
currentContext := kubeconfig.CurrentContext
179+
if currentContext == "" {
180+
return fmt.Errorf("kubeconfig has no current context")
181+
}
182+
183+
context, ok := kubeconfig.Contexts[currentContext]
184+
if !ok {
185+
return fmt.Errorf("context %s not found in kubeconfig", currentContext)
186+
}
187+
188+
// Find the cluster referenced by the context
189+
clusterName := context.Cluster
190+
cluster, ok := kubeconfig.Clusters[clusterName]
191+
if !ok {
192+
return fmt.Errorf("cluster %s not found in kubeconfig", clusterName)
193+
}
194+
195+
// Construct the service namespace following HyperShift convention
196+
serviceNamespace := fmt.Sprintf("%s-%s", hostedClusterNamespace, hostedClusterName)
197+
198+
// Construct internal service DNS name with hardcoded port 6443 (matching HyperShift's approach)
199+
internalServer := fmt.Sprintf("https://kube-apiserver.%s.svc.cluster.local:6443", serviceNamespace)
200+
201+
// Replace the server URL
202+
cluster.Server = internalServer
203+
204+
return nil
205+
}
206+
207+
// TestConnection verifies the hosted cluster client can connect to the API server
208+
func TestConnection(ctx context.Context, clientset *kubernetes.Clientset) error {
209+
_, err := clientset.Discovery().ServerVersion()
210+
if err != nil {
211+
return fmt.Errorf("failed to connect to hosted cluster API server: %w", err)
212+
}
213+
return nil
214+
}

0 commit comments

Comments
 (0)