diff --git a/api/apps/v1alpha1/nimservice_types.go b/api/apps/v1alpha1/nimservice_types.go index 3395b8546..d49525aa3 100644 --- a/api/apps/v1alpha1/nimservice_types.go +++ b/api/apps/v1alpha1/nimservice_types.go @@ -92,6 +92,14 @@ type NIMServiceStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty"` AvailableReplicas int32 `json:"availableReplicas,omitempty"` State string `json:"state,omitempty"` + Model *ModelStatus `json:"model,omitempty"` +} + +// ModelStatus defines the configuration of the NIMService model. +type ModelStatus struct { + Name string `json:"name"` + ClusterEndpoint string `json:"clusterEndpoint"` + ExternalEndpoint string `json:"externalEndpoint"` } // +genclient diff --git a/api/apps/v1alpha1/zz_generated.deepcopy.go b/api/apps/v1alpha1/zz_generated.deepcopy.go index db35cd7c2..8c8d63895 100644 --- a/api/apps/v1alpha1/zz_generated.deepcopy.go +++ b/api/apps/v1alpha1/zz_generated.deepcopy.go @@ -406,6 +406,21 @@ func (in *ModelSpec) DeepCopy() *ModelSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelStatus) DeepCopyInto(out *ModelStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelStatus. +func (in *ModelStatus) DeepCopy() *ModelStatus { + if in == nil { + return nil + } + out := new(ModelStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NGCSource) DeepCopyInto(out *NGCSource) { *out = *in @@ -915,6 +930,11 @@ func (in *NIMServiceStatus) DeepCopyInto(out *NIMServiceStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Model != nil { + in, out := &in.Model, &out.Model + *out = new(ModelStatus) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NIMServiceStatus. diff --git a/bundle/manifests/apps.nvidia.com_nimservices.yaml b/bundle/manifests/apps.nvidia.com_nimservices.yaml index ab3be5c15..207309600 100644 --- a/bundle/manifests/apps.nvidia.com_nimservices.yaml +++ b/bundle/manifests/apps.nvidia.com_nimservices.yaml @@ -2284,6 +2284,21 @@ spec: - type type: object type: array + model: + description: ModelStatus defines the configuration of the NIMService + model. + properties: + clusterEndpoint: + type: string + externalEndpoint: + type: string + name: + type: string + required: + - clusterEndpoint + - externalEndpoint + - name + type: object state: type: string type: object diff --git a/config/crd/bases/apps.nvidia.com_nimservices.yaml b/config/crd/bases/apps.nvidia.com_nimservices.yaml index ab3be5c15..207309600 100644 --- a/config/crd/bases/apps.nvidia.com_nimservices.yaml +++ b/config/crd/bases/apps.nvidia.com_nimservices.yaml @@ -2284,6 +2284,21 @@ spec: - type type: object type: array + model: + description: ModelStatus defines the configuration of the NIMService + model. + properties: + clusterEndpoint: + type: string + externalEndpoint: + type: string + name: + type: string + required: + - clusterEndpoint + - externalEndpoint + - name + type: object state: type: string type: object diff --git a/deployments/helm/k8s-nim-operator/crds/apps.nvidia.com_nimservices.yaml b/deployments/helm/k8s-nim-operator/crds/apps.nvidia.com_nimservices.yaml index ab3be5c15..207309600 100644 --- a/deployments/helm/k8s-nim-operator/crds/apps.nvidia.com_nimservices.yaml +++ b/deployments/helm/k8s-nim-operator/crds/apps.nvidia.com_nimservices.yaml @@ -2284,6 +2284,21 @@ spec: - type type: object type: array + model: + description: ModelStatus defines the configuration of the NIMService + model. + properties: + clusterEndpoint: + type: string + externalEndpoint: + type: string + name: + type: string + required: + - clusterEndpoint + - externalEndpoint + - name + type: object state: type: string type: object diff --git a/internal/controller/platform/standalone/nimservice.go b/internal/controller/platform/standalone/nimservice.go index 9510b9dee..ccddc8a2d 100644 --- a/internal/controller/platform/standalone/nimservice.go +++ b/internal/controller/platform/standalone/nimservice.go @@ -20,9 +20,11 @@ import ( "context" "fmt" + "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1" appsv1alpha1 "github.com/NVIDIA/k8s-nim-operator/api/apps/v1alpha1" "github.com/NVIDIA/k8s-nim-operator/internal/conditions" "github.com/NVIDIA/k8s-nim-operator/internal/k8sutil" + "github.com/NVIDIA/k8s-nim-operator/internal/nimmodels" "github.com/NVIDIA/k8s-nim-operator/internal/render" rendertypes "github.com/NVIDIA/k8s-nim-operator/internal/render/types" "github.com/NVIDIA/k8s-nim-operator/internal/shared" @@ -261,6 +263,12 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi r.GetEventRecorder().Eventf(nimService, corev1.EventTypeNormal, conditions.NotReady, "NIMService %s not ready yet, msg: %s", nimService.Name, msg) } else { + // Update NIMServiceStatus with model config. + err = r.updateModelStatus(ctx, nimService) + if err != nil { + return ctrl.Result{}, err + } + // Update status as ready err = r.updater.SetConditionsReady(ctx, nimService, conditions.Ready, msg) r.GetEventRecorder().Eventf(nimService, corev1.EventTypeNormal, conditions.Ready, @@ -275,6 +283,87 @@ func (r *NIMServiceReconciler) reconcileNIMService(ctx context.Context, nimServi return ctrl.Result{}, nil } +func (r *NIMServiceReconciler) updateModelStatus(ctx context.Context, nimService *v1alpha1.NIMService) error { + clusterEndpoint, externalEndpoint, err := r.getNIMModelEndpoints(ctx, nimService) + if err != nil { + return err + } + modelName, err := r.getNIMModelName(ctx, clusterEndpoint) + if err != nil { + return err + } + nimService.Status.Model = &v1alpha1.ModelStatus{ + Name: modelName, + ClusterEndpoint: clusterEndpoint, + ExternalEndpoint: externalEndpoint, + } + + return nil +} + +func (r *NIMServiceReconciler) getNIMModelName(ctx context.Context, nimServiceEndpoint string) (string, error) { + modelsList, err := nimmodels.ListModelsV1(ctx, nimServiceEndpoint) + if err != nil { + return "", err + } + + if modelsList.Object == nimmodels.ObjectTypeList { + for _, model := range modelsList.Data { + if model.Object != nimmodels.ObjectTypeModel { + continue + } + if model.Root == nil || *model.Root != model.Id { + continue + } + return model.Id, nil + } + } + + return "", fmt.Errorf("failed to detect model name from nimservice endpoint '%s'", nimServiceEndpoint) +} + +func (r *NIMServiceReconciler) getNIMModelEndpoints(ctx context.Context, nimService *appsv1alpha1.NIMService) (string, string, error) { + logger := log.FromContext(ctx) + + // Lookup NIMCache instance in the same namespace as the NIMService instance + svc := &corev1.Service{} + if err := r.Get(ctx, types.NamespacedName{Name: nimService.GetName(), Namespace: nimService.GetNamespace()}, svc); err != nil { + logger.Error(err, "unable to fetch k8s service", "nimservice", nimService.GetName()) + return "", "", err + } + + var externalEndpoint string + port := nimService.GetServicePort() + if nimService.IsIngressEnabled() { + ingress := &networkingv1.Ingress{} + if err := r.Get(ctx, types.NamespacedName{Name: nimService.GetName(), Namespace: nimService.GetNamespace()}, ingress); err != nil { + logger.Error(err, "unable to fetch ingress", "nimservice", nimService.GetName()) + return "", "", err + } + + var found bool + for _, rule := range ingress.Spec.Rules { + if rule.HTTP == nil { + continue + } + for _, path := range rule.HTTP.Paths { + if path.Backend.Service != nil && path.Backend.Service.Name == nimService.GetName() { + externalEndpoint = rule.Host + found = true + break + } + } + if found { + break + } + } + } else if svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + externalEndpoint = utils.FormatEndpoint(svc.Spec.LoadBalancerIP, port) + } + + return utils.FormatEndpoint(svc.Spec.ClusterIP, port), externalEndpoint, nil +} + func (r *NIMServiceReconciler) renderAndSyncResource(ctx context.Context, nimService *appsv1alpha1.NIMService, renderer *render.Renderer, obj client.Object, renderFunc func() (client.Object, error), conditionType string, reason string) error { logger := log.FromContext(ctx) diff --git a/internal/controller/platform/standalone/nimservice_test.go b/internal/controller/platform/standalone/nimservice_test.go index 6fba8a02b..82eee0c50 100644 --- a/internal/controller/platform/standalone/nimservice_test.go +++ b/internal/controller/platform/standalone/nimservice_test.go @@ -21,6 +21,9 @@ import ( "context" "fmt" "log" + "net/http" + "net/http/httptest" + "net/url" "path" "sort" "strings" @@ -71,15 +74,47 @@ func sortVolumes(volumes []corev1.Volume) { }) } +// Custom transport that redirects requests to a specific host +type mockTransport struct { + targetHost string + testServer *httptest.Server + originalTransport http.RoundTripper +} + +func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + // Check if this request is going to our target IP + hostname := strings.Split(req.URL.Host, ":")[0] + if hostname == "" || req.URL.Host == m.targetHost { + // Create a new URL pointing to our test server + testURL, _ := url.Parse(m.testServer.URL) + testURL.Path = req.URL.Path + testURL.RawQuery = req.URL.RawQuery + + // Create a new request to our test server + newReq := req.Clone(req.Context()) + newReq.URL = testURL + newReq.Host = req.URL.Host // Preserve the original Host header + + // Send the request to our test server + return http.DefaultClient.Do(newReq) + } + + // For all other requests, use the original transport + return m.originalTransport.RoundTrip(req) +} + var _ = Describe("NIMServiceReconciler for a standalone platform", func() { var ( - client client.Client - reconciler *NIMServiceReconciler - scheme *runtime.Scheme - nimService *appsv1alpha1.NIMService - nimCache *appsv1alpha1.NIMCache - volumeMounts []corev1.VolumeMount - volumes []corev1.Volume + client client.Client + reconciler *NIMServiceReconciler + scheme *runtime.Scheme + nimService *appsv1alpha1.NIMService + nimCache *appsv1alpha1.NIMCache + volumeMounts []corev1.VolumeMount + volumes []corev1.Volume + testServerHandler http.HandlerFunc + testServer *httptest.Server + originalTransport = http.DefaultTransport ) BeforeEach(func() { scheme = runtime.NewScheme() @@ -352,9 +387,23 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { log.SetOutput(os.Stderr) }() + // Start mock test server to serve nimservice endpoint. + testServerHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"object": "list", "data":[{"id": "dummy-model", "object": "model", "root": "dummy-model", "parent": null}]}`)) + Expect(err).ToNot(HaveOccurred()) + }) + testServer = httptest.NewServer(testServerHandler) + http.DefaultTransport = &mockTransport{ + targetHost: "127.0.0.1:8123", + testServer: testServer, + originalTransport: originalTransport, + } }) AfterEach(func() { + defer func() { http.DefaultTransport = originalTransport }() + defer testServer.Close() // Clean up the NIMService instance nimService := &appsv1alpha1.NIMService{ ObjectMeta: metav1.ObjectMeta{ @@ -540,7 +589,6 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { }) Describe("isDeploymentReady for setting status on NIMService", func() { - AfterEach(func() { // Clean up the Deployment instance deployment := &appsv1.Deployment{ @@ -658,6 +706,171 @@ var _ = Describe("NIMServiceReconciler for a standalone platform", func() { }) }) + Describe("update model status on NIMService", func() { + BeforeEach(func() { + ingress := &networkingv1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: nimService.GetIngressSpec(), + } + _ = client.Create(context.TODO(), ingress) + }) + AfterEach(func() { + // Clean up the Service instance + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + } + _ = client.Delete(context.TODO(), svc) + }) + + It("should fail when NIMService is unreachable", func() { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "bad.host", // not intercepted by testServer. + Ports: []corev1.ServicePort{ + { + Port: 8123, + Name: "service-port", + }, + }, + }, + } + _ = client.Create(context.TODO(), svc) + err := reconciler.updateModelStatus(context.Background(), nimService) + Expect(err).To(HaveOccurred()) + Expect(nimService.Status.Model).To(BeNil()) + }) + + It("should fail when models response is unmarshallable", func() { + testServer.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(`{"value": "invalid response"}`)) + Expect(err).ToNot(HaveOccurred()) + }) + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "127.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 8123, + Name: "service-port", + }, + }, + }, + } + _ = client.Create(context.TODO(), svc) + err := reconciler.updateModelStatus(context.Background(), nimService) + Expect(err).To(HaveOccurred()) + Expect(nimService.Status.Model).To(BeNil()) + }) + + It("should fail when model name cannot be inferred", func() { + testServer.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // Set dummy object type for model. + _, err := w.Write([]byte(`{"object": "list", "data":[{"id": "dummy-model", "object": "dummy", "root": "dummy-model", "parent": null}]}`)) + Expect(err).ToNot(HaveOccurred()) + }) + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "127.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 8123, + Name: "service-port", + }, + }, + }, + } + _ = client.Create(context.TODO(), svc) + err := reconciler.updateModelStatus(context.Background(), nimService) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed to detect model name from nimservice endpoint '127.0.0.1:8123'")) + Expect(nimService.Status.Model).To(BeNil()) + }) + + It("should set model status on NIMService", func() { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "127.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 8123, + Name: "service-port", + }, + }, + }, + } + _ = client.Create(context.TODO(), svc) + err := reconciler.updateModelStatus(context.Background(), nimService) + Expect(err).ToNot(HaveOccurred()) + modelStatus := nimService.Status.Model + Expect(modelStatus).ToNot(BeNil()) + Expect(modelStatus.ClusterEndpoint).To(Equal("127.0.0.1:8123")) + Expect(modelStatus.ExternalEndpoint).To(Equal("test-nimservice.default.example.com")) + Expect(modelStatus.Name).To(Equal("dummy-model")) + }) + + It("should succeed when nimservice has lora adapter models attached", func() { + testServer.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // Set dummy object type for model. + _, err := w.Write([]byte(`{"object": "list", "data":[{"id": "dummy-model-adapter1", "object": "model", "root": "dummy-model", "parent": null}, {"id": "dummy-model-adapter2", "object": "model", "root": "dummy-model", "parent": null}, {"id": "dummy-model", "object": "model", "root": "dummy-model", "parent": null}]}`)) + Expect(err).ToNot(HaveOccurred()) + }) + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nimservice", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "127.0.0.1", + Ports: []corev1.ServicePort{ + { + Port: 8123, + Name: "service-port", + }, + }, + }, + } + _ = client.Create(context.TODO(), svc) + err := reconciler.updateModelStatus(context.Background(), nimService) + Expect(err).ToNot(HaveOccurred()) + modelStatus := nimService.Status.Model + Expect(modelStatus).ToNot(BeNil()) + Expect(modelStatus.ClusterEndpoint).To(Equal("127.0.0.1:8123")) + Expect(modelStatus.ExternalEndpoint).To(Equal("test-nimservice.default.example.com")) + Expect(modelStatus.Name).To(Equal("dummy-model")) + }) + + }) + Describe("getNIMCacheProfile", func() { It("should return nil when NIMCache is not used", func() { nimService.Spec.Storage.NIMCache.Name = "" diff --git a/internal/controller/platform/standalone/standalone.go b/internal/controller/platform/standalone/standalone.go index b71483865..c709ed053 100644 --- a/internal/controller/platform/standalone/standalone.go +++ b/internal/controller/platform/standalone/standalone.go @@ -111,7 +111,7 @@ func (s *Standalone) Sync(ctx context.Context, r shared.Reconciler, resource cli if nimService, ok := resource.(*appsv1alpha1.NIMService); ok { reconciler := NewNIMServiceReconciler(r) reconciler.renderer = render.NewRenderer(ManifestsDir) - logger.Info("Reconciling NIMService instance") + logger.Info("Reconciling NIMService instance", "nimservice", nimService.GetName()) result, err := reconciler.reconcileNIMService(ctx, nimService) if err != nil { r.GetEventRecorder().Eventf(nimService, corev1.EventTypeWarning, "ReconcileFailed", diff --git a/internal/nimmodels/models.go b/internal/nimmodels/models.go new file mode 100644 index 000000000..795863dd3 --- /dev/null +++ b/internal/nimmodels/models.go @@ -0,0 +1,90 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nimmodels + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type ObjectType string + +const ( + ObjectTypeList ObjectType = "list" + ObjectTypeModel ObjectType = "model" +) + +const ( + modelsV1URI = "/v1/models" +) + +type ModelsV1Info struct { + Id string `json:"id"` + Object ObjectType `json:"object"` + Parent *string `json:"parent,omitempty"` + Root *string `json:"root,omitempty"` +} +type ModelsV1List struct { + Object ObjectType `json:"object"` + Data []ModelsV1Info `json:"data,omitempty"` +} + +func getModelsV1URL(nimServiceEndpoint string) string { + return fmt.Sprintf("http://%s%s", nimServiceEndpoint, modelsV1URI) +} + +func ListModelsV1(ctx context.Context, nimServiceEndpoint string) (*ModelsV1List, error) { + logger := log.FromContext(ctx) + + httpClient := http.Client{ + Timeout: 30 * time.Second, + } + modelsURL := getModelsV1URL(nimServiceEndpoint) + modelsReq, err := http.NewRequest(http.MethodGet, modelsURL, nil) + if err != nil { + logger.Error(err, "Failed to prepare request for models endpoint", "url", modelsURL) + return nil, err + } + + modelsResp, err := httpClient.Do(modelsReq) + if err != nil { + logger.Error(err, "Failed to make request for models endpoint", "url", modelsURL) + return nil, err + } + defer modelsResp.Body.Close() + + modelsData, err := io.ReadAll(modelsResp.Body) + if err != nil { + logger.Error(err, "Failed to read models api response", "url", modelsURL) + return nil, err + } + + var modelsList ModelsV1List + err = json.Unmarshal(modelsData, &modelsList) + if err != nil { + logger.Error(err, "Failed to unmarshal models response", "url", modelsURL) + return nil, err + } + + return &modelsList, nil +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 6ff30569b..95f42eb70 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -286,3 +286,10 @@ func CalculateSHA256(data string) string { hash := sha256.Sum256([]byte(data)) return hex.EncodeToString(hash[:]) } + +func FormatEndpoint(ip string, port int32) string { + if ip == "" { + return "" + } + return fmt.Sprintf("%s:%d", ip, port) +}