@@ -19,8 +19,7 @@ package controller
19
19
import (
20
20
"context"
21
21
"fmt"
22
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
23
- "os"
22
+ "github.com/kubeflow/notebooks/workspaces/controller/internal/istio"
24
23
"reflect"
25
24
"strings"
26
25
@@ -55,8 +54,6 @@ const (
55
54
workspaceSelectorLabel = "statefulset"
56
55
57
56
// lengths for resource names
58
- generateNameSuffixLength = 6
59
- maxServiceNameLength = 63
60
57
maxStatefulSetNameLength = 52 // https://github.com/kubernetes/kubernetes/issues/64023
61
58
62
59
// state message formats for Workspace status
@@ -78,10 +75,6 @@ const (
78
75
stateMsgRunning = "Workspace is running"
79
76
stateMsgTerminating = "Workspace is terminating"
80
77
stateMsgUnknown = "Workspace is in an unknown state"
81
-
82
- IstioHost = "ISTIO_HOST"
83
- IstioGateway = "ISTIO_GATEWAY"
84
- ClusterDomain = "CLUSTER_DOMAIN"
85
78
)
86
79
87
80
// WorkspaceReconciler reconciles a Workspace object
@@ -352,7 +345,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
352
345
// TODO: reconcile the Istio VirtualService to expose the Workspace
353
346
// and implement the `spec.podTemplate.httpProxy` options
354
347
//
355
- virtualService , err := GenerateIstioVirtualService (workspace , workspaceKind , currentImageConfig , serviceName , log )
348
+ virtualService , err := istio . GenerateIstioVirtualService (workspace , workspaceKind , currentImageConfig , serviceName , log )
356
349
if err != nil {
357
350
log .Error (err , "unable to generate Istio Virtual Service" )
358
351
}
@@ -362,7 +355,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
362
355
return ctrl.Result {}, err
363
356
}
364
357
365
- if err := ReconcileVirtualService (ctx , r .Client , virtualService .GetName (), virtualService .GetNamespace (), virtualService , log ); err != nil {
358
+ if err := istio . ReconcileVirtualService (ctx , r .Client , virtualService .GetName (), virtualService .GetNamespace (), virtualService , log ); err != nil {
366
359
return ctrl.Result {}, err
367
360
}
368
361
@@ -574,25 +567,10 @@ func getPodConfig(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefl
574
567
}
575
568
}
576
569
577
- // generateNamePrefix generates a name prefix for a Workspace
578
- // the format is "ws-{WORKSPACE_NAME}-" the workspace name is truncated to fit within the max length
579
- func generateNamePrefix (workspaceName string , maxLength int ) string {
580
- namePrefix := fmt .Sprintf ("ws-%s" , workspaceName )
581
- maxLength = maxLength - generateNameSuffixLength // subtract 6 for the `metadata.generateName` suffix
582
- maxLength = maxLength - 1 // subtract 1 for the trailing "-"
583
- if len (namePrefix ) > maxLength {
584
- namePrefix = namePrefix [:min (len (namePrefix ), maxLength )]
585
- }
586
- if namePrefix [len (namePrefix )- 1 ] != '-' {
587
- namePrefix = namePrefix + "-"
588
- }
589
- return namePrefix
590
- }
591
-
592
570
// generateStatefulSet generates a StatefulSet for a Workspace
593
571
func generateStatefulSet (workspace * kubefloworgv1beta1.Workspace , workspaceKind * kubefloworgv1beta1.WorkspaceKind , imageConfigSpec kubefloworgv1beta1.ImageConfigSpec , podConfigSpec kubefloworgv1beta1.PodConfigSpec ) (* appsv1.StatefulSet , error ) {
594
572
// generate name prefix
595
- namePrefix := generateNamePrefix (workspace .Name , maxStatefulSetNameLength )
573
+ namePrefix := helper . GenerateNamePrefix (workspace .Name , maxStatefulSetNameLength )
596
574
597
575
// generate replica count
598
576
replicas := int32 (1 )
@@ -613,7 +591,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
613
591
// define go string template functions
614
592
// NOTE: these are used in places like the `extraEnv` values
615
593
containerPortsIdMap := make (map [string ]kubefloworgv1beta1.ImagePort )
616
- httpPathPrefixFunc := generateHttpPathPrefixFunc (workspace , containerPortsIdMap )
594
+ httpPathPrefixFunc := helper . GenerateHttpPathPrefixFunc (workspace , containerPortsIdMap )
617
595
618
596
// generate container ports
619
597
containerPorts := make ([]corev1.ContainerPort , len (imageConfigSpec .Ports ))
@@ -639,7 +617,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
639
617
env := env .DeepCopy () // copy to avoid modifying the original
640
618
if env .Value != "" {
641
619
rawValue := env .Value
642
- outValue , err := helper .RenderExtraEnvValueTemplate (rawValue , httpPathPrefixFunc )
620
+ outValue , err := helper .RenderValueUsingFunc (rawValue , httpPathPrefixFunc )
643
621
if err != nil {
644
622
return nil , fmt .Errorf ("failed to render extraEnv %q: %w" , env .Name , err )
645
623
}
@@ -815,21 +793,10 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
815
793
return statefulSet , nil
816
794
}
817
795
818
- func generateHttpPathPrefixFunc (workspace * kubefloworgv1beta1.Workspace , containerPortsIdMap map [string ]kubefloworgv1beta1.ImagePort ) func (portId string ) string {
819
- return func (portId string ) string {
820
- port , ok := containerPortsIdMap [portId ]
821
- if ok {
822
- return fmt .Sprintf ("/workspace/%s/%s/%s/" , workspace .Namespace , workspace .Name , port .Id )
823
- } else {
824
- return ""
825
- }
826
- }
827
- }
828
-
829
796
// generateService generates a Service for a Workspace
830
797
func generateService (workspace * kubefloworgv1beta1.Workspace , imageConfigSpec kubefloworgv1beta1.ImageConfigSpec ) (* corev1.Service , error ) {
831
798
// generate name prefix
832
- namePrefix := generateNamePrefix (workspace .Name , maxServiceNameLength )
799
+ namePrefix := helper . GenerateNamePrefix (workspace .Name , helper . MaxServiceNameLength )
833
800
834
801
// generate service ports
835
802
servicePorts := make ([]corev1.ServicePort , len (imageConfigSpec .Ports ))
@@ -1026,199 +993,3 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
1026
993
status .StateMessage = stateMsgUnknown
1027
994
return status , nil
1028
995
}
1029
-
1030
- const istioApiVersion = "networking.istio.io/v1"
1031
- const virtualServiceKind = "VirtualService"
1032
-
1033
- func GenerateIstioVirtualService (workspace * kubefloworgv1beta1.Workspace , workspaceKind * kubefloworgv1beta1.WorkspaceKind , imageConfig * kubefloworgv1beta1.ImageConfigValue , serviceName string , _ logr.Logger ) (* unstructured.Unstructured , error ) {
1034
-
1035
- virtualService := & unstructured.Unstructured {}
1036
- virtualService .SetAPIVersion (istioApiVersion )
1037
- virtualService .SetKind (virtualServiceKind )
1038
-
1039
- prefix := generateNamePrefix (workspace .Name , maxServiceNameLength )
1040
- virtualService .SetName (removeTrailingDash (prefix ))
1041
- virtualService .SetNamespace (workspace .Namespace )
1042
-
1043
- // .spec.gateways
1044
- istioGateway := getEnvOrDefault (IstioGateway , "kubeflow/kubeflow-gateway" )
1045
- if err := unstructured .SetNestedStringSlice (virtualService .Object , []string {istioGateway },
1046
- "spec" , "gateways" ); err != nil {
1047
- return nil , fmt .Errorf ("set .spec.gateways error: %v" , err )
1048
- }
1049
-
1050
- istioHost := getEnvOrDefault (IstioHost , "*" )
1051
- if err := unstructured .SetNestedStringSlice (virtualService .Object , []string {istioHost },
1052
- "spec" , "gateways" ); err != nil {
1053
- return nil , fmt .Errorf ("set .spec.hosts error: %v" , err )
1054
- }
1055
-
1056
- var prefixes []string
1057
- for _ , imagePort := range imageConfig .Spec .Ports {
1058
- prefix := fmt .Sprintf ("/workspace/%s/%s/%s" , workspace .Namespace , workspace .Name , imagePort .Id )
1059
- prefixes = append (prefixes , prefix )
1060
- }
1061
-
1062
- var httpRoutes []interface {}
1063
-
1064
- host := fmt .Sprintf ("%s.%s.svc.%s" , serviceName , workspace .Namespace , getEnvOrDefault (ClusterDomain , "cluster.local" ))
1065
-
1066
- // generate container ports
1067
- // TODO: It can be better
1068
- containerPortsIdMap , err := generateContainerPortsIdMap (imageConfig )
1069
- if errContainerPorts := unstructured .SetNestedStringSlice (virtualService .Object , []string {istioHost },
1070
- "spec" , "gateways" ); err != nil {
1071
- return nil , fmt .Errorf ("set .spec.hosts error: %v" , errContainerPorts )
1072
- }
1073
- httpPathPrefixFunc := generateHttpPathPrefixFunc (workspace , containerPortsIdMap )
1074
-
1075
- for _ , imagePort := range imageConfig .Spec .Ports {
1076
-
1077
- httpRoute := map [string ]interface {}{
1078
- "match" : []map [string ]interface {}{
1079
- {
1080
- "uri" : map [string ]interface {}{
1081
- "prefix" : fmt .Sprintf ("/workspace/%s/%s/%s" , workspace .Namespace , workspace .Name , imagePort .Id ),
1082
- },
1083
- },
1084
- },
1085
- "route" : []map [string ]interface {}{
1086
- {
1087
- "destination" : map [string ]interface {}{
1088
- "host" : host ,
1089
- "port" : map [string ]interface {}{
1090
- "number" : imagePort .Port ,
1091
- },
1092
- },
1093
- },
1094
- },
1095
- }
1096
-
1097
- if * workspaceKind .Spec .PodTemplate .HTTPProxy .RemovePathPrefix {
1098
- httpRoute ["rewrite" ] = map [string ]interface {}{"uri" : "/" }
1099
- }
1100
-
1101
- // templating.spec.http[].math.headers
1102
- setHeaders := templateHeaders (workspaceKind .Spec .PodTemplate .HTTPProxy .RequestHeaders .Set , httpPathPrefixFunc )
1103
- addHeaders := templateHeaders (workspaceKind .Spec .PodTemplate .HTTPProxy .RequestHeaders .Add , httpPathPrefixFunc )
1104
-
1105
- removeHeaders := make ([]string , len (workspaceKind .Spec .PodTemplate .HTTPProxy .RequestHeaders .Remove ))
1106
- for i , header := range workspaceKind .Spec .PodTemplate .HTTPProxy .RequestHeaders .Remove {
1107
- if header != "" {
1108
- out , err := helper .RenderExtraEnvValueTemplate (header , httpPathPrefixFunc )
1109
- if err != nil {
1110
- return nil , fmt .Errorf ("failed to render header %q: %w" , header , err )
1111
- }
1112
- header = out
1113
- }
1114
- removeHeaders [i ] = header
1115
- }
1116
-
1117
- httpRoute ["headers" ] = map [string ]interface {}{
1118
- "request" : map [string ]interface {}{
1119
- "add" : setHeaders ,
1120
- "set" : addHeaders ,
1121
- "remove" : removeHeaders ,
1122
- },
1123
- }
1124
-
1125
- httpRoutes = append (httpRoutes , httpRoute )
1126
- }
1127
-
1128
- virtualService .Object ["spec" ] = map [string ]interface {}{
1129
- "gateways" : []string {
1130
- istioGateway ,
1131
- },
1132
- "hosts" : []string {
1133
- istioHost ,
1134
- },
1135
- "http" : httpRoutes ,
1136
- }
1137
-
1138
- return virtualService , nil
1139
- }
1140
-
1141
- func templateHeaders (requestHeaders map [string ]string , httpPathPrefixFunc func (portId string ) string ) map [string ]string {
1142
-
1143
- if len (requestHeaders ) == 0 {
1144
- return make (map [string ]string , 0 )
1145
- }
1146
-
1147
- headers := make (map [string ]string , len (requestHeaders ))
1148
- for _ , header := range requestHeaders {
1149
- value := headers [header ]
1150
- if value != "" {
1151
- out , err := helper .RenderExtraEnvValueTemplate (header , httpPathPrefixFunc )
1152
- if err != nil {
1153
- return make (map [string ]string , 0 )
1154
- }
1155
- value = out
1156
- }
1157
- headers [header ] = value
1158
- }
1159
- return headers
1160
- }
1161
-
1162
- func generateContainerPortsIdMap (imageConfig * kubefloworgv1beta1.ImageConfigValue ) (map [string ]kubefloworgv1beta1.ImagePort , error ) {
1163
- containerPortsIdMap := make (map [string ]kubefloworgv1beta1.ImagePort )
1164
-
1165
- containerPorts := make ([]corev1.ContainerPort , len (imageConfig .Spec .Ports ))
1166
- seenPorts := make (map [int32 ]bool )
1167
- for i , port := range imageConfig .Spec .Ports {
1168
- if seenPorts [port .Port ] {
1169
- return nil , fmt .Errorf ("duplicate port number %d in imageConfig" , port .Port )
1170
- }
1171
- containerPorts [i ] = corev1.ContainerPort {
1172
- Name : fmt .Sprintf ("http-%d" , port .Port ),
1173
- ContainerPort : port .Port ,
1174
- Protocol : corev1 .ProtocolTCP ,
1175
- }
1176
- seenPorts [port .Port ] = true
1177
- containerPortsIdMap [port .Id ] = port
1178
- }
1179
- return containerPortsIdMap , nil
1180
- }
1181
-
1182
- func getEnvOrDefault (name , defaultValue string ) string {
1183
- if lookupEnv , exists := os .LookupEnv (name ); exists {
1184
- return lookupEnv
1185
- } else {
1186
- return defaultValue
1187
- }
1188
- }
1189
-
1190
- func ReconcileVirtualService (ctx context.Context , r client.Client , virtualServiceName , namespace string , virtualService * unstructured.Unstructured , log logr.Logger ) error {
1191
- foundVirtualService := & unstructured.Unstructured {}
1192
- foundVirtualService .SetAPIVersion (istioApiVersion )
1193
- foundVirtualService .SetKind (virtualServiceKind )
1194
- justCreated := false
1195
- if err := r .Get (ctx , types.NamespacedName {Name : virtualServiceName , Namespace : namespace }, foundVirtualService ); err != nil {
1196
- if apierrors .IsNotFound (err ) {
1197
- log .Info ("Creating virtual service" , "namespace" , namespace , "name" , virtualServiceName )
1198
- if err := r .Create (ctx , virtualService ); err != nil {
1199
- log .Error (err , "unable to create virtual service" )
1200
- return err
1201
- }
1202
- justCreated = true
1203
- } else {
1204
- log .Error (err , "error getting virtual service" )
1205
- return err
1206
- }
1207
- }
1208
- if ! justCreated {
1209
- log .Info ("Updating virtual service" , "namespace" , namespace , "name" , virtualServiceName )
1210
- if err := r .Update (ctx , foundVirtualService ); err != nil {
1211
- log .Error (err , "unable to update virtual service" )
1212
- return err
1213
- }
1214
- }
1215
-
1216
- return nil
1217
- }
1218
-
1219
- func removeTrailingDash (s string ) string {
1220
- if len (s ) > 0 && s [len (s )- 1 ] == '-' {
1221
- return s [:len (s )- 1 ]
1222
- }
1223
- return s
1224
- }
0 commit comments