Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions internal/controller/nimpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -193,7 +194,7 @@ func (r *NIMPipelineReconciler) reconcileNIMService(ctx context.Context, nimPipe

// Sync NIMService with the desired spec
namespacedName := types.NamespacedName{Name: nimService.Name, Namespace: nimService.Namespace}
err := r.syncResource(ctx, namespacedName, nimService)
err := r.syncResource(ctx, namespacedName, nimPipeline, nimService)
if err != nil {
logger.Error(err, "Failed to sync NIMService", "name", nimService.Name)
return err
Expand All @@ -202,7 +203,7 @@ func (r *NIMPipelineReconciler) reconcileNIMService(ctx context.Context, nimPipe
return nil
}

func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespacedName types.NamespacedName, desired *appsv1alpha1.NIMService) error {
func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespacedName types.NamespacedName, nimPipeline *appsv1alpha1.NIMPipeline, desired *appsv1alpha1.NIMService) error {
logger := log.FromContext(ctx)

current := &appsv1alpha1.NIMService{}
Expand All @@ -224,7 +225,11 @@ func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespa
return err
}
} else {
// Resource exists, so update it
// Resource exists, so update it only if the current nimservice is owned by the pipeline
if owned, _ := controllerutil.HasOwnerReference(current.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
return fmt.Errorf("NIMservice %s already exists and is not owned by the NIMPipeline %s", current.Name, nimPipeline.Name)
}

// Ensure the resource version is carried over to the desired object
desired.ResourceVersion = current.ResourceVersion

Expand All @@ -248,21 +253,13 @@ func (r *NIMPipelineReconciler) cleanupDisabledNIMs(ctx context.Context, nimPipe
var allErrors []error

for _, svc := range serviceList.Items {
owned := false
for _, ownerRef := range svc.GetOwnerReferences() {
if ownerRef.Kind == "NIMPipeline" && ownerRef.UID == nimPipeline.UID {
owned = true
break
}
}

// Ignore NIM services not owned by the NIM pipeline
if !owned {
if owned, _ := controllerutil.HasOwnerReference(svc.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
continue
}

// Cleanup any stale NIM services if they are part of the pipeline but are disabled
if enabled, exists := enabledServices[svc.Name]; exists && !enabled {
// Cleanup any stale NIM services if they were previously part of the pipeline but are removed/disabled
if enabled, exists := enabledServices[svc.Name]; !exists || !enabled {
if err := r.deleteService(ctx, &svc); err != nil {
logger.Error(err, "Unable to delete disabled NIM service", "Name", svc.Name)
allErrors = append(allErrors, fmt.Errorf("failed to delete service %s: %w", svc.Name, err))
Expand Down Expand Up @@ -297,15 +294,10 @@ func (r *NIMPipelineReconciler) updateStatus(ctx context.Context, nimPipeline *a
foundServices := make(map[string]bool)

for _, svc := range serviceList.Items {
owned := false
for _, ownerRef := range svc.GetOwnerReferences() {
if ownerRef.Kind == "NIMPipeline" && ownerRef.UID == nimPipeline.UID {
owned = true
break
}
if enabled, exists := enabledServices[svc.Name]; !exists || !enabled {
continue
}

if enabled, exists := enabledServices[svc.Name]; !owned || !exists || !enabled {
if owned, _ := controllerutil.HasOwnerReference(svc.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
continue
}

Expand All @@ -316,19 +308,21 @@ func (r *NIMPipelineReconciler) updateStatus(ctx context.Context, nimPipeline *a
serviceStates[svc.Name] = svc.Status.State

switch svc.Status.State {
case appsv1alpha1.NIMServiceStatusReady:
// Leave the overall status as is
case appsv1alpha1.NIMServiceStatusFailed:
// If any service has failed, set the overall state to "Failed"
overallState = appsv1alpha1.NIMServiceStatusFailed
allServicesReady = false
case appsv1alpha1.NIMServiceStatusNotReady, appsv1alpha1.NIMServiceStatusPending:
// If any service is not ready or pending, set overall readiness to false
default:
// If any service is not ready, set overall readiness to false
allServicesReady = false
}
}

// Check if any enabled services are missing
for serviceName := range enabledServices {
if !foundServices[serviceName] {
for serviceName, enabled := range enabledServices {
if enabled && !foundServices[serviceName] {
// A required service is missing, mark as "NotReady"
allServicesReady = false
serviceStates[serviceName] = appsv1alpha1.NIMServiceStatusNotReady
Expand All @@ -347,18 +341,26 @@ func (r *NIMPipelineReconciler) updateStatus(ctx context.Context, nimPipeline *a
r.GetEventRecorder().Eventf(nimPipeline, corev1.EventTypeNormal, overallState,
"NIMPipeline %s status %s, service states %v", nimPipeline.Name, overallState, serviceStates)

if err := r.Status().Update(ctx, nimPipeline); err != nil {
logger.Error(err, "Failed to update NIMPipeline status")
return err
}

return nil
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
obj := &appsv1alpha1.NIMPipeline{}
errGet := r.Get(ctx, types.NamespacedName{Name: nimPipeline.Name, Namespace: nimPipeline.GetNamespace()}, obj)
if errGet != nil {
logger.Error(errGet, "error getting NIMPipeline", "name", nimPipeline.Name)
return errGet
}
obj.Status = nimPipeline.Status
if err := r.Status().Update(ctx, obj); err != nil {
logger.Error(err, "Failed to update status", "NIMPipeline", nimPipeline.Name)
return err
}
return nil
})
}

func (r *NIMPipelineReconciler) deleteService(ctx context.Context, svc *appsv1alpha1.NIMService) error {
logger := log.FromContext(ctx)
logger.Info("Deleting NIMService", "name", svc.Name, "namespace", svc.Namespace)
if err := r.Delete(ctx, svc); err != nil {
if err := r.Delete(ctx, svc); err != nil && !errors.IsNotFound(err) {
logger.Error(err, "Failed to delete NIMService", "name", svc.Name)
return err
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/nimpipeline_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ var _ = Describe("NIMPipeline Controller", func() {
}, time.Second*5, time.Millisecond*500).Should(BeTrue())

// Disable nim-llm-service in the pipeline spec
Expect(client.Get(context.TODO(), namespacedName, updatePipeline)).To(Succeed())
updatePipeline.Spec.Services[0].Enabled = ptr.To(false)
Expect(client.Update(ctx, updatePipeline)).To(Succeed())

Expand Down
Loading