Skip to content

fix: resolve service name conflict for notebooks and tensorboards #7468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 134 additions & 35 deletions components/notebook-controller/controllers/notebook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package controllers

import (
"context"

"encoding/json"
"fmt"
"os"
Expand All @@ -34,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
Expand All @@ -53,6 +55,9 @@ const AnnotationRewriteURI = "notebooks.kubeflow.org/http-rewrite-uri"
const AnnotationHeadersRequestSet = "notebooks.kubeflow.org/http-headers-request-set"

const PrefixEnvVar = "NB_PREFIX"
// namePrefix is the leading segment of the GenerateName ("<prefix>-<notebook>-")
// set on the StatefulSet, Service and VirtualService derived from a Notebook.
const namePrefix = "nb"
// generatedSuffixLength is the number of characters budgeted for the generated
// name suffix when Reconcile validates the length of the Notebook name.
const generatedSuffixLength = 5
// maxNameLength is the upper bound Reconcile enforces on
// len(notebook name) + len(namePrefix) + generatedSuffixLength.
// NOTE(review): that check does not count the two "-" separators present in the
// GenerateName format — confirm whether the budget should include them.
const maxNameLength = 63

// The default fsGroup of PodSecurityContext.
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#podsecuritycontext-v1-core
Expand Down Expand Up @@ -110,6 +115,12 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, ignoreNotFound(err)
}

// Make sure the prefix doesn't cause the derived resource names to get too long
if len(nbName)+generatedSuffixLength+len(namePrefix) > maxNameLength {
return reconcile.Result{},
fmt.Errorf("notebook name must not be longer than %d characters as notebook resources are prefixed with %s%s", maxNameLength-(len(namePrefix)+generatedSuffixLength), namePrefix, involvedNotebookKey.Namespace)
}

// re-emit the event in the Notebook CR
log.Info("Emitting Notebook Event.", "Event", event)
r.EventRecorder.Eventf(involvedNotebook, event.Type, event.Reason,
Expand Down Expand Up @@ -142,9 +153,23 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
// Check if the StatefulSet already exists
foundStateful := &appsv1.StatefulSet{}
namespacedStatefulSets := &appsv1.StatefulSetList{}
justCreated := false
err := r.Get(ctx, types.NamespacedName{Name: ss.Name, Namespace: ss.Namespace}, foundStateful)
if err != nil && apierrs.IsNotFound(err) {

err := r.List(ctx, namespacedStatefulSets, client.InNamespace(ss.Namespace))
if err != nil {
log.Error(err, "error listing StatefulSets")
return ctrl.Result{}, err
}

for _, sts := range namespacedStatefulSets.Items {
if metav1.IsControlledBy(&sts, instance) {
foundStateful = &sts
break
}
}

if foundStateful.Name == "" && foundStateful.Namespace == "" {
log.Info("Creating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
r.Metrics.NotebookCreation.WithLabelValues(ss.Namespace).Inc()
err = r.Create(ctx, ss)
Expand All @@ -154,10 +179,8 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
r.Metrics.NotebookFailCreation.WithLabelValues(ss.Namespace).Inc()
return ctrl.Result{}, err
}
} else if err != nil {
log.Error(err, "error getting Statefulset")
return ctrl.Result{}, err
}

// Update the foundStateful object and write the result back if there are any changes
if !justCreated && reconcilehelper.CopyStatefulSetFields(ss, foundStateful) {
log.Info("Updating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
Expand All @@ -173,21 +196,32 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := ctrl.SetControllerReference(instance, service, r.Scheme); err != nil {
return ctrl.Result{}, err
}

// Check if the Service already exists
foundService := &corev1.Service{}
namespacedServices := &corev1.ServiceList{}
justCreated = false
err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
if err != nil && apierrs.IsNotFound(err) {
err = r.List(ctx, namespacedServices, client.InNamespace(ss.Namespace))
if err != nil {
log.Error(err, "error listing Services")
return ctrl.Result{}, err
}

for _, ser := range namespacedServices.Items {
if metav1.IsControlledBy(&ser, instance) {
foundService = &ser
break
}
}

if foundService.Name == "" && foundService.Namespace == "" {
log.Info("Creating Service", "namespace", service.Namespace, "name", service.Name)
err = r.Create(ctx, service)
justCreated = true
if err != nil {
log.Error(err, "unable to create Service")
return ctrl.Result{}, err
}
} else if err != nil {
log.Error(err, "error getting Service")
return ctrl.Result{}, err
}
// Update the foundService object and write the result back if there are any changes
if !justCreated && reconcilehelper.CopyServiceFields(service, foundService) {
Expand All @@ -199,9 +233,15 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
}

err = deleteObsoleteService(ctx, r, instance)
if err != nil {
log.Error(err, "unable to delete obsolete Service")
return ctrl.Result{}, err
}

// Reconcile virtual service if we use ISTIO.
if os.Getenv("USE_ISTIO") == "true" {
err = r.reconcileVirtualService(instance)
err = r.reconcileVirtualService(instance, foundService.Name)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -366,8 +406,8 @@ func generateStatefulSet(instance *v1beta1.Notebook) *appsv1.StatefulSet {

ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: instance.Name,
Namespace: instance.Namespace,
GenerateName: fmt.Sprintf("%s-%s-", namePrefix, instance.Name),
Namespace: instance.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Expand Down Expand Up @@ -439,17 +479,18 @@ func generateService(instance *v1beta1.Notebook) *corev1.Service {
// Define the desired Service object
port := DefaultContainerPort
containerPorts := instance.Spec.Template.Spec.Containers[0].Ports

if containerPorts != nil {
port = int(containerPorts[0].ContainerPort)
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: instance.Name,
Namespace: instance.Namespace,
Namespace: instance.Namespace,
GenerateName: fmt.Sprintf("%s-%s-", namePrefix, instance.Name),
},
Spec: corev1.ServiceSpec{
Type: "ClusterIP",
Selector: map[string]string{"statefulset": instance.Name},
Selector: map[string]string{"statefulset": instance.Name, "notebook-name": instance.Name},
Ports: []corev1.ServicePort{
{
// Make port name follow Istio pattern so it can be managed by istio rbac
Expand All @@ -464,11 +505,7 @@ func generateService(instance *v1beta1.Notebook) *corev1.Service {
return svc
}

// virtualServiceName builds the VirtualService name for a Notebook: the fixed
// "notebook" prefix, then the namespace, then the Notebook name, joined by "-".
func virtualServiceName(kfName string, namespace string) string {
	const nameFormat = "notebook-%s-%s"

	vsName := fmt.Sprintf(nameFormat, namespace, kfName)
	return vsName
}

func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructured, error) {
func generateVirtualService(instance *v1beta1.Notebook, serviceName string) (*unstructured.Unstructured, error) {
name := instance.Name
namespace := instance.Namespace
clusterDomain := "cluster.local"
Expand All @@ -489,12 +526,12 @@ func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructu
if clusterDomainFromEnv, ok := os.LookupEnv("CLUSTER_DOMAIN"); ok {
clusterDomain = clusterDomainFromEnv
}
service := fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain)
service := fmt.Sprintf("%s.%s.svc.%s", serviceName, namespace, clusterDomain)

vsvc := &unstructured.Unstructured{}
vsvc.SetAPIVersion("networking.istio.io/v1alpha3")
vsvc.SetKind("VirtualService")
vsvc.SetName(virtualServiceName(name, namespace))
vsvc.SetGenerateName(fmt.Sprintf("%s-%s-", namePrefix, instance.Name))
vsvc.SetNamespace(namespace)

istioHost := os.Getenv("ISTIO_HOST")
Expand Down Expand Up @@ -570,9 +607,9 @@ func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructu

}

func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook) error {
func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook, serviceName string) error {
log := r.Log.WithValues("notebook", instance.Namespace)
virtualService, err := generateVirtualService(instance)
virtualService, err := generateVirtualService(instance, serviceName)
if err != nil {
log.Info("Unable to generate VirtualService...", err)
return err
Expand All @@ -582,26 +619,52 @@ func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook)
}
// Check if the virtual service already exists.
foundVirtual := &unstructured.Unstructured{}
justCreated := false
foundVirtual.SetAPIVersion("networking.istio.io/v1alpha3")
foundVirtual.SetKind("VirtualService")
err = r.Get(context.TODO(), types.NamespacedName{Name: virtualServiceName(instance.Name,
instance.Namespace), Namespace: instance.Namespace}, foundVirtual)
if err != nil && apierrs.IsNotFound(err) {
log.Info("Creating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
namespacedVirtualServices := &unstructured.UnstructuredList{}
namespacedVirtualServices.SetGroupVersionKind(
schema.GroupVersionKind{
Group: "networking.istio.io",
Version: "v1alpha3",
Kind: "VirtualService",
},
)
justCreated := false

// List the VirtualServices in the given namespace
err = r.List(
context.TODO(),
namespacedVirtualServices,
client.InNamespace(instance.Namespace),
)
if err != nil {
log.Error(err, "error listing Virtual Services")
return err
}

// Manually filter the resources by kind and apiVersion
for _, virSer := range namespacedVirtualServices.Items {
// Check if the resource's kind and apiVersion match
if virSer.GetKind() == "VirtualService" && virSer.GetAPIVersion() == "networking.istio.io/v1alpha3" {
// If the resource is controlled by the instance, assign it to foundVirtual
if metav1.IsControlledBy(&virSer, instance) {
foundVirtual = &virSer
break
}
}
}

if foundVirtual.GetName() == "" && foundVirtual.GetNamespace() == "" {
log.Info("Creating virtual service", "namespace", instance.Namespace, "notebook-name", instance.Name)
err = r.Create(context.TODO(), virtualService)
justCreated = true
if err != nil {
return err
}
} else if err != nil {
return err
}

if !justCreated && reconcilehelper.CopyVirtualService(virtualService, foundVirtual) {
log.Info("Updating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
log.Info("Updating virtual service", "namespace", instance.Namespace, "notebook-name", instance.Name)
err = r.Update(context.TODO(), foundVirtual)
if err != nil {
return err
Expand All @@ -611,6 +674,42 @@ func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook)
return nil
}

// deleteObsoleteService removes the legacy Service whose name equals the
// Notebook's name, i.e. the Service created before derived resources switched
// to generated names.
//
// The Service is deleted only when it is controlled by instance. A Service
// that merely shares the Notebook's name (for example one owned by another
// controller or created by a user) is left untouched, since deleting it would
// break an unrelated workload.
//
// It returns nil when there is nothing to delete, including when the Service
// disappears between the Get and the Delete. Any other API error is logged and
// returned so the caller can requeue.
func deleteObsoleteService(ctx context.Context, r *NotebookReconciler, instance *v1beta1.Notebook) error {
	log := r.Log.WithValues("notebook", instance.Namespace)
	key := client.ObjectKey{Name: instance.Name, Namespace: instance.Namespace}

	obsoleteService := &corev1.Service{}
	if err := r.Get(ctx, key, obsoleteService); err != nil {
		if apierrs.IsNotFound(err) {
			// Steady state after migration: stay quiet so every reconcile
			// does not emit a log line.
			return nil
		}
		log.Error(err, "error getting obsolete service", "namespace", key.Namespace, "name", key.Name)
		return err
	}

	// Only touch Services this Notebook actually controls; a name match alone
	// is not proof of ownership.
	if !metav1.IsControlledBy(obsoleteService, instance) {
		return nil
	}

	log.Info("Found obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)

	// Delete directly, keeping the owner reference in place. Stripping it first
	// would orphan the Service if the subsequent Delete failed, because the
	// ownership check above would then skip it on every retry.
	if err := r.Delete(ctx, obsoleteService); err != nil {
		if apierrs.IsNotFound(err) {
			// Already gone (e.g. removed concurrently); the goal is met.
			return nil
		}
		log.Error(err, "unable to delete obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
		return err
	}

	log.Info("Deleted obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
	return nil
}

// isStsOrPodEvent reports whether the object involved in the event is of kind
// "Pod" or "StatefulSet".
func isStsOrPodEvent(event *corev1.Event) bool {
	switch event.InvolvedObject.Kind {
	case "Pod", "StatefulSet":
		return true
	default:
		return false
	}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

nbv1beta1 "github.com/kubeflow/kubeflow/components/notebook-controller/api/v1beta1"
)
Expand Down Expand Up @@ -74,15 +75,19 @@ var _ = Describe("Notebook controller", func() {
*/
By("By checking that the Notebook has statefulset")
Eventually(func() (bool, error) {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{
Name: Name,
Namespace: Namespace,
}}
err := k8sClient.Get(ctx, notebookLookupKey, sts)
namespacedStatefulSets := &appsv1.StatefulSetList{}

err := k8sClient.List(ctx, namespacedStatefulSets, client.InNamespace(Namespace))
if err != nil {
return false, err
}
return true, nil

for _, sts := range namespacedStatefulSets.Items {
if metav1.IsControlledBy(&sts, createdNotebook) {
return true, nil
}
}
return false, nil
}, timeout, interval).Should(BeTrue())
})
})
Expand Down
Loading
Loading