Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions internal/controller/nimpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (r *NIMPipelineReconciler) reconcileNIMService(ctx context.Context, nimPipe

// Sync NIMService with the desired spec
namespacedName := types.NamespacedName{Name: nimService.Name, Namespace: nimService.Namespace}
err := r.syncResource(ctx, namespacedName, nimService)
err := r.syncResource(ctx, namespacedName, nimPipeline, nimService)
if err != nil {
logger.Error(err, "Failed to sync NIMService", "name", nimService.Name)
return err
Expand All @@ -202,7 +202,7 @@ func (r *NIMPipelineReconciler) reconcileNIMService(ctx context.Context, nimPipe
return nil
}

func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespacedName types.NamespacedName, desired *appsv1alpha1.NIMService) error {
func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespacedName types.NamespacedName, nimPipeline *appsv1alpha1.NIMPipeline, desired *appsv1alpha1.NIMService) error {
logger := log.FromContext(ctx)

current := &appsv1alpha1.NIMService{}
Expand All @@ -224,7 +224,11 @@ func (r *NIMPipelineReconciler) syncResource(ctx context.Context, currentNamespa
return err
}
} else {
// Resource exists, so update it
// Resource exists, so update it only if the current nimservice is owned by the pipeline
if owned, _ := controllerutil.HasOwnerReference(current.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
return fmt.Errorf("NIMservice %s already exists and is not owned by the NIMPipeline %s", current.Name, nimPipeline.Name)
}

// Ensure the resource version is carried over to the desired object
desired.ResourceVersion = current.ResourceVersion

Expand All @@ -248,21 +252,13 @@ func (r *NIMPipelineReconciler) cleanupDisabledNIMs(ctx context.Context, nimPipe
var allErrors []error

for _, svc := range serviceList.Items {
owned := false
for _, ownerRef := range svc.GetOwnerReferences() {
if ownerRef.Kind == "NIMPipeline" && ownerRef.UID == nimPipeline.UID {
owned = true
break
}
}

// Ignore NIM services not owned by the NIM pipeline
if !owned {
if owned, _ := controllerutil.HasOwnerReference(svc.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
continue
}

// Cleanup any stale NIM services if they are part of the pipeline but are disabled
if enabled, exists := enabledServices[svc.Name]; exists && !enabled {
// Cleanup any stale NIM services if they were previously part of the pipeline but are removed/disabled
if enabled, exists := enabledServices[svc.Name]; !exists || !enabled {
if err := r.deleteService(ctx, &svc); err != nil {
logger.Error(err, "Unable to delete disabled NIM service", "Name", svc.Name)
allErrors = append(allErrors, fmt.Errorf("failed to delete service %s: %w", svc.Name, err))
Expand Down Expand Up @@ -297,15 +293,10 @@ func (r *NIMPipelineReconciler) updateStatus(ctx context.Context, nimPipeline *a
foundServices := make(map[string]bool)

for _, svc := range serviceList.Items {
owned := false
for _, ownerRef := range svc.GetOwnerReferences() {
if ownerRef.Kind == "NIMPipeline" && ownerRef.UID == nimPipeline.UID {
owned = true
break
}
if enabled, exists := enabledServices[svc.Name]; !exists || !enabled {
continue
}

if enabled, exists := enabledServices[svc.Name]; !owned || !exists || !enabled {
if owned, _ := controllerutil.HasOwnerReference(svc.GetOwnerReferences(), nimPipeline, r.Scheme); !owned {
continue
}

Expand Down
Loading