Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ const (
ReasonStatefulSetFailed = "StatefulsetFailed"
// ReasonSecretFailed indicates that the creation of secret has failed.
ReasonSecretFailed = "SecretFailed"
// ReasonNIMCacheFailed indicates that the NIMCache is in failed state.
ReasonNIMCacheFailed = "NIMCacheFailed"
// ReasonNIMCacheNotFound indicates that the NIMCache is not found.
ReasonNIMCacheNotFound = "NIMCacheNotFound"
// ReasonNIMCacheNotReady indicates that the NIMCache is not ready.
ReasonNIMCacheNotReady = "NIMCacheNotReady"
)

// Updater is the condition updater.
Expand Down
49 changes: 49 additions & 0 deletions internal/controller/nimservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand Down Expand Up @@ -216,6 +219,22 @@ func (r *NIMServiceReconciler) GetOrchestratorType(ctx context.Context) (k8sutil
// SetupWithManager sets up the controller with the Manager.
func (r *NIMServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.recorder = mgr.GetEventRecorderFor("nimservice-controller")
err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&appsv1alpha1.NIMService{},
"spec.storage.nimCache.name",
func(rawObj client.Object) []string {
nimService, ok := rawObj.(*appsv1alpha1.NIMService)
if !ok {
return []string{}
}
return []string{nimService.Spec.Storage.NIMCache.Name}
},
)
if err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&appsv1alpha1.NIMService{}).
Owns(&appsv1.Deployment{}).
Expand Down Expand Up @@ -245,9 +264,39 @@ func (r *NIMServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return true
},
}).
Watches(
&appsv1alpha1.NIMCache{},
handler.EnqueueRequestsFromMapFunc(r.mapNIMCacheToNIMService),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
Complete(r)
}

func (r *NIMServiceReconciler) mapNIMCacheToNIMService(ctx context.Context, obj client.Object) []ctrl.Request {
nimCache, ok := obj.(*appsv1alpha1.NIMCache)
if !ok {
return []ctrl.Request{}
}

// Get all NIMServices that reference this NIMCache
var nimServices appsv1alpha1.NIMServiceList
if err := r.List(ctx, &nimServices, client.MatchingFields{"spec.storage.nimCache.name": nimCache.GetName()}, client.InNamespace(nimCache.GetNamespace())); err != nil {
return []ctrl.Request{}
}

// Enqueue reconciliation for each matching NIMService
requests := make([]ctrl.Request, len(nimServices.Items))
for i, item := range nimServices.Items {
requests[i] = ctrl.Request{
NamespacedName: types.NamespacedName{
Name: item.Name,
Namespace: item.Namespace,
},
}
}
return requests
}

func (r *NIMServiceReconciler) refreshMetrics(ctx context.Context) {
logger := log.FromContext(ctx)
// List all nodes
Expand Down
83 changes: 59 additions & 24 deletions internal/controller/platform/standalone/nimservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
apiResource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -179,16 +180,55 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi
}
}

deploymentParams := nimService.GetDeploymentParams()
var modelPVC *appsv1alpha1.PersistentVolumeClaim
modelProfile := ""

deploymentParams.OrchestratorType = string(r.GetOrchestratorType())

// Select PVC for model store
if nimService.GetNIMCacheName() != "" { // nolint:gocritic
nimCacheName := nimService.GetNIMCacheName()
if nimCacheName != "" { // nolint:gocritic
nimCache := appsv1alpha1.NIMCache{}
if err := r.Get(ctx, types.NamespacedName{Name: nimCacheName, Namespace: nimService.GetNamespace()}, &nimCache); err != nil {
// Fail the NIMService if the NIMCache is not found
if errors.IsNotFound(err) {
msg := fmt.Sprintf("NIMCache %s not found", nimCacheName)
statusUpdateErr := r.updater.SetConditionsFailed(ctx, nimService, conditions.ReasonNIMCacheNotFound, msg)
r.GetEventRecorder().Eventf(nimService, corev1.EventTypeWarning, conditions.Failed, msg)
logger.Info(msg, "nimcache", nimCacheName, "nimservice", nimService.Name)
if statusUpdateErr != nil {
logger.Error(statusUpdateErr, "failed to update status", "nimservice", nimService.Name)
return ctrl.Result{}, statusUpdateErr
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

switch nimCache.Status.State {
case appsv1alpha1.NimCacheStatusReady:
logger.V(4).Info("NIMCache is ready", "nimcache", nimCacheName, "nimservice", nimService.Name)
case appsv1alpha1.NimCacheStatusFailed:
msg := r.getNIMCacheFailedMessage(&nimCache)
err = r.updater.SetConditionsFailed(ctx, nimService, conditions.ReasonNIMCacheFailed, msg)
r.GetEventRecorder().Eventf(nimService, corev1.EventTypeWarning, conditions.Failed, msg)
logger.Info(msg, "nimcache", nimCacheName, "nimservice", nimService.Name)
if err != nil {
logger.Error(err, "failed to update status", "nimservice", nimService.Name)
}
return ctrl.Result{}, err
default:
msg := fmt.Sprintf("NIMCache %s not ready", nimCacheName)
err = r.updater.SetConditionsNotReady(ctx, nimService, conditions.ReasonNIMCacheNotReady, msg)
r.GetEventRecorder().Eventf(nimService, corev1.EventTypeNormal, conditions.NotReady,
"NIMService %s not ready yet, msg: %s", nimService.Name, msg)
logger.V(4).Info(msg, "nimservice", nimService.Name)
if err != nil {
logger.Error(err, "failed to update status", "nimservice", nimService.Name)
}
return ctrl.Result{}, err
}

// Fetch PVC for the associated NIMCache instance and mount it
nimCachePVC, err := r.getNIMCachePVC(ctx, nimService)
nimCachePVC, err := r.getNIMCachePVC(&nimCache)
if err != nil {
logger.Error(err, "unable to obtain pvc backing the nimcache instance")
return ctrl.Result{}, err
Expand All @@ -215,6 +255,10 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi
logger.Error(err, "failed to determine PVC for model-store")
return ctrl.Result{}, err
}

deploymentParams := nimService.GetDeploymentParams()
deploymentParams.OrchestratorType = string(r.GetOrchestratorType())

// Setup volume mounts with model store
deploymentParams.Volumes = nimService.GetVolumes(*modelPVC)
deploymentParams.VolumeMounts = nimService.GetVolumeMounts(*modelPVC)
Expand Down Expand Up @@ -537,26 +581,9 @@ func getDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.Depl
}

// getNIMCachePVC returns PVC backing the NIM cache instance.
func (r *NIMServiceReconciler) getNIMCachePVC(ctx context.Context, nimService *appsv1alpha1.NIMService) (*appsv1alpha1.PersistentVolumeClaim, error) {
logger := log.FromContext(ctx)

if nimService.GetNIMCacheName() == "" {
// NIM cache PVC is not used
return nil, nil
}
// Lookup NIMCache instance in the same namespace as the NIMService instance
nimCache := &appsv1alpha1.NIMCache{}
if err := r.Get(ctx, types.NamespacedName{Name: nimService.GetNIMCacheName(), Namespace: nimService.Namespace}, nimCache); err != nil {
logger.Error(err, "unable to fetch nimcache", "nimcache", nimService.GetNIMCacheName(), "nimservice", nimService.Name)
return nil, err
}
// Get the status of NIMCache
if nimCache.Status.State != appsv1alpha1.NimCacheStatusReady {
return nil, fmt.Errorf("nimcache %s is not ready, nimservice %s", nimCache.GetName(), nimService.GetName())
}

func (r *NIMServiceReconciler) getNIMCachePVC(nimCache *appsv1alpha1.NIMCache) (*appsv1alpha1.PersistentVolumeClaim, error) {
if nimCache.Status.PVC == "" {
return nil, fmt.Errorf("missing PVC for the nimcache instance %s, nimservice %s", nimCache.GetName(), nimService.GetName())
return nil, fmt.Errorf("missing PVC for the nimcache instance %s", nimCache.GetName())
}

if nimCache.Spec.Storage.PVC.Name == "" {
Expand All @@ -566,6 +593,14 @@ func (r *NIMServiceReconciler) getNIMCachePVC(ctx context.Context, nimService *a
return &nimCache.Spec.Storage.PVC, nil
}

func (r *NIMServiceReconciler) getNIMCacheFailedMessage(nimCache *appsv1alpha1.NIMCache) string {
cond := meta.FindStatusCondition(nimCache.Status.Conditions, conditions.Failed)
if cond != nil && cond.Status == metav1.ConditionTrue {
return cond.Message
}
return ""
}

func (r *NIMServiceReconciler) reconcilePVC(ctx context.Context, nimService *appsv1alpha1.NIMService) (*appsv1alpha1.PersistentVolumeClaim, error) {
logger := r.GetLogger()
pvcName := nimService.GetPVCName(nimService.Spec.Storage.PVC)
Expand Down
88 changes: 88 additions & 0 deletions internal/controller/platform/standalone/nimservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ func sortVolumes(volumes []corev1.Volume) {
})
}

func getCondition(obj *appsv1alpha1.NIMService, conditionType string) *metav1.Condition {
for _, condition := range obj.Status.Conditions {
if condition.Type == conditionType {
return &condition
}
}
return nil
}

// Custom transport that redirects requests to a specific host.
type mockTransport struct {
targetHost string
Expand Down Expand Up @@ -611,6 +620,85 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() {

})

It("should be NotReady when nimcache is not ready", func() {
nimCache.Status = appsv1alpha1.NIMCacheStatus{
State: appsv1alpha1.NimCacheStatusNotReady,
}
Expect(client.Status().Update(context.TODO(), nimCache)).To(Succeed())
err := client.Create(context.TODO(), nimService)
Expect(err).NotTo(HaveOccurred())

result, err := reconciler.reconcileNIMService(context.TODO(), nimService)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(Equal(ctrl.Result{}))

// Check that the NIMService is not ready.
namespacedName := types.NamespacedName{Name: nimService.Name, Namespace: nimService.Namespace}
obj := &appsv1alpha1.NIMService{}
err = client.Get(context.TODO(), namespacedName, obj)
Expect(err).NotTo(HaveOccurred())
Expect(obj.Status.State).To(Equal(appsv1alpha1.NIMServiceStatusNotReady))
readyCondition := getCondition(obj, conditions.Ready)
Expect(readyCondition).NotTo(BeNil())
Expect(readyCondition.Status).To(Equal(metav1.ConditionFalse))
Expect(readyCondition.Reason).To(Equal(conditions.ReasonNIMCacheNotReady))
})

It("should be Failed when nimcache is not found", func() {
testNimService := nimService.DeepCopy()
testNimService.Spec.Storage.NIMCache.Name = "invalid-nimcache"
err := client.Create(context.TODO(), testNimService)
Expect(err).NotTo(HaveOccurred())

result, err := reconciler.reconcileNIMService(context.TODO(), testNimService)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(Equal(ctrl.Result{}))

// Check that the NIMService is in failed state.
namespacedName := types.NamespacedName{Name: testNimService.Name, Namespace: testNimService.Namespace}
obj := &appsv1alpha1.NIMService{}
err = client.Get(context.TODO(), namespacedName, obj)
Expect(err).NotTo(HaveOccurred())
Expect(obj.Status.State).To(Equal(appsv1alpha1.NIMServiceStatusFailed))
failedCondition := getCondition(obj, conditions.Failed)
Expect(failedCondition).NotTo(BeNil())
Expect(failedCondition.Status).To(Equal(metav1.ConditionTrue))
Expect(failedCondition.Reason).To(Equal(conditions.ReasonNIMCacheNotFound))
})

It("should be Failed when nimcache is in failed state", func() {
nimCache.Status = appsv1alpha1.NIMCacheStatus{
State: appsv1alpha1.NimCacheStatusFailed,
Conditions: []metav1.Condition{
{
Type: conditions.Failed,
Status: metav1.ConditionTrue,
Reason: conditions.Failed,
Message: "NIMCache failed",
},
},
}
Expect(client.Status().Update(context.TODO(), nimCache)).To(Succeed())

err := client.Create(context.TODO(), nimService)
Expect(err).NotTo(HaveOccurred())

result, err := reconciler.reconcileNIMService(context.TODO(), nimService)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(Equal(ctrl.Result{}))

// Check that the NIMService is in failed state.
namespacedName := types.NamespacedName{Name: nimService.Name, Namespace: nimService.Namespace}
obj := &appsv1alpha1.NIMService{}
err = client.Get(context.TODO(), namespacedName, obj)
Expect(err).NotTo(HaveOccurred())
Expect(obj.Status.State).To(Equal(appsv1alpha1.NIMServiceStatusFailed))
failedCondition := getCondition(obj, conditions.Failed)
Expect(failedCondition).NotTo(BeNil())
Expect(failedCondition.Status).To(Equal(metav1.ConditionTrue))
Expect(failedCondition.Reason).To(Equal(conditions.ReasonNIMCacheFailed))
})

Describe("isDeploymentReady for setting status on NIMService", func() {
AfterEach(func() {
// Clean up the Deployment instance
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/platform/standalone/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (s *Standalone) Sync(ctx context.Context, r shared.Reconciler, resource cli
logger.Info("Reconciling NIMService instance", "nimservice", nimService.GetName())
result, err := reconciler.reconcileNIMService(ctx, nimService)
if err != nil {
if errors.IsConflict(err) {
// Ignore conflict errors and retry.
return ctrl.Result{Requeue: true}, nil
}
r.GetEventRecorder().Eventf(nimService, corev1.EventTypeWarning, "ReconcileFailed",
"NIMService %s failed, msg: %s", nimService.Name, err.Error())
errConditionUpdate := reconciler.updater.SetConditionsFailed(ctx, nimService, conditions.Failed, err.Error())
Expand Down
Loading