Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/v1alpha1/dpfhcpprovisioner_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ const (
// HostedClusterCleanup indicates the status of HostedCluster deletion during finalizer cleanup.
HostedClusterCleanup string = "HostedClusterCleanup"

// CSRAutoApprovalActive indicates whether CSR auto-approval is active and watching for CSRs
CSRAutoApprovalActive string = "CSRAutoApprovalActive"

// Validation conditions.

// SecretsValid indicates whether required secrets (pull secret, SSH key) are valid.
Expand Down Expand Up @@ -240,6 +243,19 @@ const (
ReasonKubeConfigInjectionFailed string = "InjectionFailed"
)

// Condition reasons for DPFHCPProvisioner CSRAutoApprovalActive status.
// These are used as the Reason field in the CSRAutoApprovalActive condition.
const (
	// ReasonCSRApprovalActive indicates CSR auto-approval is actively processing CSRs.
	ReasonCSRApprovalActive string = "Active"

	// ReasonKubeconfigNotAvailable indicates the kubeconfig is not available.
	ReasonKubeconfigNotAvailable string = "KubeconfigNotAvailable"

	// ReasonHostedClusterNotReachable indicates the hosted cluster is not reachable.
	ReasonHostedClusterNotReachable string = "HostedClusterNotReachable"
)

// DPFHCPProvisionerStatus defines the observed state of DPFHCPProvisioner
type DPFHCPProvisionerStatus struct {
// Phase represents the current lifecycle phase
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 29 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/common"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/bluefield"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/csrapproval"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/dpucluster"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/finalizer"
"github.com/rh-ecosystem-edge/dpf-hcp-provisioner-operator/internal/controller/hostedcluster"
Expand Down Expand Up @@ -217,17 +218,20 @@ func main() {
}

client := mgr.GetClient()
recorder := mgr.GetEventRecorderFor(common.ControllerName)
scheme := mgr.GetScheme()

// Create event recorders for each controller/component
provisionerRecorder := mgr.GetEventRecorderFor(common.ProvisionerControllerName)
csrApprovalRecorder := mgr.GetEventRecorderFor(common.CSRApprovalControllerName)

// Initialize BlueField Image Resolver
imageResolver := bluefield.NewImageResolver(client, recorder)
imageResolver := bluefield.NewImageResolver(client, provisionerRecorder)

// Initialize DPUCluster Validator
dpuClusterValidator := dpucluster.NewValidator(client, recorder)
dpuClusterValidator := dpucluster.NewValidator(client, provisionerRecorder)

// Initialize Secrets Validator
secretsValidator := secrets.NewValidator(client, recorder)
secretsValidator := secrets.NewValidator(client, provisionerRecorder)

// Initialize Secret Manager for HostedCluster lifecycle
secretManager := hostedcluster.NewSecretManager(client, scheme)
Expand All @@ -239,32 +243,36 @@ func main() {
nodePoolManager := hostedcluster.NewNodePoolManager(client, scheme)

// Initialize Kubeconfig Injector
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, recorder)
kubeconfigInjector := kubeconfiginjection.NewKubeconfigInjector(client, provisionerRecorder)

// Initialize MetalLB Manager
metalLBManager := metallb.NewMetalLBManager(client, recorder)
metalLBManager := metallb.NewMetalLBManager(client, provisionerRecorder)

// Initialize CSR Approver
csrApprover := csrapproval.NewCSRApprover(client, csrApprovalRecorder)

// Initialize Finalizer Manager with pluggable cleanup handlers
// Handlers are executed in registration order
finalizerManager := finalizer.NewManager(client, recorder)
finalizerManager := finalizer.NewManager(client, provisionerRecorder)

// Register cleanup handlers in order (dependent resources first, dependencies last)
// 1. Kubeconfig injection cleanup (removes kubeconfig from DPUCluster namespace)
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, recorder))
finalizerManager.RegisterHandler(kubeconfiginjection.NewCleanupHandler(client, provisionerRecorder))
// 2. HostedCluster cleanup (removes HostedCluster, NodePool, services, and secrets)
// Must run before MetalLB cleanup because LoadBalancer services depend on IPAddressPool
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, recorder))
finalizerManager.RegisterHandler(hostedcluster.NewCleanupHandler(client, provisionerRecorder))
// 3. MetalLB cleanup (removes IPAddressPool and L2Advertisement)
// Must run after HostedCluster cleanup to avoid deleting IPs while services still exist
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, recorder))
finalizerManager.RegisterHandler(metallb.NewCleanupHandler(client, provisionerRecorder))

// Initialize Status Syncer for HostedCluster status mirroring
statusSyncer := hostedcluster.NewStatusSyncer(client)

// Setup main DPFHCPProvisioner controller
if err := (&controller.DPFHCPProvisionerReconciler{
Client: client,
Scheme: scheme,
Recorder: recorder,
Recorder: provisionerRecorder,
ImageResolver: imageResolver,
DPUClusterValidator: dpuClusterValidator,
SecretsValidator: secretsValidator,
Expand All @@ -279,6 +287,16 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "DPFHCPProvisioner")
os.Exit(1)
}

// Setup CSR Approval controller (separate from main controller)
if err := (&csrapproval.CSRApprovalReconciler{
Client: client,
Scheme: scheme,
Approver: csrApprover,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CSRApproval")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

if metricsCertWatcher != nil {
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,11 @@ rules:
- patch
- update
- watch
- apiGroups:
- provisioning.dpu.nvidia.com
resources:
- dpus
verbs:
- get
- list
- watch
10 changes: 10 additions & 0 deletions helm/dpf-hcp-provisioner-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ rules:
- update
- watch

# DPU permissions (for CSR validation against DPU objects)
- apiGroups:
- provisioning.dpu.nvidia.com
resources:
- dpus
verbs:
- get
- list
- watch

# HyperShift HostedCluster and NodePool permissions
- apiGroups:
- hypershift.openshift.io
Expand Down
7 changes: 5 additions & 2 deletions internal/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ const (
// If the CR is renamed, update this constant once and it propagates everywhere.
DPFHCPProvisionerName = "dpfhcpprovisioner"

// ControllerName is the name used for event recorders
ControllerName = "dpfhcpprovisioner-controller"
// ProvisionerControllerName is the name for the main provisioner controller event recorder
ProvisionerControllerName = "dpfhcpprovisioner-controller"

// CSRApprovalControllerName is the name for the CSR approval controller event recorder
CSRApprovalControllerName = "csr-approval-controller"
)

// Label keys for cross-namespace resource ownership tracking
Expand Down
214 changes: 214 additions & 0 deletions internal/controller/csrapproval/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package csrapproval

import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ClientManager manages hosted cluster client lifecycle.
type ClientManager struct {
	// mgmtClient reads resources (kubeconfig secrets) from the management cluster.
	mgmtClient client.Client
	// mu protects concurrent access to the hcClients map.
	// Multiple reconciliations can run concurrently, so we need to protect map access.
	mu sync.RWMutex
	// hcClients caches Kubernetes clientsets for hosted clusters to avoid recreating them on every reconciliation.
	// Each DPFHCPProvisioner creates a hosted cluster with its own API server. This map stores one clientset
	// per hosted cluster (keyed by "namespace/name") to reuse connections and avoid expensive client creation
	// (parsing kubeconfig, establishing TCP connections) every 30 seconds during CSR polling.
	// Without this cache, we would create 120+ clients per hour per hosted cluster.
	hcClients map[string]*kubernetes.Clientset
}

// NewClientManager creates a new client manager backed by the given
// management cluster client, with an empty hosted-cluster client cache.
func NewClientManager(mgmtClient client.Client) *ClientManager {
	cm := &ClientManager{mgmtClient: mgmtClient}
	cm.hcClients = make(map[string]*kubernetes.Clientset)
	return cm
}

// GetHostedClusterClient retrieves or creates a client for the hosted cluster.
// Results are cached per "namespace/name" key; concurrent callers for the same
// hosted cluster always end up sharing a single cached clientset.
func (cm *ClientManager) GetHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
	key := namespace + "/" + name

	// Fast path: check the cache with a read lock.
	cm.mu.RLock()
	clientset, ok := cm.hcClients[key]
	cm.mu.RUnlock()
	if ok {
		return clientset, nil
	}

	// Create new client outside the lock to avoid holding it during the slow
	// operations (secret fetch, kubeconfig parsing, connection setup).
	created, err := cm.createHostedClusterClient(ctx, namespace, name)
	if err != nil {
		return nil, err
	}

	// Re-check under the write lock: another reconciliation may have cached a
	// client for this key while we were creating ours. Prefer the existing
	// entry so all callers share one clientset and the duplicate is discarded.
	cm.mu.Lock()
	defer cm.mu.Unlock()
	if existing, ok := cm.hcClients[key]; ok {
		return existing, nil
	}
	cm.hcClients[key] = created
	return created, nil
}

// InvalidateClient removes a cached client (useful when kubeconfig rotates)
func (cm *ClientManager) InvalidateClient(namespace, name string) {
key := namespace + "/" + name
cm.mu.Lock()
delete(cm.hcClients, key)
cm.mu.Unlock()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a guard condition here in case the key is not in the map?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Jason, delete is safe. ref - https://go.dev/ref/spec#Deletion_of_map_elements

}

// createHostedClusterClient builds a Kubernetes clientset for the hosted
// cluster identified by namespace/name: it reads the admin kubeconfig from
// the management cluster, rewrites the server URL to the in-cluster service
// endpoint, and constructs a rate-limited clientset from the result.
func (cm *ClientManager) createHostedClusterClient(ctx context.Context, namespace, name string) (*kubernetes.Clientset, error) {
	raw, err := cm.getKubeconfigData(ctx, namespace, name)
	if err != nil {
		return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
	}

	cfg, err := clientcmd.Load(raw)
	if err != nil {
		return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
	}

	// The HyperShift admin-kubeconfig points at an external endpoint
	// (LoadBalancer IP or NodePort) that is not reachable from inside the
	// operator pod's network; swap in the internal service DNS name so
	// in-cluster access works.
	if err := replaceServerWithInternalEndpoint(cfg, namespace, name); err != nil {
		return nil, fmt.Errorf("failed to replace server endpoint: %w", err)
	}

	restCfg, err := clientcmd.NewDefaultClientConfig(*cfg, &clientcmd.ConfigOverrides{}).ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create rest config from kubeconfig: %w", err)
	}

	// Only List/Get/UpdateApproval calls are issued against the CSR API (no
	// watches), so a 30s request timeout and modest rate limits suffice.
	restCfg.Timeout = 30 * time.Second
	restCfg.QPS = 5
	restCfg.Burst = 10

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create clientset: %w", err)
	}
	return clientset, nil
}

// getKubeconfigData fetches the raw kubeconfig bytes from the admin secret
// for the hosted cluster. The secret name follows the HyperShift convention
// <hostedcluster-name>-admin-kubeconfig.
func (cm *ClientManager) getKubeconfigData(ctx context.Context, namespace, name string) ([]byte, error) {
	secretKey := types.NamespacedName{
		Namespace: namespace,
		Name:      name + "-admin-kubeconfig",
	}

	var secret corev1.Secret
	if err := cm.mgmtClient.Get(ctx, secretKey, &secret); err != nil {
		return nil, fmt.Errorf("failed to get kubeconfig secret %s: %w", secretKey, err)
	}

	data, ok := secret.Data["kubeconfig"]
	switch {
	case !ok:
		return nil, fmt.Errorf("kubeconfig key not found in secret %s", secretKey)
	case len(data) == 0:
		return nil, fmt.Errorf("kubeconfig data is empty in secret %s", secretKey)
	}
	return data, nil
}

// replaceServerWithInternalEndpoint modifies the kubeconfig to use the
// internal service DNS name instead of the external LoadBalancer IP or
// NodePort, so the operator pod (running inside the cluster) can reach the
// hosted cluster API server via the internal Kubernetes service.
//
// HyperShift creates admin-kubeconfig with external endpoints:
//   - LoadBalancer: https://10.6.135.42:6443 (example external IP, not accessible from operator pod)
//   - NodePort: https://<node-ip>:31039 (example NodePort, dynamically allocated per cluster)
//
// The replacement server URL has the form:
//
//	https://kube-apiserver.<namespace>-<name>.svc.cluster.local:6443
//
// Port 6443 is hardcoded to match HyperShift's implementation, which defines
// the kube-apiserver port as a constant (KASSVCPort = 6443) in its codebase.
func replaceServerWithInternalEndpoint(kubeconfig *clientcmdapi.Config, hostedClusterNamespace, hostedClusterName string) error {
	if kubeconfig == nil {
		return fmt.Errorf("kubeconfig is nil")
	}

	// Resolve the cluster entry referenced by the current context.
	current := kubeconfig.CurrentContext
	if current == "" {
		return fmt.Errorf("kubeconfig has no current context")
	}
	ctxEntry, found := kubeconfig.Contexts[current]
	if !found {
		return fmt.Errorf("context %s not found in kubeconfig", current)
	}
	cluster, found := kubeconfig.Clusters[ctxEntry.Cluster]
	if !found {
		return fmt.Errorf("cluster %s not found in kubeconfig", ctxEntry.Cluster)
	}

	// The hosted control plane lives in the "<namespace>-<name>" namespace
	// per HyperShift convention; point the server at its apiserver service.
	cluster.Server = fmt.Sprintf(
		"https://kube-apiserver.%s.svc.cluster.local:6443",
		fmt.Sprintf("%s-%s", hostedClusterNamespace, hostedClusterName),
	)

	return nil
}

// TestConnection verifies the hosted cluster client can connect to the API
// server. The probe honors ctx for cancellation and deadlines.
func TestConnection(ctx context.Context, clientset *kubernetes.Clientset) error {
	// Discovery().ServerVersion() does not accept a context, so hit the same
	// /version endpoint through the discovery REST client, which does — this
	// makes the connectivity probe respect the caller's ctx.
	_, err := clientset.Discovery().RESTClient().Get().AbsPath("/version").DoRaw(ctx)
	if err != nil {
		return fmt.Errorf("failed to connect to hosted cluster API server: %w", err)
	}
	return nil
}
Comment thread
linoyaslan marked this conversation as resolved.
Loading