diff --git a/operator/api/common/labels.go b/operator/api/common/labels.go index b453fea33..fb21e6f8e 100644 --- a/operator/api/common/labels.go +++ b/operator/api/common/labels.go @@ -43,6 +43,9 @@ const ( LabelPodCliqueScalingGroupReplicaIndex = "grove.io/podcliquescalinggroup-replica-index" // LabelPodTemplateHash is a key for a label that sets the hash of the PodSpec. This label will be set on a PodClique and will be shared by all pods in the PodClique. LabelPodTemplateHash = "grove.io/pod-template-hash" + // LabelSchedulerBackend is a key for a label that specifies which scheduler backend should handle the PodGang. + // Values: "workload" (for default-scheduler), "kai" (for KAI scheduler), etc. + LabelSchedulerBackend = "grove.io/scheduler-backend" ) // Labels for setting component names for all managed resources whose lifecycle diff --git a/operator/api/core/v1alpha1/zz_generated.deepcopy.go b/operator/api/core/v1alpha1/zz_generated.deepcopy.go index 369f5659a..216962b92 100644 --- a/operator/api/core/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/core/v1alpha1/zz_generated.deepcopy.go @@ -55,6 +55,10 @@ func (in *AutoScalingConfig) DeepCopy() *AutoScalingConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BasicSchedulingPolicy. + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterTopology) DeepCopyInto(out *ClusterTopology) { *out = *in @@ -136,6 +140,10 @@ func (in *ClusterTopologySpec) DeepCopy() *ClusterTopologySpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GangSchedulingPolicy. + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HeadlessServiceConfig) DeepCopyInto(out *HeadlessServiceConfig) { *out = *in @@ -884,6 +892,14 @@ func (in *PodGangStatus) DeepCopy() *PodGangStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodGroup. + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodGroupPolicy. + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodsSelectedToUpdate) DeepCopyInto(out *PodsSelectedToUpdate) { *out = *in @@ -941,3 +957,19 @@ func (in *TopologyLevel) DeepCopy() *TopologyLevel { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Workload. + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+ +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadList. + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadSpec. diff --git a/operator/charts/templates/clusterrole.yaml b/operator/charts/templates/clusterrole.yaml index 4cc2c83d7..e98663a7b 100644 --- a/operator/charts/templates/clusterrole.yaml +++ b/operator/charts/templates/clusterrole.yaml @@ -18,6 +18,19 @@ rules: - deletecollection - patch - update +- apiGroups: + - scheduling.k8s.io + resources: + - workloads + verbs: + - create + - get + - list + - watch + - delete + - deletecollection + - patch + - update - apiGroups: - grove.io resources: diff --git a/operator/client/go.mod b/operator/client/go.mod index dde81078e..6f4d1728e 100644 --- a/operator/client/go.mod +++ b/operator/client/go.mod @@ -28,11 +28,11 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/term v0.37.0 // indirect + golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.9.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect diff --git a/operator/client/go.sum b/operator/client/go.sum index ef675a034..306e82975 100644 --- a/operator/client/go.sum +++ b/operator/client/go.sum @@ -98,6 +98,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -108,12 +109,15 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= 
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/operator/go.mod b/operator/go.mod index 79779c20e..dfd393b22 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -1,6 +1,6 @@ module github.com/ai-dynamo/grove/operator -go 1.24.0 +go 1.25.0 require ( github.com/ai-dynamo/grove/operator/api v0.0.0 @@ -16,11 +16,11 @@ require ( github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 helm.sh/helm/v3 v3.19.2 - k8s.io/api v0.34.2 + k8s.io/api v0.35.0-beta.0 k8s.io/apiextensions-apiserver v0.34.2 - k8s.io/apimachinery v0.34.2 + k8s.io/apimachinery v0.35.0-beta.0 k8s.io/cli-runtime v0.34.2 - k8s.io/client-go v0.34.2 + k8s.io/client-go v0.35.0-beta.0 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 sigs.k8s.io/controller-runtime v0.22.4 @@ -129,7 +129,6 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect - github.com/onsi/ginkgo/v2 v2.23.4 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect @@ -173,7 +172,7 @@ require ( go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect go4.org/netipx v0.0.0-20231129151722-fdeea329fbba // indirect golang.org/x/crypto v0.45.0 // indirect @@ -189,7 +188,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/grpc v1.72.1 // indirect - google.golang.org/protobuf v1.36.7 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect @@ -197,7 +196,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiserver v0.34.2 // indirect k8s.io/component-base v0.34.2 // indirect - k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect k8s.io/kubectl v0.34.0 // indirect oras.land/oras-go/v2 v2.6.0 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect diff --git a/operator/go.sum b/operator/go.sum index e4bc0d7f0..a7fcff109 100644 --- a/operator/go.sum +++ b/operator/go.sum @@ -356,13 +356,14 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo 
v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= -github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= -github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= +github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= +github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= -github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY= -github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o= +github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= +github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= github.com/open-policy-agent/cert-controller v0.14.0 h1:TPc19BOHOs4tARruTT5o4bzir7Ed6FF+j3EXP/nmZBs= github.com/open-policy-agent/cert-controller v0.14.0/go.mod h1:UhE/FU54DnKo+Rt0Yf3r+oKjgy6kqSH8Vsjo+5bGrSo= github.com/open-policy-agent/frameworks/constraint v0.0.0-20241101234656-e78c8abd754a h1:gQtOJ50XFyL2Xh3lDD9zP4KQ2PY4mZKQ9hDcWc81Sp8= @@ -422,8 +423,8 @@ github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb github.com/redis/go-redis/extra/redisotel/v9 v9.0.5/go.mod h1:WZjPDy7VNzn77AAfnAfVjZNvfJTYfPetfZk5yoSTLaQ= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rubenv/sql-migrate v1.8.0 h1:dXnYiJk9k3wetp7GfQbKJcPHjVJL6YK19tKj8t2Ns0o= github.com/rubenv/sql-migrate v1.8.0/go.mod h1:F2bGFBwCU+pnmbtNYDeKvSuvL6lBVtXDXUUv5t+u1qw= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -550,16 +551,14 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= -go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= -go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod 
h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M= @@ -642,8 +641,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go. google.golang.org/grpc v1.0.5/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= -google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= -google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/cenkalti/backoff.v2 v2.2.1 h1:eJ9UAg01/HIHG987TwxvnzK2MgxXq97YY6rYDpY9aII= @@ -678,26 +677,26 @@ gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= helm.sh/helm/v3 v3.19.2 h1:psQjaM8aIWrSVEly6PgYtLu/y6MRSmok4ERiGhZmtUY= helm.sh/helm/v3 v3.19.2/go.mod h1:gX10tB5ErM+8fr7bglUUS/UfTOO8UUTYWIBH1IYNnpE= -k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY= -k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw= +k8s.io/api v0.35.0-beta.0 h1:eqAAVeSatXNnsPjaeFrFGqSl5ihtPY4e8Txy2nYPOnw= +k8s.io/api v0.35.0-beta.0/go.mod h1:UXuvkssy8lHPSP381eqqBOW4BvRTicVpRjv7k2sjo4Y= k8s.io/apiextensions-apiserver v0.34.2 h1:WStKftnGeoKP4AZRz/BaAAEJvYp4mlZGN0UCv+uvsqo= k8s.io/apiextensions-apiserver v0.34.2/go.mod h1:398CJrsgXF1wytdaanynDpJ67zG4Xq7yj91GrmYN2SE= -k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= -k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apimachinery v0.35.0-beta.0 h1:vVoDiASLwUEv5yZceZCBRPXBc1f9wUOZs7ZbEbGr5sY= +k8s.io/apimachinery v0.35.0-beta.0/go.mod h1:dR9KPaf5L0t2p9jZg/wCGB4b3ma2sXZ2zdNqILs+Sak= k8s.io/apiserver v0.34.2 h1:2/yu8suwkmES7IzwlehAovo8dDE07cFRC7KMDb1+MAE= k8s.io/apiserver v0.34.2/go.mod h1:gqJQy2yDOB50R3JUReHSFr+cwJnL8G1dzTA0YLEqAPI= k8s.io/cli-runtime v0.34.2 h1:cct1GEuWc3IyVT8MSCoIWzRGw9HJ/C5rgP32H60H6aE= k8s.io/cli-runtime v0.34.2/go.mod h1:X13tsrYexYUCIq8MarCBy8lrm0k0weFPTpcaNo7lms4= -k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= -k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= +k8s.io/client-go v0.35.0-beta.0 h1:4APvMU7+XwWF+XoqAv+gbtSmwjPCXXXo4XVcY89Rde0= +k8s.io/client-go v0.35.0-beta.0/go.mod h1:+XxnPEoaCIB5G0zpwXRh3AnT+CvgS5lA+AFr9EtHUcA= k8s.io/component-base v0.34.2 h1:HQRqK9x2sSAsd8+R4xxRirlTjowsg6fWCPwWYeSvogQ= k8s.io/component-base v0.34.2/go.mod h1:9xw2FHJavUHBFpiGkZoKuYZ5pdtLKe97DEByaA+hHbM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-aggregator v0.33.4 h1:TdIJKHb0/bLpby7FblXIaVEzyA1jGEjzt/n9cRvwq8U= k8s.io/kube-aggregator v0.33.4/go.mod h1:wZuctdRvGde5bwzxkZRs0GYj2KOpCNgx8rRGVoNb62k= -k8s.io/kube-openapi 
v0.0.0-20250814151709-d7b6acb124c3 h1:liMHz39T5dJO1aOKHLvwaCjDbf07wVh6yaUlTpunnkE= -k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= k8s.io/kubectl v0.34.0 h1:NcXz4TPTaUwhiX4LU+6r6udrlm0NsVnSkP3R9t0dmxs= k8s.io/kubectl v0.34.0/go.mod h1:bmd0W5i+HuG7/p5sqicr0Li0rR2iIhXL0oUyLF3OjR4= k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= diff --git a/operator/internal/controller/common/component/types.go b/operator/internal/controller/common/component/types.go index 255e53687..c3d663dd1 100644 --- a/operator/internal/controller/common/component/types.go +++ b/operator/internal/controller/common/component/types.go @@ -75,8 +75,10 @@ const ( KindPod Kind = "Pod" // KindPodCliqueScalingGroup indicates that the resource is a PodCliqueScalingGroup. KindPodCliqueScalingGroup Kind = "PodCliqueScalingGroup" - // KindPodGang indicates that the resource is a PodGang. + // KindPodGang indicates that the resource is a PodGang (Grove custom scheduler API). KindPodGang Kind = "PodGang" + // KindWorkload indicates that the resource is a Workload (Kubernetes 1.35+ gang scheduling API). + KindWorkload Kind = "Workload" // KindPodCliqueSetReplica indicates that the resource is a PodCliqueSet replica. KindPodCliqueSetReplica Kind = "PodCliqueSetReplica" ) diff --git a/operator/internal/controller/manager.go b/operator/internal/controller/manager.go index 5c5155614..d148de205 100644 --- a/operator/internal/controller/manager.go +++ b/operator/internal/controller/manager.go @@ -43,12 +43,6 @@ const ( pprofBindAddress = "127.0.0.1:2753" ) -var ( - waitTillWebhookCertsReady = cert.WaitTillWebhookCertsReady - registerControllersWithMgr = RegisterControllers - registerWebhooksWithMgr = webhook.RegisterWebhooks -) - // CreateManager creates the manager. func CreateManager(operatorCfg *configv1alpha1.OperatorConfiguration) (ctrl.Manager, error) { return ctrl.NewManager(getRestConfig(operatorCfg), createManagerOptions(operatorCfg)) @@ -58,11 +52,11 @@ func CreateManager(operatorCfg *configv1alpha1.OperatorConfiguration) (ctrl.Mana func RegisterControllersAndWebhooks(mgr ctrl.Manager, logger logr.Logger, operatorCfg *configv1alpha1.OperatorConfiguration, certsReady chan struct{}) error { // Controllers will not work unless the webhoooks are fully configured and operational. // For webhooks to work cert-controller should finish its work of generating and injecting certificates. 
- waitTillWebhookCertsReady(logger, certsReady) - if err := registerControllersWithMgr(mgr, operatorCfg.Controllers); err != nil { + cert.WaitTillWebhookCertsReady(logger, certsReady) + if err := RegisterControllers(mgr, logger, operatorCfg.Controllers); err != nil { return err } - if err := registerWebhooksWithMgr(mgr, operatorCfg.Authorizer); err != nil { + if err := webhook.RegisterWebhooks(mgr, operatorCfg.Authorizer); err != nil { return err } return nil diff --git a/operator/internal/controller/podclique/components/pod/pod.go b/operator/internal/controller/podclique/components/pod/pod.go index ae5cb5194..5dec2e590 100644 --- a/operator/internal/controller/podclique/components/pod/pod.go +++ b/operator/internal/controller/podclique/components/pod/pod.go @@ -26,6 +26,7 @@ import ( grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/ai-dynamo/grove/operator/internal/controller/common/component" componentutils "github.com/ai-dynamo/grove/operator/internal/controller/common/component/utils" + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" groveerr "github.com/ai-dynamo/grove/operator/internal/errors" "github.com/ai-dynamo/grove/operator/internal/expect" "github.com/ai-dynamo/grove/operator/internal/utils" @@ -159,11 +160,22 @@ func (r _resource) buildResource(pcs *grovecorev1alpha1.PodCliqueSet, pclq *grov ) } pod.Spec = *pclq.Spec.PodSpec.DeepCopy() - pod.Spec.SchedulingGates = []corev1.PodSchedulingGate{{Name: podGangSchedulingGate}} + + // Use backend to mutate Pod spec based on scheduler requirements + // This adds scheduling gates, workloadRef (for default scheduler), annotations, etc. + if err = backend.MutatePod(pod, podGangName, pclq.Name); err != nil { + return groveerr.WrapError(err, + errCodeBuildPodResource, + component.OperationSync, + fmt.Sprintf("failed to mutate pod spec for scheduler %s", pod.Spec.SchedulerName), + ) + } + // Add GROVE specific Pod environment variables addEnvironmentVariables(pod, pclq, pcsName, pcsReplicaIndex, podIndex) // Configure hostname and subdomain for service discovery configurePodHostname(pcsName, pcsReplicaIndex, pclq.Name, pod, podIndex) + // If there is a need to enforce a Startup-Order then configure the init container and add it to the Pod Spec. 
if len(pclq.Spec.StartsAfter) != 0 { return configurePodInitContainer(pcs, pclq, pod) @@ -171,6 +183,13 @@ func (r _resource) buildResource(pcs *grovecorev1alpha1.PodCliqueSet, pclq *grov return nil } +// shouldUseWorkloadAPI determines if Kubernetes Workload API should be used based on schedulerName +// DEPRECATED: This function is now handled by backend.MutatePod() +// Keeping for backward compatibility during migration +func shouldUseWorkloadAPI(podSpec *corev1.PodSpec) bool { + return podSpec.SchedulerName == "" || podSpec.SchedulerName == "default-scheduler" +} + // Delete removes all Pods associated with the specified PodClique func (r _resource) Delete(ctx context.Context, logger logr.Logger, pclqObjectMeta metav1.ObjectMeta) error { logger.Info("Triggering delete of all pods for the PodClique") diff --git a/operator/internal/controller/podclique/components/pod/syncflow.go b/operator/internal/controller/podclique/components/pod/syncflow.go index 575bcc642..e3c711208 100644 --- a/operator/internal/controller/podclique/components/pod/syncflow.go +++ b/operator/internal/controller/podclique/components/pod/syncflow.go @@ -27,6 +27,7 @@ import ( grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/ai-dynamo/grove/operator/internal/controller/common/component" componentutils "github.com/ai-dynamo/grove/operator/internal/controller/common/component/utils" + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" groveerr "github.com/ai-dynamo/grove/operator/internal/errors" "github.com/ai-dynamo/grove/operator/internal/expect" "github.com/ai-dynamo/grove/operator/internal/index" @@ -37,6 +38,7 @@ import ( "github.com/go-logr/logr" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -145,6 +147,15 @@ func (r _resource) runSyncFlow(logger logr.Logger, sc *syncContext) syncFlowResu result.recordError(err) } logger.Info("created unassigned and scheduled gated pods", "numberOfCreatedPods", numScheduleGatedPods) + + // Refresh pod list to include newly created pods so they can have their scheduling gates removed in this reconciliation + refreshedPods, err := componentutils.GetPCLQPods(sc.ctx, r.client, sc.pcs.Name, sc.pclq) + if err != nil { + logger.Error(err, "failed to refresh pod list after creating new pods") + } else { + sc.existingPCLQPods = refreshedPods + logger.V(1).Info("refreshed pod list after creating new pods", "numPods", len(refreshedPods)) + } } else if diff > 0 { if err := r.deleteExcessPods(sc, logger, diff); err != nil { result.recordError(err) @@ -239,49 +250,71 @@ func selectExcessPodsToDelete(sc *syncContext, logger logr.Logger) []*corev1.Pod } // checkAndRemovePodSchedulingGates removes scheduling gates from pods when their dependencies are satisfied +// This method delegates the gate removal logic to the appropriate scheduler backend func (r _resource) checkAndRemovePodSchedulingGates(sc *syncContext, logger logr.Logger) ([]string, error) { tasks := make([]utils.Task, 0, len(sc.existingPCLQPods)) skippedScheduleGatedPods := make([]string, 0, len(sc.existingPCLQPods)) - // Pre-compute if the base PodGang is scheduled once for all pods in this PodClique - // All pods in the same PodClique have the same base PodGang - basePodGangScheduled, basePodGangName, err := r.checkBasePodGangScheduledForPodClique(sc.ctx, logger, sc.pclq) + // Get backend 
from global manager + backendManager, err := backend.GetGlobalManager() if err != nil { - logger.Error(err, "Error checking if base PodGang is scheduled for PodClique - will requeue") - return nil, groveerr.WrapError(err, - errCodeRemovePodSchedulingGate, - component.OperationSync, - "failed to check if base PodGang is scheduled for PodClique", - ) + logger.Error(err, "Backend manager not initialized, skipping gate removal") + return nil, nil } + schedulerName := sc.pclq.Spec.PodSpec.SchedulerName + schedBackend, err := backendManager.GetBackend(schedulerName) + if err != nil { + // No backend available for this scheduler - skip gate removal + logger.V(1).Info("No backend available for scheduler, skipping gate removal", + "schedulerName", schedulerName, + "error", err.Error()) + return nil, nil + } + + gateName := schedBackend.GetSchedulingGateName() + logger.V(1).Info("Using backend for scheduling gate management", + "schedulerName", schedulerName, + "backend", schedBackend.Name(), + "gateName", gateName) + for i, p := range sc.existingPCLQPods { - if hasPodGangSchedulingGate(p) { - podObjectKey := client.ObjectKeyFromObject(p) - if !slices.Contains(sc.podNamesUpdatedInPCLQPodGangs, p.Name) { - logger.Info("Pod has scheduling gate but it has not yet been updated in PodGang", "podObjectKey", podObjectKey) - skippedScheduleGatedPods = append(skippedScheduleGatedPods, p.Name) - continue - } - shouldSkip := r.shouldSkipPodSchedulingGateRemoval(logger, p, basePodGangScheduled, basePodGangName) - if shouldSkip { - skippedScheduleGatedPods = append(skippedScheduleGatedPods, p.Name) - continue - } - task := utils.Task{ - Name: fmt.Sprintf("RemoveSchedulingGate-%s-%d", p.Name, i), - Fn: func(ctx context.Context) error { - podClone := p.DeepCopy() - p.Spec.SchedulingGates = nil - if err := client.IgnoreNotFound(r.client.Patch(ctx, p, client.MergeFrom(podClone))); err != nil { - return err - } - logger.Info("Removed scheduling gate from pod", "podObjectKey", podObjectKey) - return nil - }, - } - tasks = append(tasks, task) + // Check if pod has the backend's specific scheduling gate + if !hasSpecificSchedulingGate(p, gateName) { + continue + } + + podObjectKey := client.ObjectKeyFromObject(p) + + // Ask the backend if the gate should be removed + shouldRemove, reason, err := schedBackend.ShouldRemoveSchedulingGate(sc.ctx, logger, p, sc.pclq) + if err != nil { + logger.Error(err, "Error checking if scheduling gate should be removed for pod", + "podObjectKey", podObjectKey) + return nil, groveerr.WrapError(err, + errCodeRemovePodSchedulingGate, + component.OperationSync, + fmt.Sprintf("failed to check if scheduling gate should be removed for pod %v", podObjectKey), + ) + } + + if !shouldRemove { + logger.V(1).Info("Skipping scheduling gate removal for pod", + "podObjectKey", podObjectKey, + "reason", reason) + skippedScheduleGatedPods = append(skippedScheduleGatedPods, p.Name) + continue } + + // Create task to remove the specific gate + pod := p // Capture for closure + task := utils.Task{ + Name: fmt.Sprintf("RemoveSchedulingGate-%s-%d", pod.Name, i), + Fn: func(ctx context.Context) error { + return r.removeSpecificSchedulingGate(ctx, logger, pod, gateName) + }, + } + tasks = append(tasks, task) } if len(tasks) > 0 { @@ -344,6 +377,71 @@ func (r _resource) isBasePodGangScheduled(ctx context.Context, logger logr.Logge return true, nil } +// checkWorkloadReadyForPodClique checks if the Workload associated with this PodClique is ready for scheduling. 
+// A Workload is considered ready when ALL of its PodGroups have sufficient created replicas. +func (r _resource) checkWorkloadReadyForPodClique(ctx context.Context, logger logr.Logger, pclq *grovecorev1alpha1.PodClique) (bool, string, error) { + // Get the Workload name from the PodClique labels + workloadName, hasWorkloadLabel := pclq.GetLabels()[common.LabelPodGang] + if !hasWorkloadLabel { + // No Workload associated - should not happen for Workload API mode + logger.Info("PodClique has no Workload label", "podClique", client.ObjectKeyFromObject(pclq)) + return false, "", nil + } + + // Get the Workload object + workload := &schedulingv1alpha1.Workload{} + workloadKey := client.ObjectKey{Name: workloadName, Namespace: pclq.Namespace} + if err := r.client.Get(ctx, workloadKey, workload); err != nil { + if apierrors.IsNotFound(err) { + // Workload not found yet - return error to trigger requeue + // This can happen when PodClique is created before Workload + logger.V(1).Info("Workload not found yet, will requeue", "workloadName", workloadName) + return false, workloadName, groveerr.WrapError(err, + errCodeGetPodGang, // Reuse error code + component.OperationSync, + fmt.Sprintf("Workload %s not found yet for PodClique %s", workloadName, pclq.Name), + ) + } + return false, workloadName, groveerr.WrapError(err, + errCodeGetPodGang, // Reuse error code + component.OperationSync, + fmt.Sprintf("failed to get Workload %v", workloadKey), + ) + } + + // Check if all PodGroups in the Workload have sufficient created replicas + for _, podGroup := range workload.Spec.PodGroups { + pclqName := podGroup.Name + podClique := &grovecorev1alpha1.PodClique{} + pclqKey := client.ObjectKey{Name: pclqName, Namespace: pclq.Namespace} + if err := r.client.Get(ctx, pclqKey, podClique); err != nil { + return false, workloadName, groveerr.WrapError(err, + errCodeGetPodClique, + component.OperationSync, + fmt.Sprintf("failed to get PodClique %s in namespace %s for Workload readiness check", pclqName, pclq.Namespace), + ) + } + + // For gang scheduling, check if MinCount is satisfied + // We check Replicas (total pods created) rather than ScheduledReplicas (pods that passed scheduling) + // because pods are gated and cannot be scheduled until we remove the gate + if podGroup.Policy.Gang != nil { + minCount := podGroup.Policy.Gang.MinCount + if podClique.Status.Replicas < minCount { + logger.V(1).Info("Workload not ready: PodClique has insufficient created pods", + "workloadName", workloadName, + "pclqName", pclqName, + "createdReplicas", podClique.Status.Replicas, + "minCount", minCount) + return false, workloadName, nil + } + } + } + + logger.Info("Workload is ready - all PodGroups meet gang scheduling requirements", "workloadName", workloadName) + return true, workloadName, nil +} + // checkBasePodGangScheduledForPodClique determines if there's a base PodGang for the PodClique. If there is one, // this function checks if it is scheduled. func (r _resource) checkBasePodGangScheduledForPodClique(ctx context.Context, logger logr.Logger, pclq *grovecorev1alpha1.PodClique) (bool, string, error) { @@ -362,10 +460,25 @@ func (r _resource) checkBasePodGangScheduledForPodClique(ctx context.Context, lo return scheduled, basePodGangName, nil } -// shouldSkipPodSchedulingGateRemoval implements the core PodGang scheduling gate logic. +// shouldSkipPodSchedulingGateRemoval implements the scheduling gate removal logic. // It returns true if the pod scheduling gate removal should be skipped, false otherwise.
-func (r _resource) shouldSkipPodSchedulingGateRemoval(logger logr.Logger, pod *corev1.Pod, basePodGangReady bool, basePodGangName string) bool { - if basePodGangName == "" { +func (r _resource) shouldSkipPodSchedulingGateRemoval(logger logr.Logger, pod *corev1.Pod, gangReady bool, gangName string, usingWorkloadAPI bool) bool { + if usingWorkloadAPI { + // WORKLOAD API MODE: Simple logic - remove gate when Workload is ready + if gangReady { + logger.Info("Workload is ready, proceeding with gate removal", + "podObjectKey", client.ObjectKeyFromObject(pod), + "workloadName", gangName) + return false + } + logger.V(1).Info("Workload not ready yet, skipping gate removal", + "podObjectKey", client.ObjectKeyFromObject(pod), + "workloadName", gangName) + return true + } + + // PODGANG API MODE: Base/Scaled logic + if gangName == "" { // BASE PODGANG POD: This PodClique has no base PodGang dependency // These pods form the core gang and get their gates removed immediately once assigned to PodGang // They represent the minimum viable cluster (first minAvailable replicas) that must start together @@ -374,23 +487,42 @@ func (r _resource) shouldSkipPodSchedulingGateRemoval(logger logr.Logger, pod *c return false } // SCALED PODGANG POD: This PodClique depends on a base PodGang - if basePodGangReady { + if gangReady { logger.Info("Base PodGang is ready, proceeding with gate removal for scaled PodGang pod", "podObjectKey", client.ObjectKeyFromObject(pod), - "basePodGangName", basePodGangName) + "basePodGangName", gangName) return false } logger.Info("Scaled PodGang pod has scheduling gate but base PodGang is not ready yet, skipping scheduling gate removal", "podObjectKey", client.ObjectKeyFromObject(pod), - "basePodGangName", basePodGangName) + "basePodGangName", gangName) return true } -// hasPodGangSchedulingGate checks if a pod has the PodGang scheduling gate -func hasPodGangSchedulingGate(pod *corev1.Pod) bool { - return slices.ContainsFunc(pod.Spec.SchedulingGates, func(schedulingGate corev1.PodSchedulingGate) bool { - return podGangSchedulingGate == schedulingGate.Name +// hasSpecificSchedulingGate checks if a pod has a specific scheduling gate by name +func hasSpecificSchedulingGate(pod *corev1.Pod, gateName string) bool { + return slices.ContainsFunc(pod.Spec.SchedulingGates, func(g corev1.PodSchedulingGate) bool { + return g.Name == gateName + }) +} + +// removeSpecificSchedulingGate removes a specific scheduling gate from a pod +func (r _resource) removeSpecificSchedulingGate(ctx context.Context, logger logr.Logger, pod *corev1.Pod, gateName string) error { + podClone := pod.DeepCopy() + + // Remove the specific gate + pod.Spec.SchedulingGates = slices.DeleteFunc(pod.Spec.SchedulingGates, func(g corev1.PodSchedulingGate) bool { + return g.Name == gateName }) + + if err := client.IgnoreNotFound(r.client.Patch(ctx, pod, client.MergeFrom(podClone))); err != nil { + return err + } + + logger.Info("Removed scheduling gate from pod", + "podObjectKey", client.ObjectKeyFromObject(pod), + "gateName", gateName) + return nil } // createPods creates the specified number of new pods for the PodClique with proper indexing and concurrency control diff --git a/operator/internal/controller/podclique/register.go b/operator/internal/controller/podclique/register.go index 8571dd792..b9ac7b9da 100644 --- a/operator/internal/controller/podclique/register.go +++ b/operator/internal/controller/podclique/register.go @@ -30,6 +30,7 @@ import ( groveschedulerv1alpha1 
"github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -54,10 +55,7 @@ func (r *Reconciler) RegisterWithManager(mgr ctrl.Manager) error { }). For(&grovecorev1alpha1.PodClique{}, builder.WithPredicates( - predicate.And( - predicate.GenerationChangedPredicate{}, - managedPodCliquePredicate(), - ), + managedPodCliquePredicate(), ), ). Owns(&corev1.Pod{}, builder.WithPredicates(podPredicate())). @@ -76,6 +74,11 @@ func (r *Reconciler) RegisterWithManager(mgr ctrl.Manager) error { handler.EnqueueRequestsFromMapFunc(mapPodGangToPCLQs()), builder.WithPredicates(podGangPredicate()), ). + Watches( + &schedulingv1alpha1.Workload{}, + handler.EnqueueRequestsFromMapFunc(mapWorkloadToPCLQs()), + builder.WithPredicates(workloadPredicate()), + ). Complete(r) } @@ -277,6 +280,35 @@ func podGangPredicate() predicate.Predicate { } } +// mapWorkloadToPCLQs maps a Workload to one or more reconcile.Request(s) for its constituent PodClique's. +func mapWorkloadToPCLQs() handler.MapFunc { + return func(_ context.Context, obj client.Object) []reconcile.Request { + workload, ok := obj.(*schedulingv1alpha1.Workload) + if !ok { + return nil + } + requests := make([]reconcile.Request, 0, len(workload.Spec.PodGroups)) + for _, podGroup := range workload.Spec.PodGroups { + // The PodGroup name is the PodClique FQN + pclqFQN := podGroup.Name + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: pclqFQN, Namespace: workload.Namespace}, + }) + } + return requests + } +} + +// workloadPredicate allows all Workload create and update events to trigger PodClique reconciliation +func workloadPredicate() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(_ event.CreateEvent) bool { return true }, + DeleteFunc: func(_ event.DeleteEvent) bool { return false }, + UpdateFunc: func(_ event.UpdateEvent) bool { return true }, + GenericFunc: func(_ event.GenericEvent) bool { return false }, + } +} + // isManagedPod checks if a Pod is managed by Grove and owned by a PodClique func isManagedPod(obj client.Object) bool { pod, ok := obj.(*corev1.Pod) diff --git a/operator/internal/controller/podcliqueset/components/podgang/podgang.go b/operator/internal/controller/podcliqueset/components/podgang/podgang.go index dd4a20691..5a0431162 100644 --- a/operator/internal/controller/podcliqueset/components/podgang/podgang.go +++ b/operator/internal/controller/podcliqueset/components/podgang/podgang.go @@ -121,7 +121,7 @@ func (r _resource) Delete(ctx context.Context, logger logr.Logger, pcsObjectMeta // buildResource configures a PodGang with pod groups and priority. func (r _resource) buildResource(pcs *grovecorev1alpha1.PodCliqueSet, pgInfo podGangInfo, pg *groveschedulerv1alpha1.PodGang) error { - pg.Labels = getLabels(pcs.Name) + pg.Labels = getLabels(pcs) if err := controllerutil.SetControllerReference(pcs, pg, r.scheme); err != nil { return groveerr.WrapError( err, @@ -155,10 +155,28 @@ func emptyPodGang(objKey client.ObjectKey) *groveschedulerv1alpha1.PodGang { } // getLabels constructs labels for a PodGang resource. 
-func getLabels(pcsName string) map[string]string { - return lo.Assign( - apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(pcsName), +func getLabels(pcs *grovecorev1alpha1.PodCliqueSet) map[string]string { + labels := lo.Assign( + apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(pcs.Name), map[string]string{ apicommon.LabelComponentKey: apicommon.LabelComponentNamePodGang, }) + + // Add scheduler-backend label so Backend Controllers can identify which PodGang to handle + // Check scheduler name from the first clique (all cliques should have same scheduler) + schedulerName := "" + if len(pcs.Spec.Template.Cliques) > 0 { + schedulerName = pcs.Spec.Template.Cliques[0].Spec.PodSpec.SchedulerName + } + + // Determine backend based on schedulerName + if schedulerName == "" || schedulerName == "default-scheduler" { + // Use Workload backend for default kube-scheduler + labels[apicommon.LabelSchedulerBackend] = "workload" + } else { + // Use KAI backend for custom schedulers + labels[apicommon.LabelSchedulerBackend] = "kai" + } + + return labels } diff --git a/operator/internal/controller/podcliqueset/components/registry.go b/operator/internal/controller/podcliqueset/components/registry.go index c58b1e8f7..41bba47ac 100644 --- a/operator/internal/controller/podcliqueset/components/registry.go +++ b/operator/internal/controller/podcliqueset/components/registry.go @@ -47,6 +47,8 @@ func CreateOperatorRegistry(mgr manager.Manager, eventRecorder record.EventRecor reg.Register(component.KindPodCliqueScalingGroup, podcliquescalinggroup.New(cl, mgr.GetScheme(), eventRecorder)) reg.Register(component.KindHorizontalPodAutoscaler, hpa.New(cl, mgr.GetScheme())) reg.Register(component.KindPodGang, podgang.New(cl, mgr.GetScheme(), eventRecorder)) + // NOTE: Workload is NOT registered here - it's managed by Backend Controllers + // Backend Controllers watch PodGang and convert to scheduler-specific CRs (Workload/PodGroup) reg.Register(component.KindPodCliqueSetReplica, podcliquesetreplica.New(cl, eventRecorder)) return reg } diff --git a/operator/internal/controller/podcliqueset/components/scheduler_detector.go b/operator/internal/controller/podcliqueset/components/scheduler_detector.go new file mode 100644 index 000000000..b916bcd18 --- /dev/null +++ b/operator/internal/controller/podcliqueset/components/scheduler_detector.go @@ -0,0 +1,56 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package components + +import ( + grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" +) + +// SchedulerType represents the type of scheduler being used +type SchedulerType string + +const ( + // SchedulerTypeDefault represents the default Kubernetes scheduler (kube-scheduler) + SchedulerTypeDefault SchedulerType = "default" + // SchedulerTypeGrove represents the Grove custom scheduler + SchedulerTypeGrove SchedulerType = "grove" +) + +// DetectSchedulerType determines which scheduler is being used for a PodCliqueSet. 
+// It checks the schedulerName field in PodSpec of all cliques: +// - If schedulerName is empty or "default-scheduler", it uses default kube-scheduler +// - Otherwise, it uses grove scheduler +func DetectSchedulerType(pcs *grovecorev1alpha1.PodCliqueSet) SchedulerType { + // Check all cliques for scheduler name + for _, clique := range pcs.Spec.Template.Cliques { + schedulerName := clique.Spec.PodSpec.SchedulerName + + // If any clique explicitly specifies a non-default scheduler, use grove scheduler + if schedulerName != "" && schedulerName != "default-scheduler" { + return SchedulerTypeGrove + } + } + + // Default to using default-scheduler (kube-scheduler 1.35+) + // This means we'll use the Workload API for gang scheduling + return SchedulerTypeDefault +} + +// ShouldUseWorkloadAPI returns true if we should use Kubernetes Workload API instead of PodGang API +func ShouldUseWorkloadAPI(pcs *grovecorev1alpha1.PodCliqueSet) bool { + return DetectSchedulerType(pcs) == SchedulerTypeDefault +} diff --git a/operator/internal/controller/podcliqueset/components/workload/syncflow.go b/operator/internal/controller/podcliqueset/components/workload/syncflow.go new file mode 100644 index 000000000..3252d3007 --- /dev/null +++ b/operator/internal/controller/podcliqueset/components/workload/syncflow.go @@ -0,0 +1,486 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package workload + +import ( + "context" + "errors" + "fmt" + "slices" + + apicommon "github.com/ai-dynamo/grove/operator/api/common" + grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" + "github.com/ai-dynamo/grove/operator/internal/constants" + "github.com/ai-dynamo/grove/operator/internal/controller/common/component" + componentutils "github.com/ai-dynamo/grove/operator/internal/controller/common/component/utils" + groveerr "github.com/ai-dynamo/grove/operator/internal/errors" + + "github.com/go-logr/logr" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// prepareSyncFlow computes the required state for synchronizing Workload resources. 
+func (r _resource) prepareSyncFlow(ctx context.Context, logger logr.Logger, pcs *grovecorev1alpha1.PodCliqueSet) (*syncContext, error) { + pcsObjectKey := client.ObjectKeyFromObject(pcs) + sc := &syncContext{ + ctx: ctx, + pcs: pcs, + logger: logger, + expectedWorkloads: make([]workloadInfo, 0), + existingWorkloadNames: make([]string, 0), + pclqs: make([]grovecorev1alpha1.PodClique, 0), + } + + pclqs, err := r.getPCLQsForPCS(ctx, pcsObjectKey) + if err != nil { + return nil, groveerr.WrapError(err, + errCodeListPodCliques, + component.OperationSync, + fmt.Sprintf("failed to list PodCliques for PodCliqueSet %v", pcsObjectKey), + ) + } + sc.pclqs = pclqs + + if err := r.computeExpectedWorkloads(sc); err != nil { + return nil, groveerr.WrapError(err, + errCodeComputeExistingWorkload, + component.OperationSync, + fmt.Sprintf("failed to compute existing Workloads for PodCliqueSet %v", pcsObjectKey), + ) + } + + existingWorkloadNames, err := r.GetExistingResourceNames(ctx, logger, pcs.ObjectMeta) + if err != nil { + return nil, groveerr.WrapError(err, + errCodeListWorkloads, + component.OperationSync, + fmt.Sprintf("Failed to get existing Workload names for PodCliqueSet: %v", client.ObjectKeyFromObject(sc.pcs)), + ) + } + sc.existingWorkloadNames = existingWorkloadNames + + return sc, nil +} + +// getPCLQsForPCS fetches all PodCliques managed by the PodCliqueSet. +func (r _resource) getPCLQsForPCS(ctx context.Context, pcsObjectKey client.ObjectKey) ([]grovecorev1alpha1.PodClique, error) { + pclqList := &grovecorev1alpha1.PodCliqueList{} + if err := r.client.List(ctx, pclqList, + client.InNamespace(pcsObjectKey.Namespace), + client.MatchingLabels(apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(pcsObjectKey.Name))); err != nil { + return nil, err + } + return pclqList.Items, nil +} + +// computeExpectedWorkloads computes expected Workloads based on PCS replicas and scaling groups. +func (r _resource) computeExpectedWorkloads(sc *syncContext) error { + expectedWorkloads := make([]workloadInfo, 0, int(sc.pcs.Spec.Replicas)) + + // Create one workload per PodCliqueSet replica + for pcsReplica := range sc.pcs.Spec.Replicas { + workloadName := apicommon.GenerateBasePodGangName(apicommon.ResourceNameReplica{Name: sc.pcs.Name, Replica: int(pcsReplica)}) + expectedWorkloads = append(expectedWorkloads, workloadInfo{ + fqn: workloadName, + pclqs: identifyConstituentPCLQsForWorkload(sc, pcsReplica), + }) + + // Create scaled Workloads for PCSG replicas beyond minAvailable + scaledWorkloads, err := r.computeScaledPCSGWorkloads(sc, pcsReplica) + if err != nil { + return err + } + expectedWorkloads = append(expectedWorkloads, scaledWorkloads...) + } + + sc.expectedWorkloads = expectedWorkloads + return nil +} + +// identifyConstituentPCLQsForWorkload identifies PCLQs that belong to a Workload. +func identifyConstituentPCLQsForWorkload(sc *syncContext, pcsReplica int32) []pclqInfo { + constituentPCLQs := make([]pclqInfo, 0, len(sc.pcs.Spec.Template.Cliques)) + for _, pclqTemplateSpec := range sc.pcs.Spec.Template.Cliques { + // Check if this PodClique belongs to a scaling group + pcsgConfig := componentutils.FindScalingGroupConfigForClique(sc.pcs.Spec.Template.PodCliqueScalingGroupConfigs, pclqTemplateSpec.Name) + if pcsgConfig != nil { + // For scaling groups, include minAvailable replicas in base workload + scalingGroupPclqs := buildPCSGPodCliqueInfosForBaseWorkload(sc, pclqTemplateSpec, pcsgConfig, pcsReplica) + constituentPCLQs = append(constituentPCLQs, scalingGroupPclqs...) 
+ } else { + // Add standalone PodClique (not part of a scaling group) + standalonePclq := buildNonPCSGPodCliqueInfosForBaseWorkload(sc, pclqTemplateSpec, pcsReplica) + constituentPCLQs = append(constituentPCLQs, standalonePclq) + } + } + return constituentPCLQs +} + +// buildPCSGPodCliqueInfosForBaseWorkload generates PodClique info for scaling group in a workload. +func buildPCSGPodCliqueInfosForBaseWorkload(sc *syncContext, pclqTemplateSpec *grovecorev1alpha1.PodCliqueTemplateSpec, + pcsgConfig *grovecorev1alpha1.PodCliqueScalingGroupConfig, pcsReplica int32) []pclqInfo { + + pclqInfos := make([]pclqInfo, 0, int(*pcsgConfig.MinAvailable)) + minAvailable := int(*pcsgConfig.MinAvailable) + + for pcsgReplica := 0; pcsgReplica < minAvailable; pcsgReplica++ { + // Generate PodClique name for scaling group member + pcsgFQN := apicommon.GeneratePodCliqueScalingGroupName( + apicommon.ResourceNameReplica{Name: sc.pcs.Name, Replica: int(pcsReplica)}, + pcsgConfig.Name, + ) + pclqFQN := fmt.Sprintf("%s-%d-%s", pcsgFQN, pcsgReplica, pclqTemplateSpec.Name) + + pclqInfos = append(pclqInfos, pclqInfo{ + fqn: pclqFQN, + minAvailable: pclqTemplateSpec.Spec.Replicas, + replicas: pclqTemplateSpec.Spec.Replicas, + }) + } + return pclqInfos +} + +// buildNonPCSGPodCliqueInfosForBaseWorkload generates PodClique info for standalone cliques. +func buildNonPCSGPodCliqueInfosForBaseWorkload(sc *syncContext, pclqTemplateSpec *grovecorev1alpha1.PodCliqueTemplateSpec, pcsReplica int32) pclqInfo { + pclqFQN := apicommon.GeneratePodCliqueName( + apicommon.ResourceNameReplica{Name: sc.pcs.Name, Replica: int(pcsReplica)}, + pclqTemplateSpec.Name, + ) + + minAvailable := pclqTemplateSpec.Spec.Replicas + if pclqTemplateSpec.Spec.MinAvailable != nil { + minAvailable = *pclqTemplateSpec.Spec.MinAvailable + } + + return pclqInfo{ + fqn: pclqFQN, + minAvailable: minAvailable, + replicas: pclqTemplateSpec.Spec.Replicas, + } +} + +// computeScaledPCSGWorkloads creates Workload resources for scaled PCSG replicas (beyond minAvailable). +// This mirrors the PodGang controller's logic for creating scaled PodGangs. 
+func (r _resource) computeScaledPCSGWorkloads(sc *syncContext, pcsReplica int32) ([]workloadInfo, error) { + scaledWorkloads := make([]workloadInfo, 0) + + // Get existing PCSGs to determine current replicas + existingPCSGs, err := r.getExistingPodCliqueScalingGroups(sc, pcsReplica) + if err != nil { + return nil, err + } + + // For each PCSG config, create scaled Workloads for replicas beyond minAvailable + for _, pcsgConfig := range sc.pcs.Spec.Template.PodCliqueScalingGroupConfigs { + pcsgFQN := apicommon.GeneratePodCliqueScalingGroupName( + apicommon.ResourceNameReplica{Name: sc.pcs.Name, Replica: int(pcsReplica)}, + pcsgConfig.Name, + ) + + minAvailable := int(*pcsgConfig.MinAvailable) + + // Determine current replicas from PCSG resource if it exists, otherwise use template + replicas := int(*pcsgConfig.Replicas) + pcsg, ok := lo.Find(existingPCSGs, func(sg grovecorev1alpha1.PodCliqueScalingGroup) bool { + return sg.Name == pcsgFQN + }) + if ok { + replicas = int(pcsg.Spec.Replicas) + } + + // Create scaled Workloads for replicas starting from minAvailable + // The first 0..(minAvailable-1) replicas are in the base Workload + // Scaled Workloads use 0-based indexing + scaledReplicas := replicas - minAvailable + for workloadIndex, pcsgReplica := 0, minAvailable; workloadIndex < scaledReplicas; workloadIndex, pcsgReplica = workloadIndex+1, pcsgReplica+1 { + workloadName := apicommon.CreatePodGangNameFromPCSGFQN(pcsgFQN, workloadIndex) + pclqs, err := identifyConstituentPCLQsForScaledPCSGWorkload(sc, pcsgFQN, pcsgReplica, pcsgConfig.CliqueNames) + if err != nil { + return nil, err + } + scaledWorkloads = append(scaledWorkloads, workloadInfo{ + fqn: workloadName, + pclqs: pclqs, + }) + } + } + + return scaledWorkloads, nil +} + +// getExistingPodCliqueScalingGroups fetches PCSGs for a specific PCS replica. +func (r _resource) getExistingPodCliqueScalingGroups(sc *syncContext, pcsReplica int32) ([]grovecorev1alpha1.PodCliqueScalingGroup, error) { + pcsgList := &grovecorev1alpha1.PodCliqueScalingGroupList{} + labels := lo.Assign( + apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(sc.pcs.Name), + map[string]string{ + apicommon.LabelPodCliqueSetReplicaIndex: fmt.Sprintf("%d", pcsReplica), + }, + ) + if err := r.client.List(sc.ctx, pcsgList, + client.InNamespace(sc.pcs.Namespace), + client.MatchingLabels(labels)); err != nil { + return nil, groveerr.WrapError(err, + errCodeListPodCliqueScalingGroup, + component.OperationSync, + fmt.Sprintf("failed to list PodCliqueScalingGroups for PCS %v replica %d", client.ObjectKeyFromObject(sc.pcs), pcsReplica), + ) + } + return pcsgList.Items, nil +} + +// identifyConstituentPCLQsForScaledPCSGWorkload identifies PCLQs for a scaled PCSG Workload. 
+func identifyConstituentPCLQsForScaledPCSGWorkload(sc *syncContext, pcsgFQN string, pcsgReplica int, cliqueNames []string) ([]pclqInfo, error) { + constituentPCLQs := make([]pclqInfo, 0, len(cliqueNames)) + for _, pclqName := range cliqueNames { + pclqTemplate, ok := lo.Find(sc.pcs.Spec.Template.Cliques, func(pclqTemplateSpec *grovecorev1alpha1.PodCliqueTemplateSpec) bool { + return pclqName == pclqTemplateSpec.Name + }) + if !ok { + return nil, fmt.Errorf("PodCliqueScalingGroup references a PodClique that does not exist in the PodCliqueSet: %s", pclqName) + } + + pclqFQN := apicommon.GeneratePodCliqueName(apicommon.ResourceNameReplica{Name: pcsgFQN, Replica: pcsgReplica}, pclqName) + + minAvailable := pclqTemplate.Spec.Replicas + if pclqTemplate.Spec.MinAvailable != nil { + minAvailable = *pclqTemplate.Spec.MinAvailable + } + + constituentPCLQs = append(constituentPCLQs, pclqInfo{ + fqn: pclqFQN, + minAvailable: minAvailable, + replicas: pclqTemplate.Spec.Replicas, + }) + } + return constituentPCLQs, nil +} + +// runSyncFlow executes the sync flow for Workload resources. +func (r _resource) runSyncFlow(sc *syncContext) syncFlowResult { + result := syncFlowResult{} + + // Delete excess workloads + if err := r.deleteExcessWorkloads(sc); err != nil { + result.recordError(err) + return result + } + + // Create or update workloads + createUpdateResult := r.createOrUpdateWorkloads(sc) + result.merge(createUpdateResult) + + return result +} + +// deleteExcessWorkloads removes Workloads that are no longer expected. +func (r _resource) deleteExcessWorkloads(sc *syncContext) error { + expectedWorkloadNames := lo.Map(sc.expectedWorkloads, func(wl workloadInfo, _ int) string { + return wl.fqn + }) + + excessWorkloadNames := lo.Filter(sc.existingWorkloadNames, func(name string, _ int) bool { + return !slices.Contains(expectedWorkloadNames, name) + }) + + for _, workloadToDelete := range excessWorkloadNames { + wlObjectKey := client.ObjectKey{ + Namespace: sc.pcs.Namespace, + Name: workloadToDelete, + } + wl := emptyWorkload(wlObjectKey) + if err := r.client.Delete(sc.ctx, wl); err != nil { + r.eventRecorder.Eventf(sc.pcs, corev1.EventTypeWarning, constants.ReasonPodGangDeleteFailed, "Error Deleting Workload %v: %v", wlObjectKey, err) + return groveerr.WrapError(err, + errCodeDeleteExcessWorkload, + component.OperationSync, + fmt.Sprintf("failed to delete Workload %v", wlObjectKey), + ) + } + r.eventRecorder.Eventf(sc.pcs, corev1.EventTypeNormal, constants.ReasonPodGangDeleteSuccessful, "Deleted Workload %v", wlObjectKey) + sc.logger.Info("Triggered delete of excess Workload", "objectKey", client.ObjectKeyFromObject(wl)) + } + return nil +} + +// createOrUpdateWorkloads creates or updates all expected Workloads when ready. 
+func (r _resource) createOrUpdateWorkloads(sc *syncContext) syncFlowResult { + result := syncFlowResult{} + pendingWorkloadNames := sc.getWorkloadNamesPendingCreation() + + for _, workload := range sc.expectedWorkloads { + sc.logger.Info("[createOrUpdateWorkloads] processing Workload", "fqn", workload.fqn) + isWorkloadPendingCreation := slices.Contains(pendingWorkloadNames, workload.fqn) + + // Check if all pods for this workload have been created + numPendingPods := r.getPodsPendingCreation(sc, workload) + if isWorkloadPendingCreation && numPendingPods > 0 { + sc.logger.Info("skipping creation of Workload as all desired replicas have not yet been created", "fqn", workload.fqn, "numPendingPodsToCreate", numPendingPods) + result.recordWorkloadPendingCreation(workload.fqn) + continue + } + + if err := r.createOrUpdateWorkload(sc, workload); err != nil { + sc.logger.Error(err, "failed to create Workload", "WorkloadName", workload.fqn) + result.recordError(err) + return result + } + result.recordWorkloadCreation(workload.fqn) + } + return result +} + +// getPodsPendingCreation counts pods not yet created for the Workload. +func (r _resource) getPodsPendingCreation(sc *syncContext, workload workloadInfo) int { + var numPodsPending int + for _, pclq := range workload.pclqs { + // Check if PodClique exists + pclqExists := lo.ContainsBy(sc.pclqs, func(existingPclq grovecorev1alpha1.PodClique) bool { + return existingPclq.Name == pclq.fqn + }) + if !pclqExists { + // PodClique doesn't exist yet, count its expected pods as pending + numPodsPending += int(pclq.replicas) + continue + } + + // PodClique exists, check if it has created all its pods + // For Workload API, we only need to check if pods exist (not if they have specific labels) + // because pods associate with Workload through spec.workloadRef, not labels + foundPclq, _ := lo.Find(sc.pclqs, func(existingPclq grovecorev1alpha1.PodClique) bool { + return existingPclq.Name == pclq.fqn + }) + // Check the PodClique status to see how many pods have been created + numPodsPending += max(0, int(pclq.replicas)-int(foundPclq.Status.Replicas)) + } + return numPodsPending +} + +// createOrUpdateWorkload creates or updates a single Workload resource. +func (r _resource) createOrUpdateWorkload(sc *syncContext, wlInfo workloadInfo) error { + wlObjectKey := client.ObjectKey{ + Namespace: sc.pcs.Namespace, + Name: wlInfo.fqn, + } + wl := emptyWorkload(wlObjectKey) + sc.logger.Info("CreateOrPatch Workload", "objectKey", wlObjectKey) + _, err := controllerutil.CreateOrPatch(sc.ctx, r.client, wl, func() error { + return r.buildResource(sc.pcs, wlInfo, wl) + }) + if err != nil { + r.eventRecorder.Eventf(sc.pcs, corev1.EventTypeWarning, constants.ReasonPodGangCreateOrUpdateFailed, "Error Creating/Updating Workload %v: %v", wlObjectKey, err) + return groveerr.WrapError(err, + errCodeCreateOrPatchWorkload, + component.OperationSync, + fmt.Sprintf("Failed to CreateOrPatch Workload %v", wlObjectKey), + ) + } + r.eventRecorder.Eventf(sc.pcs, corev1.EventTypeNormal, constants.ReasonPodGangCreateOrUpdateSuccessful, "Created/Updated Workload %v", wlObjectKey) + sc.logger.Info("Triggered CreateOrPatch of Workload", "objectKey", wlObjectKey) + return nil +} + +// createPodGroupsForWorkload constructs PodGroups from constituent PodCliques for K8s Workload API. 
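A hedged sketch of the output shape this helper produces, using the scheduling/v1alpha1 types exactly as this diff does (it assumes a k8s.io/api release that ships the Workload gang-scheduling types; the clique names are hypothetical). Each constituent PodClique becomes one PodGroup whose gang MinCount is the clique's minAvailable:

package main

import (
	"fmt"

	schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1"
)

func main() {
	// Hypothetical gang: one leader plus workers that need at least 2 pods scheduled together.
	wl := schedulingv1alpha1.Workload{
		Spec: schedulingv1alpha1.WorkloadSpec{
			PodGroups: []schedulingv1alpha1.PodGroup{
				{
					Name:   "simple1-0-leader",
					Policy: schedulingv1alpha1.PodGroupPolicy{Gang: &schedulingv1alpha1.GangSchedulingPolicy{MinCount: 1}},
				},
				{
					Name:   "simple1-0-worker",
					Policy: schedulingv1alpha1.PodGroupPolicy{Gang: &schedulingv1alpha1.GangSchedulingPolicy{MinCount: 2}},
				},
			},
		},
	}
	fmt.Printf("%d pod groups, worker gang MinCount=%d\n", len(wl.Spec.PodGroups), wl.Spec.PodGroups[1].Policy.Gang.MinCount)
}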
+func createPodGroupsForWorkload(wlInfo workloadInfo) []schedulingv1alpha1.PodGroup { + podGroups := lo.Map(wlInfo.pclqs, func(pclq pclqInfo, _ int) schedulingv1alpha1.PodGroup { + return schedulingv1alpha1.PodGroup{ + Name: pclq.fqn, + Policy: schedulingv1alpha1.PodGroupPolicy{ + Gang: &schedulingv1alpha1.GangSchedulingPolicy{ + MinCount: pclq.minAvailable, + }, + }, + } + }) + return podGroups +} + +// Convenience types and methods used during sync flow run. + +// syncContext holds the state required during the sync flow run. +type syncContext struct { + ctx context.Context + pcs *grovecorev1alpha1.PodCliqueSet + logger logr.Logger + expectedWorkloads []workloadInfo + existingWorkloadNames []string + pclqs []grovecorev1alpha1.PodClique +} + +// getWorkloadNamesPendingCreation returns names of Workloads that don't exist yet. +func (sc *syncContext) getWorkloadNamesPendingCreation() []string { + expectedWorkloadNames := lo.Map(sc.expectedWorkloads, func(wl workloadInfo, _ int) string { + return wl.fqn + }) + return lo.Filter(expectedWorkloadNames, func(name string, _ int) bool { + return !slices.Contains(sc.existingWorkloadNames, name) + }) +} + +// workloadInfo holds information about a workload to be created/updated. +type workloadInfo struct { + fqn string + pclqs []pclqInfo +} + +// pclqInfo holds information about a PodClique constituent of a workload. +type pclqInfo struct { + fqn string + minAvailable int32 + replicas int32 +} + +// syncFlowResult accumulates the results of a sync flow run. +type syncFlowResult struct { + errors []error + workloadsPendingCreation []string + createdWorkloads []string +} + +func (r *syncFlowResult) hasErrors() bool { + return len(r.errors) > 0 +} + +func (r *syncFlowResult) hasWorkloadsPendingCreation() bool { + return len(r.workloadsPendingCreation) > 0 +} + +func (r *syncFlowResult) getAggregatedError() error { + return errors.Join(r.errors...) +} + +func (r *syncFlowResult) recordError(err error) { + r.errors = append(r.errors, err) +} + +func (r *syncFlowResult) recordWorkloadPendingCreation(name string) { + r.workloadsPendingCreation = append(r.workloadsPendingCreation, name) +} + +func (r *syncFlowResult) recordWorkloadCreation(name string) { + r.createdWorkloads = append(r.createdWorkloads, name) +} + +func (r *syncFlowResult) merge(other syncFlowResult) { + r.errors = append(r.errors, other.errors...) + r.workloadsPendingCreation = append(r.workloadsPendingCreation, other.workloadsPendingCreation...) + r.createdWorkloads = append(r.createdWorkloads, other.createdWorkloads...) +} diff --git a/operator/internal/controller/podcliqueset/components/workload/workload.go b/operator/internal/controller/podcliqueset/components/workload/workload.go new file mode 100644 index 000000000..1ba794330 --- /dev/null +++ b/operator/internal/controller/podcliqueset/components/workload/workload.go @@ -0,0 +1,175 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// */ + +package workload + +import ( + "context" + "fmt" + + apicommon "github.com/ai-dynamo/grove/operator/api/common" + grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" + "github.com/ai-dynamo/grove/operator/internal/controller/common/component" + componentutils "github.com/ai-dynamo/grove/operator/internal/controller/common/component/utils" + groveerr "github.com/ai-dynamo/grove/operator/internal/errors" + k8sutils "github.com/ai-dynamo/grove/operator/internal/utils/kubernetes" + + "github.com/go-logr/logr" + "github.com/samber/lo" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + errCodeListWorkloads grovecorev1alpha1.ErrorCode = "ERR_LIST_WORKLOADS" + errCodeDeleteWorkloads grovecorev1alpha1.ErrorCode = "ERR_DELETE_WORKLOADS" + errCodeDeleteExcessWorkload grovecorev1alpha1.ErrorCode = "ERR_DELETE_EXCESS_WORKLOAD" + errCodeListPods grovecorev1alpha1.ErrorCode = "ERR_LIST_PODS_FOR_PODCLIQUESET" + errCodeListPodCliques grovecorev1alpha1.ErrorCode = "ERR_LIST_PODCLIQUES_FOR_PODCLIQUESET" + errCodeListPodCliqueScalingGroup grovecorev1alpha1.ErrorCode = "ERR_LIST_PODCLIQUESCALINGGROUPS" + errCodeComputeExistingWorkload grovecorev1alpha1.ErrorCode = "ERR_COMPUTE_EXISTING_WORKLOAD" + errCodeSetControllerReference grovecorev1alpha1.ErrorCode = "ERR_SET_CONTROLLER_REFERENCE" + errCodeCreateOrPatchWorkload grovecorev1alpha1.ErrorCode = "ERR_CREATE_OR_PATCH_WORKLOAD" +) + +type _resource struct { + client client.Client + scheme *runtime.Scheme + eventRecorder record.EventRecorder +} + +// New creates a new instance of Workload components operator. +func New(client client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) component.Operator[grovecorev1alpha1.PodCliqueSet] { + return &_resource{ + client: client, + scheme: scheme, + eventRecorder: eventRecorder, + } +} + +// GetExistingResourceNames returns the names of existing Workload resources for the PodCliqueSet. +func (r _resource) GetExistingResourceNames(ctx context.Context, logger logr.Logger, pcsObjMeta metav1.ObjectMeta) ([]string, error) { + logger.Info("Looking for existing Workload resources created per replica of PodCliqueSet") + objMetaList := &metav1.PartialObjectMetadataList{} + objMetaList.SetGroupVersionKind(workloadGVK()) + if err := r.client.List(ctx, + objMetaList, + client.InNamespace(pcsObjMeta.Namespace), + client.MatchingLabels(componentutils.GetPodGangSelectorLabels(pcsObjMeta)), + ); err != nil { + return nil, groveerr.WrapError(err, + errCodeListWorkloads, + component.OperationGetExistingResourceNames, + fmt.Sprintf("Error listing Workload for PodCliqueSet: %v", k8sutils.GetObjectKeyFromObjectMeta(pcsObjMeta)), + ) + } + return k8sutils.FilterMapOwnedResourceNames(pcsObjMeta, objMetaList.Items), nil +} + +// workloadGVK returns the GroupVersionKind for Kubernetes Workload API (1.35+) +func workloadGVK() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: "scheduling.k8s.io", + Version: "v1alpha1", + Kind: "Workload", + } +} + +// Sync creates, updates, or deletes Workload resources to match the desired state. 
+func (r _resource) Sync(ctx context.Context, logger logr.Logger, pcs *grovecorev1alpha1.PodCliqueSet) error { + logger.Info("Syncing Workload resources") + sc, err := r.prepareSyncFlow(ctx, logger, pcs) + if err != nil { + return err + } + result := r.runSyncFlow(sc) + if result.hasErrors() { + return result.getAggregatedError() + } + if result.hasWorkloadsPendingCreation() { + return groveerr.New(groveerr.ErrCodeRequeueAfter, + component.OperationSync, + fmt.Sprintf("Workloads pending creation: %v", result.workloadsPendingCreation), + ) + } + return nil +} + +// Delete removes all Workload resources managed by the PodCliqueSet. +func (r _resource) Delete(ctx context.Context, logger logr.Logger, pcsObjectMeta metav1.ObjectMeta) error { + logger.Info("Triggering deletion of Workloads") + if err := r.client.DeleteAllOf(ctx, + &schedulingv1alpha1.Workload{}, + client.InNamespace(pcsObjectMeta.Namespace), + client.MatchingLabels(getWorkloadSelectorLabels(pcsObjectMeta))); err != nil { + return groveerr.WrapError(err, + errCodeDeleteWorkloads, + component.OperationDelete, + fmt.Sprintf("Failed to delete Workloads for PodCliqueSet: %v", k8sutils.GetObjectKeyFromObjectMeta(pcsObjectMeta)), + ) + } + logger.Info("Deleted Workloads") + return nil +} + +// buildResource configures a Workload with pod groups. +func (r _resource) buildResource(pcs *grovecorev1alpha1.PodCliqueSet, wlInfo workloadInfo, wl *schedulingv1alpha1.Workload) error { + wl.Labels = getLabels(pcs.Name) + if err := controllerutil.SetControllerReference(pcs, wl, r.scheme); err != nil { + return groveerr.WrapError( + err, + errCodeSetControllerReference, + component.OperationSync, + fmt.Sprintf("failed to set the controller reference on Workload %s to PodCliqueSet %v", wlInfo.fqn, client.ObjectKeyFromObject(pcs)), + ) + } + wl.Spec.PodGroups = createPodGroupsForWorkload(wlInfo) + return nil +} + +// getWorkloadSelectorLabels returns labels for selecting all Workloads of a PodCliqueSet. +func getWorkloadSelectorLabels(pcsObjMeta metav1.ObjectMeta) map[string]string { + return lo.Assign( + apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(pcsObjMeta.Name), + map[string]string{ + apicommon.LabelComponentKey: apicommon.LabelComponentNamePodGang, + }) +} + +// emptyWorkload creates an empty Workload with only metadata set. +func emptyWorkload(objKey client.ObjectKey) *schedulingv1alpha1.Workload { + wl := &schedulingv1alpha1.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: objKey.Namespace, + Name: objKey.Name, + }, + } + return wl +} + +// getLabels constructs labels for a Workload resource. +func getLabels(pcsName string) map[string]string { + return lo.Assign( + apicommon.GetDefaultLabelsForPodCliqueSetManagedResources(pcsName), + map[string]string{ + apicommon.LabelComponentKey: apicommon.LabelComponentNamePodGang, + }) +} diff --git a/operator/internal/controller/podcliqueset/reconcilespec.go b/operator/internal/controller/podcliqueset/reconcilespec.go index 60ebbf3f3..a7c7b94c7 100644 --- a/operator/internal/controller/podcliqueset/reconcilespec.go +++ b/operator/internal/controller/podcliqueset/reconcilespec.go @@ -149,7 +149,7 @@ func (r *Reconciler) initRollingUpdateProgress(ctx context.Context, pcs *groveco // syncPodCliqueSetResources synchronizes all managed child resources in order. 
func (r *Reconciler) syncPodCliqueSetResources(ctx context.Context, logger logr.Logger, pcs *grovecorev1alpha1.PodCliqueSet) ctrlcommon.ReconcileStepResult { continueReconcileAndRequeueKinds := make([]component.Kind, 0) - for _, kind := range getOrderedKindsForSync() { + for _, kind := range getOrderedKindsForSync(pcs) { operator, err := r.operatorRegistry.GetOperator(kind) if err != nil { return ctrlcommon.ReconcileWithErrors(fmt.Sprintf("error getting operator for kind: %s", kind), err) @@ -199,8 +199,9 @@ func (r *Reconciler) recordIncompleteReconcile(ctx context.Context, logger logr. } // getOrderedKindsForSync returns the ordered list of component kinds to synchronize. -func getOrderedKindsForSync() []component.Kind { - return []component.Kind{ +// It dynamically selects between PodGang and Workload based on the scheduler type. +func getOrderedKindsForSync(pcs *grovecorev1alpha1.PodCliqueSet) []component.Kind { + baseKinds := []component.Kind{ component.KindServiceAccount, component.KindRole, component.KindRoleBinding, @@ -210,6 +211,9 @@ func getOrderedKindsForSync() []component.Kind { component.KindPodCliqueSetReplica, component.KindPodClique, component.KindPodCliqueScalingGroup, - component.KindPodGang, } + + // NEW ARCHITECTURE: Always create PodGang as the unified intermediate representation + // Backend Controllers will convert PodGang to scheduler-specific CRs (Workload/PodGroup) + return append(baseKinds, component.KindPodGang) } diff --git a/operator/internal/controller/register.go b/operator/internal/controller/register.go index 32106276c..b327f9e71 100644 --- a/operator/internal/controller/register.go +++ b/operator/internal/controller/register.go @@ -21,12 +21,23 @@ import ( "github.com/ai-dynamo/grove/operator/internal/controller/podclique" "github.com/ai-dynamo/grove/operator/internal/controller/podcliquescalinggroup" "github.com/ai-dynamo/grove/operator/internal/controller/podcliqueset" + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" + backendcontroller "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend/controller" + "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" + + _ "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend/kai" + _ "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend/workload" ) // RegisterControllers registers all controllers with the manager. 
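The function below relies on the two blank imports above purely for their init() side effects: each backend package registers a factory with the global registry before InitializeGlobalManager copies it. A stripped-down, hypothetical sketch of that registration pattern (stand-in names, not the real backend package):

package main

import "fmt"

// factory is a stand-in for backend.BackendFactory.
type factory func() string

// registry is a stand-in for the package-level factory map inside the backend package.
var registry = map[string]factory{}

// register mirrors backend.RegisterBackendFactory and is called from init() of each backend package.
func register(schedulerName string, f factory) { registry[schedulerName] = f }

// A backend package pulled in via a blank import contributes itself like this.
func init() { register("example-scheduler", func() string { return "example" }) }

func main() {
	// By the time startup wiring runs (InitializeGlobalManager in the diff),
	// every blank-imported backend has already registered its factory.
	fmt.Println(len(registry), "backend factory registered")
}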
-func RegisterControllers(mgr ctrl.Manager, controllerConfig configv1alpha1.ControllerConfiguration) error { +func RegisterControllers(mgr ctrl.Manager, logger logr.Logger, controllerConfig configv1alpha1.ControllerConfiguration) error { + // Initialize global backend manager first + // This must be done before any controller that uses backends + backend.InitializeGlobalManager(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("backend-manager")) + logger.Info("Initialized global backend manager") + pcsReconciler := podcliqueset.NewReconciler(mgr, controllerConfig.PodCliqueSet) if err := pcsReconciler.RegisterWithManager(mgr); err != nil { return err @@ -39,5 +50,11 @@ func RegisterControllers(mgr ctrl.Manager, controllerConfig configv1alpha1.Contr if err := pcsgReconciler.RegisterWithManager(mgr); err != nil { return err } + + // Setup backend controllers for PodGang -> scheduler-specific CR conversion + if err := backendcontroller.SetupBackendControllers(mgr, logger); err != nil { + return err + } + return nil } diff --git a/operator/internal/controller/scheduler/backend/controller/manager.go b/operator/internal/controller/scheduler/backend/controller/manager.go new file mode 100644 index 000000000..664de7fc4 --- /dev/null +++ b/operator/internal/controller/scheduler/backend/controller/manager.go @@ -0,0 +1,59 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// */ + +package controller + +import ( + "fmt" + + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" + + "github.com/go-logr/logr" + ctrl "sigs.k8s.io/controller-runtime" +) + +// SetupBackendControllers sets up all backend controllers with the manager +// Each backend controller watches PodGang resources and converts them to scheduler-specific CRs +func SetupBackendControllers(mgr ctrl.Manager, logger logr.Logger) error { + logger.Info("Setting up backend controllers") + + // Get the global backend manager + backendManager, err := backend.GetGlobalManager() + if err != nil { + return fmt.Errorf("backend manager not initialized: %w", err) + } + + // Get all backends from the manager + backends := backendManager.GetAllBackends() + + // Setup a controller for each backend + for _, be := range backends { + reconciler := &BackendReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Backend: be, + Logger: logger.WithValues("backend", be.Name()), + } + + if err := reconciler.SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed to setup %s backend controller: %w", be.Name(), err) + } + logger.Info("Registered backend controller", "backend", be.Name()) + } + + logger.Info("Successfully set up all backend controllers") + return nil +} diff --git a/operator/internal/controller/scheduler/backend/controller/reconciler.go b/operator/internal/controller/scheduler/backend/controller/reconciler.go new file mode 100644 index 000000000..9e0bbd041 --- /dev/null +++ b/operator/internal/controller/scheduler/backend/controller/reconciler.go @@ -0,0 +1,115 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package controller + +import ( + "context" + "fmt" + + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" + + groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// BackendReconciler reconciles PodGang objects and converts them to scheduler-specific CRs +type BackendReconciler struct { + client.Client + Scheme *runtime.Scheme + Backend backend.SchedulerBackend + Logger logr.Logger +} + +// Reconcile processes PodGang changes and synchronizes to backend-specific CRs +func (r *BackendReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues("backend", r.Backend.Name(), "podgang", req.NamespacedName) + + // 1. 
Fetch the PodGang + podGang := &groveschedulerv1alpha1.PodGang{} + if err := r.Get(ctx, req.NamespacedName, podGang); err != nil { + if client.IgnoreNotFound(err) != nil { + logger.Error(err, "Failed to get PodGang") + return ctrl.Result{}, err + } + // PodGang was deleted, nothing to do (ownerReference handles cleanup) + logger.Info("PodGang not found, likely deleted") + return ctrl.Result{}, nil + } + + // 2. Check if this backend should handle this PodGang + if !r.Backend.Matches(podGang) { + logger.V(1).Info("PodGang does not match this backend, skipping") + return ctrl.Result{}, nil + } + + logger.Info("Processing PodGang with backend") + + // 3. Handle deletion + if !podGang.DeletionTimestamp.IsZero() { + logger.Info("PodGang is being deleted") + if err := r.Backend.Delete(ctx, logger, podGang); err != nil { + logger.Error(err, "Failed to delete backend resources") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + + // 4. Sync PodGang to backend-specific CR + if err := r.Backend.Sync(ctx, logger, podGang); err != nil { + logger.Error(err, "Failed to sync PodGang to backend") + return ctrl.Result{}, err + } + + logger.Info("Successfully synced PodGang to backend") + + // 5. Update PodGang status based on backend readiness + if err := r.updatePodGangStatus(ctx, logger, podGang); err != nil { + logger.Error(err, "Failed to update PodGang status") + // Don't fail the reconciliation for status update errors + } + + return ctrl.Result{}, nil +} + +// updatePodGangStatus updates PodGang status based on backend CR readiness +func (r *BackendReconciler) updatePodGangStatus(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error { + isReady, resourceName, err := r.Backend.CheckReady(ctx, logger, podGang) + if err != nil { + return fmt.Errorf("failed to check backend readiness: %w", err) + } + + // Update status if changed + if isReady { + logger.Info("Backend resource is ready", "resource", resourceName) + // TODO: Update PodGang.Status.Phase to Running + // TODO: Add condition for backend resource readiness + } + + return nil +} + +// SetupWithManager sets up the controller with the Manager +func (r *BackendReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&groveschedulerv1alpha1.PodGang{}). + Named(fmt.Sprintf("backend-%s", r.Backend.Name())). + Complete(r) +} diff --git a/operator/internal/controller/scheduler/backend/kai/backend.go b/operator/internal/controller/scheduler/backend/kai/backend.go new file mode 100644 index 000000000..8bb42eb5d --- /dev/null +++ b/operator/internal/controller/scheduler/backend/kai/backend.go @@ -0,0 +1,385 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// */ + +package kai + +import ( + "context" + "fmt" + + "github.com/ai-dynamo/grove/operator/api/common" + grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" + + groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // BackendName is the name of the KAI backend + BackendName = "kai" + // SchedulingGateName is the name of the scheduling gate used by KAI + SchedulingGateName = "grove.io/podgang" + // BackendLabelValue is the label value to identify KAI backend + BackendLabelValue = "kai" +) + +// PodGroup API constants (run.ai format) +const ( + PodGroupAPIGroup = "scheduling.run.ai" + PodGroupAPIVersion = "v2alpha2" + PodGroupKind = "PodGroup" +) + +// Backend implements the SchedulerBackend interface for KAI scheduler +// Converts PodGang → PodGroup (scheduling.run.ai/v2alpha2 format, similar to posgroups.yaml) +type Backend struct { + client client.Client + scheme *runtime.Scheme + eventRecorder record.EventRecorder +} + +// Factory creates KAI backend instances +// New creates a new KAI backend instance +// This is exported for direct use by components that need a backend with client access +func New(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) *Backend { + return &Backend{ + client: cl, + scheme: scheme, + eventRecorder: eventRecorder, + } +} + +// init registers the KAI backend factory +func init() { + // Register factory for kai-scheduler and grove-scheduler + factory := func(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) backend.SchedulerBackend { + return New(cl, scheme, eventRecorder) + } + backend.RegisterBackendFactory("kai-scheduler", factory) + backend.RegisterBackendFactory("grove-scheduler", factory) +} + +// Name returns the backend name +func (b *Backend) Name() string { + return BackendName +} + +// Matches returns true if this PodGang should be handled by KAI backend +// Checks for label: grove.io/scheduler-backend: kai +func (b *Backend) Matches(podGang *groveschedulerv1alpha1.PodGang) bool { + if podGang.Labels == nil { + return false + } + return podGang.Labels["grove.io/scheduler-backend"] == BackendLabelValue +} + +// Sync converts PodGang to KAI PodGroup (similar to posgroups.yaml format) +func (b *Backend) Sync(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error { + logger.Info("Syncing PodGang to KAI PodGroup", "podGang", podGang.Name) + + // Convert PodGang to PodGroup + podGroup := b.convertPodGangToPodGroup(podGang) + + // Create or update PodGroup + existing := &unstructured.Unstructured{} + existing.SetGroupVersionKind(podGroupGVK()) + + err := b.client.Get(ctx, client.ObjectKey{ + Namespace: podGroup.GetNamespace(), + Name: podGroup.GetName(), + }, existing) + + if err != nil { + if client.IgnoreNotFound(err) != nil { + return fmt.Errorf("failed to get existing PodGroup: %w", err) + } + + // Create new PodGroup + logger.Info("Creating KAI PodGroup", "name", podGroup.GetName()) + if err := b.client.Create(ctx, podGroup); err != nil { + return 
fmt.Errorf("failed to create PodGroup: %w", err) + } + return nil + } + + // Update existing PodGroup + podGroup.SetResourceVersion(existing.GetResourceVersion()) + logger.Info("Updating KAI PodGroup", "name", podGroup.GetName()) + if err := b.client.Update(ctx, podGroup); err != nil { + return fmt.Errorf("failed to update PodGroup: %w", err) + } + + return nil +} + +// Delete removes the PodGroup owned by this PodGang +func (b *Backend) Delete(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error { + logger.Info("Deleting KAI PodGroup", "podGang", podGang.Name) + + podGroup := &unstructured.Unstructured{} + podGroup.SetGroupVersionKind(podGroupGVK()) + podGroup.SetName(b.getPodGroupName(podGang)) + podGroup.SetNamespace(podGang.Namespace) + + if err := b.client.Delete(ctx, podGroup); err != nil { + if client.IgnoreNotFound(err) == nil { + return nil // Already deleted + } + return fmt.Errorf("failed to delete PodGroup: %w", err) + } + + return nil +} + +// CheckReady checks if the PodGroup is ready for scheduling +func (b *Backend) CheckReady(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) (bool, string, error) { + podGroupName := b.getPodGroupName(podGang) + + podGroup := &unstructured.Unstructured{} + podGroup.SetGroupVersionKind(podGroupGVK()) + + err := b.client.Get(ctx, client.ObjectKey{ + Namespace: podGang.Namespace, + Name: podGroupName, + }, podGroup) + + if err != nil { + if client.IgnoreNotFound(err) == nil { + // PodGroup doesn't exist yet - not ready + return false, podGroupName, nil + } + return false, podGroupName, fmt.Errorf("failed to get PodGroup: %w", err) + } + + // Check PodGroup status + // In run.ai PodGroup, check status.schedulingConditions + conditions, found, _ := unstructured.NestedSlice(podGroup.Object, "status", "schedulingConditions") + if !found || len(conditions) == 0 { + return false, podGroupName, nil + } + + // Check if any condition indicates readiness + // For now, if PodGroup exists and has conditions, consider it being processed + isReady := false // TODO: Implement proper readiness check based on conditions + + logger.V(1).Info("Checked KAI PodGroup readiness", + "podGroupName", podGroupName, + "isReady", isReady) + + return isReady, podGroupName, nil +} + +// GetSchedulingGateName returns the scheduling gate name used by this backend +func (b *Backend) GetSchedulingGateName() string { + return SchedulingGateName +} + +// ShouldRemoveSchedulingGate checks if the PodGang is ready for this PodClique +// For KAI/Grove scheduler: +// - Base PodGang pods: remove gate immediately after pod is created +// - Scaled PodGang pods: remove gate when the base PodGang is scheduled +func (b *Backend) ShouldRemoveSchedulingGate(ctx context.Context, logger logr.Logger, pod *corev1.Pod, podCliqueObj client.Object) (bool, string, error) { + podClique, ok := podCliqueObj.(*grovecorev1alpha1.PodClique) + if !ok { + return false, "", fmt.Errorf("expected PodClique object, got %T", podCliqueObj) + } + + // Check if this PodClique has a base PodGang dependency + basePodGangName, hasBasePodGangLabel := podClique.GetLabels()[common.LabelBasePodGang] + if !hasBasePodGangLabel { + // This is a base PodGang pod - remove gate immediately + logger.Info("Proceeding with gate removal for base PodGang pod", + "podObjectKey", client.ObjectKeyFromObject(pod)) + return true, "base podgang pod", nil + } + + // This is a scaled PodGang pod - check if base PodGang is scheduled + basePodGang := 
&groveschedulerv1alpha1.PodGang{} + basePodGangKey := client.ObjectKey{Name: basePodGangName, Namespace: podClique.Namespace} + if err := b.client.Get(ctx, basePodGangKey, basePodGang); err != nil { + if apierrors.IsNotFound(err) { + logger.V(1).Info("Base PodGang not found yet", "basePodGangName", basePodGangName) + return false, "base podgang not found", nil + } + return false, "", fmt.Errorf("failed to get base PodGang %v: %w", basePodGangKey, err) + } + + // Check if all PodGroups in the base PodGang have sufficient scheduled replicas + for _, podGroup := range basePodGang.Spec.PodGroups { + pclqName := podGroup.Name + pclq := &grovecorev1alpha1.PodClique{} + pclqKey := client.ObjectKey{Name: pclqName, Namespace: podClique.Namespace} + if err := b.client.Get(ctx, pclqKey, pclq); err != nil { + return false, "", fmt.Errorf("failed to get PodClique %s for base PodGang readiness check: %w", pclqName, err) + } + + // Check if MinReplicas is satisfied (use ScheduledReplicas for base PodGang check) + if pclq.Status.ScheduledReplicas < podGroup.MinReplicas { + logger.Info("Scaled PodGang pod has scheduling gate but base PodGang is not ready yet", + "podObjectKey", client.ObjectKeyFromObject(pod), + "basePodGangName", basePodGangName, + "pclqName", pclqName, + "scheduledReplicas", pclq.Status.ScheduledReplicas, + "minReplicas", podGroup.MinReplicas) + return false, fmt.Sprintf("base podgang not ready: %s has %d/%d scheduled pods", pclqName, pclq.Status.ScheduledReplicas, podGroup.MinReplicas), nil + } + } + + logger.Info("Base PodGang is ready, proceeding with gate removal for scaled PodGang pod", + "podObjectKey", client.ObjectKeyFromObject(pod), + "basePodGangName", basePodGangName) + return true, "base podgang ready", nil +} + +// MutatePodSpec mutates the Pod spec to add KAI-specific requirements +// For KAI scheduler, we don't need WorkloadRef (KAI reads PodGang directly) +// But we can add helpful annotations for tracking +func (b *Backend) MutatePodSpec(pod *corev1.Pod, gangName string, podGroupName string) error { + // KAI scheduler consumes PodGang directly, so we don't need WorkloadRef + // We can add annotations for better observability + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + + // Add annotations to help track the gang membership + pod.Annotations["kai.scheduler/podgang"] = gangName + pod.Annotations["kai.scheduler/podgroup"] = podGroupName + + return nil +} + +// convertPodGangToPodGroup converts PodGang to KAI PodGroup (run.ai format) +// Format similar to posgroups.yaml +func (b *Backend) convertPodGangToPodGroup(podGang *groveschedulerv1alpha1.PodGang) *unstructured.Unstructured { + podGroup := &unstructured.Unstructured{} + podGroup.SetGroupVersionKind(podGroupGVK()) + podGroup.SetName(b.getPodGroupName(podGang)) + podGroup.SetNamespace(podGang.Namespace) + + // Set labels + labels := make(map[string]string) + for k, v := range podGang.Labels { + labels[k] = v + } + podGroup.SetLabels(labels) + + // Set owner reference + ownerRef := metav1.OwnerReference{ + APIVersion: groveschedulerv1alpha1.SchemeGroupVersion.String(), + Kind: "PodGang", + Name: podGang.Name, + UID: podGang.UID, + } + podGroup.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) + + // Set annotations with top owner metadata (as in posgroups.yaml) + annotations := map[string]string{ + "kai.scheduler/top-owner-metadata": fmt.Sprintf(`name: %s +uid: %s +group: %s +version: %s +kind: %s`, + podGang.Name, + podGang.UID, + groveschedulerv1alpha1.SchemeGroupVersion.Group, + 
groveschedulerv1alpha1.SchemeGroupVersion.Version, + "PodGang", + ), + } + podGroup.SetAnnotations(annotations) + + // Build spec + spec := make(map[string]interface{}) + + // Calculate total minMember + var totalMinMember int32 + for _, pg := range podGang.Spec.PodGroups { + totalMinMember += pg.MinReplicas + } + spec["minMember"] = totalMinMember + + // Add priority class name if present + if podGang.Spec.PriorityClassName != "" { + spec["priorityClassName"] = podGang.Spec.PriorityClassName + } + + // Add queue (default to "default-queue" as in posgroups.yaml) + spec["queue"] = "default-queue" + + // Build subGroups from PodGang.Spec.PodGroups + subGroups := make([]interface{}, 0, len(podGang.Spec.PodGroups)) + for _, pg := range podGang.Spec.PodGroups { + subGroup := map[string]interface{}{ + "name": pg.Name, + "minMember": pg.MinReplicas, + } + subGroups = append(subGroups, subGroup) + } + spec["subGroups"] = subGroups + + // Add topology constraint if present + if podGang.Spec.TopologyConstraint != nil { + spec["topologyConstraint"] = convertTopologyConstraint(podGang.Spec.TopologyConstraint) + } else { + spec["topologyConstraint"] = map[string]interface{}{} + } + + unstructured.SetNestedMap(podGroup.Object, spec, "spec") + + return podGroup +} + +// convertTopologyConstraint converts Grove topology constraint to KAI format +func convertTopologyConstraint(tc *groveschedulerv1alpha1.TopologyConstraint) map[string]interface{} { + result := make(map[string]interface{}) + + if tc.PackConstraint != nil { + if tc.PackConstraint.Required != nil { + result["required"] = *tc.PackConstraint.Required + } + if tc.PackConstraint.Preferred != nil { + result["preferred"] = *tc.PackConstraint.Preferred + } + } + + return result +} + +// getPodGroupName generates PodGroup name from PodGang +// Format: pg-{podGangName}-{uid} (as in posgroups.yaml) +func (b *Backend) getPodGroupName(podGang *groveschedulerv1alpha1.PodGang) string { + return fmt.Sprintf("pg-%s-%s", podGang.Name, podGang.UID) +} + +// podGroupGVK returns the GroupVersionKind for KAI PodGroup +func podGroupGVK() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: PodGroupAPIGroup, + Version: PodGroupAPIVersion, + Kind: PodGroupKind, + } +} + +// NOTE: Registration is done in controller/manager.go to avoid circular imports diff --git a/operator/internal/controller/scheduler/backend/manager.go b/operator/internal/controller/scheduler/backend/manager.go new file mode 100644 index 000000000..9c99d8482 --- /dev/null +++ b/operator/internal/controller/scheduler/backend/manager.go @@ -0,0 +1,171 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// */ + +package backend + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// BackendFactory creates a backend instance +type BackendFactory func(client client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) SchedulerBackend + +// BackendManager manages all backend instances as singletons +// This ensures we only create one backend instance per scheduler, shared across all components +type BackendManager struct { + client client.Client + scheme *runtime.Scheme + eventRecorder record.EventRecorder + backends map[string]SchedulerBackend + factories map[string]BackendFactory + mu sync.RWMutex +} + +var ( + globalManager *BackendManager + globalManagerOnce sync.Once + globalFactories = make(map[string]BackendFactory) + globalFactoriesMu sync.RWMutex +) + +// RegisterBackendFactory registers a backend factory for a scheduler name +// This should be called in backend package init() functions +func RegisterBackendFactory(schedulerName string, factory BackendFactory) { + globalFactoriesMu.Lock() + defer globalFactoriesMu.Unlock() + globalFactories[schedulerName] = factory +} + +// InitializeGlobalManager initializes the global backend manager +// This should be called once during operator startup, after the manager is created +func InitializeGlobalManager(client client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) { + globalManagerOnce.Do(func() { + globalFactoriesMu.RLock() + defer globalFactoriesMu.RUnlock() + + // Copy factories to manager + factories := make(map[string]BackendFactory, len(globalFactories)) + for name, factory := range globalFactories { + factories[name] = factory + } + + globalManager = &BackendManager{ + client: client, + scheme: scheme, + eventRecorder: eventRecorder, + backends: make(map[string]SchedulerBackend), + factories: factories, + } + }) +} + +// GetGlobalManager returns the global backend manager instance +// Returns error if not initialized +func GetGlobalManager() (*BackendManager, error) { + if globalManager == nil { + return nil, fmt.Errorf("backend manager not initialized, call InitializeGlobalManager first") + } + return globalManager, nil +} + +// GetBackend returns a backend for the given scheduler name +// Creates the backend on first access and caches it for subsequent calls +func (m *BackendManager) GetBackend(schedulerName string) (SchedulerBackend, error) { + // Normalize scheduler name + if schedulerName == "" { + schedulerName = "default-scheduler" + } + + // Try read lock first for performance + m.mu.RLock() + if backend, exists := m.backends[schedulerName]; exists { + m.mu.RUnlock() + return backend, nil + } + m.mu.RUnlock() + + // Need to create backend, acquire write lock + m.mu.Lock() + defer m.mu.Unlock() + + // Double-check in case another goroutine created it + if backend, exists := m.backends[schedulerName]; exists { + return backend, nil + } + + // Create backend using registered factory + factory, exists := m.factories[schedulerName] + if !exists { + return nil, fmt.Errorf("no backend factory registered for scheduler: %s", schedulerName) + } + + backend := factory(m.client, m.scheme, m.eventRecorder) + + // Cache for future use + m.backends[schedulerName] = backend + return backend, nil +} + +// GetAllBackends returns all backends that support the Matches() interface +// Used by Backend Controllers to get backends for PodGang reconciliation +func (m *BackendManager) GetAllBackends() 
[]SchedulerBackend { + m.mu.RLock() + defer m.mu.RUnlock() + + // Return all unique backend types (not scheduler names) + // We want one instance per backend type, not per scheduler name + backendTypes := make(map[string]SchedulerBackend) + + // Ensure workload and kai backends are created + workload, _ := m.getOrCreateBackendLocked("default-scheduler") + if workload != nil { + backendTypes["workload"] = workload + } + + kai, _ := m.getOrCreateBackendLocked("kai-scheduler") + if kai != nil { + backendTypes["kai"] = kai + } + + // Convert to slice + backends := make([]SchedulerBackend, 0, len(backendTypes)) + for _, backend := range backendTypes { + backends = append(backends, backend) + } + + return backends +} + +// getOrCreateBackendLocked creates a backend without locking (assumes caller holds lock) +func (m *BackendManager) getOrCreateBackendLocked(schedulerName string) (SchedulerBackend, error) { + if backend, exists := m.backends[schedulerName]; exists { + return backend, nil + } + + factory, exists := m.factories[schedulerName] + if !exists { + return nil, fmt.Errorf("no backend factory registered for scheduler: %s", schedulerName) + } + + backend := factory(m.client, m.scheme, m.eventRecorder) + m.backends[schedulerName] = backend + return backend, nil +} diff --git a/operator/internal/controller/scheduler/backend/types.go b/operator/internal/controller/scheduler/backend/types.go new file mode 100644 index 000000000..ee4dcb674 --- /dev/null +++ b/operator/internal/controller/scheduler/backend/types.go @@ -0,0 +1,105 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package backend + +import ( + "context" + "fmt" + + groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// SchedulerBackend defines the interface that different scheduler backends must implement. 
+// +// Architecture V2: Two-Phase Approach +// Phase 1: Operator always creates PodGang (unified intermediate representation) +// Phase 2: Backend converts PodGang → scheduler-specific CR (PodGroup/Workload/etc) +// +// This design provides: +// - PodGang as single source of truth +// - Backend as pure converter/synchronizer +// - Better decoupling and observability +// +// Design inspired by: https://github.com/kubernetes/kubernetes/tree/master/pkg/scheduler/backend/queue +type SchedulerBackend interface { + // Name returns the unique name of this scheduler backend + Name() string + + // Matches returns true if this backend should handle the given PodGang + // Based on labels/annotations on PodGang (e.g., grove.io/scheduler-backend) + Matches(podGang *groveschedulerv1alpha1.PodGang) bool + + // Sync converts PodGang to scheduler-specific CR and synchronizes it + // The PodGang serves as the source of truth + Sync(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error + + // Delete removes scheduler-specific CRs owned by this PodGang + Delete(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error + + // CheckReady checks if the scheduler-specific CR is ready + // Returns: (isReady, resourceName, error) + CheckReady(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) (bool, string, error) + + // GetSchedulingGateName returns the name of the scheduling gate used by this backend + GetSchedulingGateName() string + + // MutatePodSpec mutates the Pod spec to add scheduler-specific requirements + // This method allows backends to inject fields like WorkloadRef, annotations, etc. + // Parameters: + // - pod: the Pod to mutate + // - gangName: the PodGang name this pod belongs to + // - podGroupName: the PodGroup name within the gang (e.g., PodClique name) + MutatePodSpec(pod *corev1.Pod, gangName string, podGroupName string) error + + // ShouldRemoveSchedulingGate checks if the scheduling gate should be removed from the given Pod + // Each backend implements its own logic to determine when it's safe to remove the gate + // Returns: + // - shouldRemove: true if the gate should be removed + // - reason: human-readable reason for the decision + // - error: any error encountered during the check + ShouldRemoveSchedulingGate(ctx context.Context, logger logr.Logger, pod *corev1.Pod, podClique client.Object) (bool, string, error) +} + +// MutatePod applies scheduler-specific mutations to a Pod +// This is the main entry point for the pod component +func MutatePod(pod *corev1.Pod, gangName string, podGroupName string) error { + // Get backend from global manager + manager, err := GetGlobalManager() + if err != nil { + return fmt.Errorf("backend manager not initialized: %w", err) + } + + backend, err := manager.GetBackend(pod.Spec.SchedulerName) + if err != nil { + return fmt.Errorf("failed to get backend for scheduler %s: %w", pod.Spec.SchedulerName, err) + } + + // Apply scheduling gate + pod.Spec.SchedulingGates = []corev1.PodSchedulingGate{ + {Name: backend.GetSchedulingGateName()}, + } + + // Apply backend-specific mutations + return backend.MutatePodSpec(pod, gangName, podGroupName) +} + +// Note: All PodGang-related types (PodGangSpec, PodGroup, TopologyConstraint, etc.) 
+// are defined in scheduler/api/core/v1alpha1/podgang.go +// This backend package uses those API types directly to avoid duplication diff --git a/operator/internal/controller/scheduler/backend/workload/backend.go b/operator/internal/controller/scheduler/backend/workload/backend.go new file mode 100644 index 000000000..65d4c1482 --- /dev/null +++ b/operator/internal/controller/scheduler/backend/workload/backend.go @@ -0,0 +1,306 @@ +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package workload + +import ( + "context" + "fmt" + + "github.com/ai-dynamo/grove/operator/api/common" + grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" + "github.com/ai-dynamo/grove/operator/internal/controller/scheduler/backend" + + groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // BackendName is the name of the Workload backend + BackendName = "workload" + // SchedulingGateName is the name of the scheduling gate used by Workload API + SchedulingGateName = "scheduling.k8s.io/workload" + // BackendLabelValue is the label value to identify Workload backend + // Must match the label set in podgang component: grove.io/scheduler-backend=workload + BackendLabelValue = "workload" +) + +// Backend implements the SchedulerBackend interface for default kube-scheduler using Workload API +// Converts PodGang → Workload (scheduling.k8s.io/v1alpha1) +type Backend struct { + client client.Client + scheme *runtime.Scheme + eventRecorder record.EventRecorder +} + +// Factory creates Workload backend instances +// New creates a new Workload backend instance +// This is exported for direct use by components that need a backend with client access +func New(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) *Backend { + return &Backend{ + client: cl, + scheme: scheme, + eventRecorder: eventRecorder, + } +} + +// Name returns the backend name +func (b *Backend) Name() string { + return BackendName +} + +// Matches returns true if this PodGang should be handled by Workload backend +// Checks for label: grove.io/scheduler-backend: default +func (b *Backend) Matches(podGang *groveschedulerv1alpha1.PodGang) bool { + if podGang.Labels == nil { + return false + } + return podGang.Labels["grove.io/scheduler-backend"] == BackendLabelValue +} + +// Sync converts PodGang to Workload and synchronizes it +func (b *Backend) Sync(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error { + logger.Info("Converting PodGang to Workload", "podGang", podGang.Name) + + // Convert PodGang to Workload + 
workload := b.convertPodGangToWorkload(podGang) + + // Create or update Workload + existing := &schedulingv1alpha1.Workload{} + err := b.client.Get(ctx, client.ObjectKey{ + Namespace: workload.Namespace, + Name: workload.Name, + }, existing) + + if err != nil { + if client.IgnoreNotFound(err) != nil { + return fmt.Errorf("failed to get existing Workload: %w", err) + } + + // Create new Workload + logger.Info("Creating Workload", "name", workload.Name) + if err := b.client.Create(ctx, workload); err != nil { + return fmt.Errorf("failed to create Workload: %w", err) + } + return nil + } + + // Update existing Workload + workload.ResourceVersion = existing.ResourceVersion + logger.Info("Updating Workload", "name", workload.Name) + if err := b.client.Update(ctx, workload); err != nil { + return fmt.Errorf("failed to update Workload: %w", err) + } + + return nil +} + +// convertPodGangToWorkload converts PodGang to Workload +func (b *Backend) convertPodGangToWorkload(podGang *groveschedulerv1alpha1.PodGang) *schedulingv1alpha1.Workload { + workload := &schedulingv1alpha1.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: podGang.Name, + Namespace: podGang.Namespace, + }, + Spec: schedulingv1alpha1.WorkloadSpec{ + // Convert PodGang's PodGroups to Workload's PodGroups + PodGroups: b.convertPodGroups(podGang.Spec.PodGroups), + }, + } + + // Copy labels + if workload.Labels == nil { + workload.Labels = make(map[string]string) + } + for k, v := range podGang.Labels { + workload.Labels[k] = v + } + + // Set owner reference + ownerRef := metav1.OwnerReference{ + APIVersion: groveschedulerv1alpha1.SchemeGroupVersion.String(), + Kind: "PodGang", + Name: podGang.Name, + UID: podGang.UID, + } + workload.SetOwnerReferences([]metav1.OwnerReference{ownerRef}) + + return workload +} + +// convertPodGroups converts PodGang's PodGroups to Workload API PodGroups +func (b *Backend) convertPodGroups(podGangGroups []groveschedulerv1alpha1.PodGroup) []schedulingv1alpha1.PodGroup { + podGroups := make([]schedulingv1alpha1.PodGroup, 0, len(podGangGroups)) + + for _, pg := range podGangGroups { + podGroups = append(podGroups, schedulingv1alpha1.PodGroup{ + Name: pg.Name, + Policy: schedulingv1alpha1.PodGroupPolicy{ + Gang: &schedulingv1alpha1.GangSchedulingPolicy{ + MinCount: pg.MinReplicas, + }, + }, + }) + } + + return podGroups +} + +// Delete removes the Workload owned by this PodGang +func (b *Backend) Delete(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) error { + logger.Info("Deleting Workload", "podGang", podGang.Name) + + workload := &schedulingv1alpha1.Workload{} + workload.Name = podGang.Name + workload.Namespace = podGang.Namespace + + if err := b.client.Delete(ctx, workload); err != nil { + if client.IgnoreNotFound(err) == nil { + return nil // Already deleted + } + return fmt.Errorf("failed to delete Workload: %w", err) + } + + return nil +} + +// CheckReady checks if the Workload is ready for scheduling +func (b *Backend) CheckReady(ctx context.Context, logger logr.Logger, podGang *groveschedulerv1alpha1.PodGang) (bool, string, error) { + workloadName := podGang.Name + + workload := &schedulingv1alpha1.Workload{} + err := b.client.Get(ctx, client.ObjectKey{ + Namespace: podGang.Namespace, + Name: workloadName, + }, workload) + + if err != nil { + if client.IgnoreNotFound(err) == nil { + // Workload doesn't exist yet - not ready + return false, workloadName, nil + } + return false, workloadName, fmt.Errorf("failed to get Workload: %w", err) + } + + // Check if Workload 
is admitted (ready for scheduling) + // Note: Workload API in k8s.io/api/scheduling/v1alpha1 may have different status structure + // For now, consider it ready if it exists + isReady := false + // TODO: Update this when the actual Workload API structure is confirmed + + logger.V(1).Info("Checked Workload readiness", + "workloadName", workloadName, + "isReady", isReady) + + return isReady, workloadName, nil +} + +// GetSchedulingGateName returns the scheduling gate name used by this backend +func (b *Backend) GetSchedulingGateName() string { + return SchedulingGateName +} + +// ShouldRemoveSchedulingGate checks if the Workload is ready for this PodClique +// For Workload API: remove gate when ALL PodGroups in the Workload meet their MinCount requirements +func (b *Backend) ShouldRemoveSchedulingGate(ctx context.Context, logger logr.Logger, pod *corev1.Pod, podCliqueObj client.Object) (bool, string, error) { + podClique, ok := podCliqueObj.(*grovecorev1alpha1.PodClique) + if !ok { + return false, "", fmt.Errorf("expected PodClique object, got %T", podCliqueObj) + } + + // Get the Workload name from the PodClique labels + workloadName, hasWorkloadLabel := podClique.GetLabels()[common.LabelPodGang] + if !hasWorkloadLabel { + logger.Info("PodClique has no Workload label", "podClique", client.ObjectKeyFromObject(podClique)) + return false, "no workload label", nil + } + + // Get the Workload object + workload := &schedulingv1alpha1.Workload{} + workloadKey := client.ObjectKey{Name: workloadName, Namespace: podClique.Namespace} + if err := b.client.Get(ctx, workloadKey, workload); err != nil { + if apierrors.IsNotFound(err) { + logger.V(1).Info("Workload not found yet", "workloadName", workloadName) + return false, "workload not found", nil + } + return false, "", fmt.Errorf("failed to get Workload %v: %w", workloadKey, err) + } + + // Check if all PodGroups in the Workload have sufficient created replicas + for _, podGroup := range workload.Spec.PodGroups { + pclqName := podGroup.Name + pclq := &grovecorev1alpha1.PodClique{} + pclqKey := client.ObjectKey{Name: pclqName, Namespace: podClique.Namespace} + if err := b.client.Get(ctx, pclqKey, pclq); err != nil { + return false, "", fmt.Errorf("failed to get PodClique %s for Workload readiness check: %w", pclqName, err) + } + + // Check if MinCount is satisfied + if podGroup.Policy.Gang != nil { + minCount := podGroup.Policy.Gang.MinCount + if pclq.Status.Replicas < minCount { + logger.V(1).Info("Workload not ready: PodClique has insufficient created pods", + "workloadName", workloadName, + "pclqName", pclqName, + "createdReplicas", pclq.Status.Replicas, + "minCount", minCount) + return false, fmt.Sprintf("workload not ready: %s has %d/%d pods", pclqName, pclq.Status.Replicas, minCount), nil + } + } + } + + logger.Info("Workload is ready - proceeding with gate removal", + "workloadName", workloadName, + "podObjectKey", client.ObjectKeyFromObject(pod)) + return true, "workload ready", nil +} + +// MutatePodSpec mutates the Pod spec to add Workload-specific requirements +// For Workload API (K8s 1.35+), we need to set the WorkloadRef +func (b *Backend) MutatePodSpec(pod *corev1.Pod, gangName string, podGroupName string) error { + // Set WorkloadRef (required by Kubernetes Workload API) + pod.Spec.WorkloadRef = &corev1.WorkloadReference{ + Name: gangName, // Workload name (e.g., "simple1-0") + PodGroup: podGroupName, // PodGroup name within the Workload (e.g., "simple1-0-pca") + } + + return nil +} + +// workloadGVK returns the GroupVersionKind for 
Kubernetes Workload API +func workloadGVK() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: "scheduling.k8s.io", + Version: "v1alpha1", + Kind: "Workload", + } +} + +// init registers the Workload backend factory +func init() { + // Register factory for default-scheduler + backend.RegisterBackendFactory("default-scheduler", func(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder) backend.SchedulerBackend { + return New(cl, scheme, eventRecorder) + }) +} diff --git a/scheduler/client/go.mod b/scheduler/client/go.mod index 43ca18d99..8f77b4cdc 100644 --- a/scheduler/client/go.mod +++ b/scheduler/client/go.mod @@ -28,11 +28,11 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/term v0.37.0 // indirect + golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.9.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect diff --git a/scheduler/client/go.sum b/scheduler/client/go.sum index 91dde6d09..b7c8cc8fe 100644 --- a/scheduler/client/go.sum +++ b/scheduler/client/go.sum @@ -95,6 +95,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -105,12 +106,15 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
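Putting the pieces together: the operator always emits a PodGang, and the backend whose Matches() accepts the grove.io/scheduler-backend label converts it into its scheduler-specific CR. A hedged, standalone sketch of the run.ai-style PodGroup spec the KAI backend would build for a hypothetical two-group gang (field names follow convertPodGangToPodGroup in kai/backend.go; the resource names and UID placeholder are hypothetical):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	pg := &unstructured.Unstructured{}
	pg.SetGroupVersionKind(schema.GroupVersionKind{Group: "scheduling.run.ai", Version: "v2alpha2", Kind: "PodGroup"})
	pg.SetName("pg-simple1-0-<uid>") // hypothetical; the diff names PodGroups pg-{podGangName}-{uid}
	pg.SetNamespace("default")

	// minMember is the sum of MinReplicas across the gang's pod groups (1 leader + 2 workers),
	// and each pod group becomes one subGroup, mirroring convertPodGangToPodGroup.
	spec := map[string]interface{}{
		"minMember": int64(3),
		"queue":     "default-queue",
		"subGroups": []interface{}{
			map[string]interface{}{"name": "simple1-0-leader", "minMember": int64(1)},
			map[string]interface{}{"name": "simple1-0-worker", "minMember": int64(2)},
		},
		"topologyConstraint": map[string]interface{}{},
	}
	if err := unstructured.SetNestedMap(pg.Object, spec, "spec"); err != nil {
		panic(err)
	}
	fmt.Println(pg.Object["spec"])
}

On the pod side, both backends gate pods at admission via MutatePod (a single SchedulingGate named by GetSchedulingGateName), and the Workload backend additionally sets spec.workloadRef so the default scheduler can associate each pod with its Workload and PodGroup.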