Skip to content
This repository was archived by the owner on Sep 19, 2022. It is now read-only.

Commit 261dd72

Browse files
johnugeorge authored and k8s-ci-robot committed
Use podGroup instead of PDB in v1beta2 (#150)
1 parent fd773c0 commit 261dd72

File tree

40 files changed

+2134
-65
lines changed

40 files changed

+2134
-65
lines changed

Gopkg.lock

Lines changed: 15 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,14 @@ required = [
1313
name = "github.com/golang/protobuf"
1414
version = "1.1.0"
1515

16+
[[constraint]]
17+
name = "github.com/kubernetes-sigs/kube-batch"
18+
version = "v0.3"
19+
20+
[[constraint]]
21+
name = "github.com/kubeflow/tf-operator"
22+
branch = "master"
23+
1624
[[constraint]]
1725
name = "github.com/sirupsen/logrus"
1826
version = "v1.0.4"

cmd/pytorch-operator.v1beta1/app/server.go

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ import (
1919
"os"
2020
"time"
2121

22+
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
2223
log "github.com/sirupsen/logrus"
2324
"k8s.io/api/core/v1"
2425
crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -87,7 +88,7 @@ func Run(opt *options.ServerOption) error {
8788
}
8889

8990
// Create clients.
90-
kubeClientSet, leaderElectionClientSet, pytorchJobClientSet, err := createClientSets(kcfg)
91+
kubeClientSet, leaderElectionClientSet, pytorchJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
9192
if err != nil {
9293
return err
9394
}
@@ -99,7 +100,7 @@ func Run(opt *options.ServerOption) error {
99100
unstructuredInformer := controller.NewUnstructuredPyTorchJobInformer(kcfg, opt.Namespace)
100101

101102
// Create pytorch controller.
102-
tc := controller.NewPyTorchController(unstructuredInformer, kubeClientSet, pytorchJobClientSet, kubeInformerFactory, pytorchJobInformerFactory, *opt)
103+
tc := controller.NewPyTorchController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, pytorchJobClientSet, kubeInformerFactory, pytorchJobInformerFactory, *opt)
103104

104105
// Start informer goroutines.
105106
go kubeInformerFactory.Start(stopCh)
@@ -154,32 +155,37 @@ func Run(opt *options.ServerOption) error {
154155
return nil
155156
}
156157

157-
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, jobclientset.Interface, error) {
158+
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, jobclientset.Interface, kubebatchclient.Interface, error) {
158159

159160
crdClient, err := crdclient.NewForConfig(config)
160161

161162
if err != nil {
162-
return nil, nil, nil, err
163+
return nil, nil, nil, nil, err
163164
}
164165

165166
checkCRDExists(crdClient, v1beta1.PytorchCRD)
166167

167168
kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "pytorch-operator"))
168169
if err != nil {
169-
return nil, nil, nil, err
170+
return nil, nil, nil, nil, err
170171
}
171172

172173
leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
173174
if err != nil {
174-
return nil, nil, nil, err
175+
return nil, nil, nil, nil, err
175176
}
176177

177178
jobClientSet, err := jobclientset.NewForConfig(config)
178179
if err != nil {
179-
return nil, nil, nil, err
180+
return nil, nil, nil, nil, err
180181
}
181182

182-
return kubeClientSet, leaderElectionClientSet, jobClientSet, nil
183+
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
184+
if err != nil {
185+
return nil, nil, nil, nil, err
186+
}
187+
188+
return kubeClientSet, leaderElectionClientSet, jobClientSet, kubeBatchClientSet, nil
183189
}
184190

185191
func checkCRDExists(clientset crdclient.Interface, crdName string) {

cmd/pytorch-operator.v1beta2/app/server.go

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ import (
1919
"os"
2020
"time"
2121

22+
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
2223
log "github.com/sirupsen/logrus"
2324
"k8s.io/api/core/v1"
2425
"k8s.io/apimachinery/pkg/api/errors"
@@ -87,7 +88,7 @@ func Run(opt *options.ServerOption) error {
8788
}
8889

8990
// Create clients.
90-
kubeClientSet, leaderElectionClientSet, pytorchJobClientSet, err := createClientSets(kcfg)
91+
kubeClientSet, leaderElectionClientSet, pytorchJobClientSet, kubeBatchClientSet, err := createClientSets(kcfg)
9192
if err != nil {
9293
return err
9394
}
@@ -102,7 +103,7 @@ func Run(opt *options.ServerOption) error {
102103
unstructuredInformer := controller.NewUnstructuredPyTorchJobInformer(kcfg, opt.Namespace)
103104

104105
// Create pytorch controller.
105-
tc := controller.NewPyTorchController(unstructuredInformer, kubeClientSet, pytorchJobClientSet, kubeInformerFactory, pytorchJobInformerFactory, *opt)
106+
tc := controller.NewPyTorchController(unstructuredInformer, kubeClientSet, kubeBatchClientSet, pytorchJobClientSet, kubeInformerFactory, pytorchJobInformerFactory, *opt)
106107

107108
// Start informer goroutines.
108109
go kubeInformerFactory.Start(stopCh)
@@ -157,24 +158,29 @@ func Run(opt *options.ServerOption) error {
157158
return nil
158159
}
159160

160-
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, jobclientset.Interface, error) {
161+
func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, jobclientset.Interface, kubebatchclient.Interface, error) {
161162

162163
kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "pytorch-operator"))
163164
if err != nil {
164-
return nil, nil, nil, err
165+
return nil, nil, nil, nil, err
165166
}
166167

167168
leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
168169
if err != nil {
169-
return nil, nil, nil, err
170+
return nil, nil, nil, nil, err
170171
}
171172

172173
jobClientSet, err := jobclientset.NewForConfig(config)
173174
if err != nil {
174-
return nil, nil, nil, err
175+
return nil, nil, nil, nil, err
175176
}
176177

177-
return kubeClientSet, leaderElectionClientSet, jobClientSet, nil
178+
kubeBatchClientSet, err := kubebatchclient.NewForConfig(restclientset.AddUserAgent(config, "kube-batch"))
179+
if err != nil {
180+
return nil, nil, nil, nil, err
181+
}
182+
183+
return kubeClientSet, leaderElectionClientSet, jobClientSet, kubeBatchClientSet, nil
178184
}
179185

180186
func checkCRDExists(clientset jobclientset.Interface, namespace string) bool {

manifests/podgroup.yaml

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,39 @@
1+
apiVersion: apiextensions.k8s.io/v1beta1
2+
kind: CustomResourceDefinition
3+
metadata:
4+
name: podgroups.scheduling.incubator.k8s.io
5+
spec:
6+
group: scheduling.incubator.k8s.io
7+
names:
8+
kind: PodGroup
9+
plural: podgroups
10+
scope: Namespaced
11+
validation:
12+
openAPIV3Schema:
13+
properties:
14+
apiVersion:
15+
type: string
16+
kind:
17+
type: string
18+
metadata:
19+
type: object
20+
spec:
21+
properties:
22+
minMember:
23+
format: int32
24+
type: integer
25+
type: object
26+
status:
27+
properties:
28+
succeeded:
29+
format: int32
30+
type: integer
31+
failed:
32+
format: int32
33+
type: integer
34+
running:
35+
format: int32
36+
type: integer
37+
type: object
38+
type: object
39+
version: v1alpha1

pkg/controller.v1beta1/pytorch/controller.go

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"time"
2121

22+
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
2223
log "github.com/sirupsen/logrus"
2324
"k8s.io/api/core/v1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -98,6 +99,7 @@ func NewPyTorchController(
9899
// This variable is for unstructured informer.
99100
jobInformer jobinformersv1beta1.PyTorchJobInformer,
100101
kubeClientSet kubeclientset.Interface,
102+
kubeBatchClientSet kubebatchclient.Interface,
101103
jobClientSet jobclientset.Interface,
102104
kubeInformerFactory kubeinformers.SharedInformerFactory,
103105
// This field is not used now but we keep it since it will be used
@@ -116,7 +118,7 @@ func NewPyTorchController(
116118
// Create base controller
117119
log.Info("Creating Job controller")
118120
jc := jobcontroller.NewJobController(pc, metav1.Duration{Duration: 15 * time.Second},
119-
option.EnableGangScheduling, kubeClientSet, kubeInformerFactory, v1beta1.Plural)
121+
option.EnableGangScheduling, kubeClientSet, kubeBatchClientSet, kubeInformerFactory, v1beta1.Plural)
120122
pc.JobController = jc
121123
// Set sync handler.
122124
pc.syncHandler = pc.syncPyTorchJob
@@ -303,9 +305,9 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) {
303305

304306
if pc.Config.EnableGangScheduling {
305307
minAvailableReplicas := getTotalReplicas(job)
306-
_, err := pc.SyncPdb(job, minAvailableReplicas)
308+
_, err := pc.SyncPodGroup(job, minAvailableReplicas)
307309
if err != nil {
308-
logger.Warnf("Sync pdb %v: %v", job.Name, err)
310+
logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
309311
}
310312
}
311313

@@ -364,12 +366,12 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta1.PyTorchJob) error
364366
}
365367

366368
if pc.Config.EnableGangScheduling {
367-
pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting pdb")
368-
if err := pc.DeletePdb(job); err != nil {
369-
pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePdb", "Error deleting: %v", err)
369+
pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
370+
if err := pc.DeletePodGroup(job); err != nil {
371+
pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
370372
return err
371373
} else {
372-
pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePdb", "Deleted pdb: %v", job.Name)
374+
pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", job.Name)
373375

374376
}
375377
}
@@ -468,6 +470,10 @@ func (pc *PyTorchController) GetReplicaIndexLabelKey() string {
468470
return replicaIndexLabel
469471
}
470472

473+
func (pc *PyTorchController) GetJobRoleKey() string {
474+
return labelPyTorchJobRole
475+
}
476+
471477
func (pc *PyTorchController) ControllerName() string {
472478
return controllerName
473479
}

pkg/controller.v1beta1/pytorch/controller_test.go

Lines changed: 33 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
"github.com/golang/protobuf/proto"
24+
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
2425
"k8s.io/api/core/v1"
2526
apiv1beta1 "k8s.io/api/policy/v1beta1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,6 +49,7 @@ var (
4849
func newPyTorchController(
4950
config *rest.Config,
5051
kubeClientSet kubeclientset.Interface,
52+
kubeBatchClientSet kubebatchclient.Interface,
5153
jobClientSet jobclientset.Interface,
5254
resyncPeriod controller.ResyncPeriodFunc,
5355
option options.ServerOption,
@@ -60,7 +62,7 @@ func newPyTorchController(
6062

6163
jobInformer := NewUnstructuredPyTorchJobInformer(config, metav1.NamespaceAll)
6264

63-
ctr := NewPyTorchController(jobInformer, kubeClientSet, jobClientSet, kubeInformerFactory, jobInformerFactory, option)
65+
ctr := NewPyTorchController(jobInformer, kubeClientSet, kubeBatchClientSet, jobClientSet, kubeInformerFactory, jobInformerFactory, option)
6466
ctr.PodControl = &controller.FakePodControl{}
6567
ctr.ServiceControl = &control.FakeServiceControl{}
6668
return ctr, kubeInformerFactory, jobInformerFactory
@@ -227,6 +229,15 @@ func TestNormalPath(t *testing.T) {
227229
},
228230
},
229231
)
232+
// Prepare the kube-batch clientset and controller for the test.
233+
kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
234+
Host: "",
235+
ContentConfig: rest.ContentConfig{
236+
GroupVersion: &v1.SchemeGroupVersion,
237+
},
238+
},
239+
)
240+
230241
config := &rest.Config{
231242
Host: "",
232243
ContentConfig: rest.ContentConfig{
@@ -235,7 +246,7 @@ func TestNormalPath(t *testing.T) {
235246
}
236247
option := options.ServerOption{}
237248
jobClientSet := jobclientset.NewForConfigOrDie(config)
238-
ctr, kubeInformerFactory, _ := newPyTorchController(config, kubeClientSet, jobClientSet, controller.NoResyncPeriodFunc, option)
249+
ctr, kubeInformerFactory, _ := newPyTorchController(config, kubeClientSet, kubeBatchClientSet, jobClientSet, controller.NoResyncPeriodFunc, option)
239250
ctr.jobInformerSynced = testutil.AlwaysReady
240251
ctr.PodInformerSynced = testutil.AlwaysReady
241252
ctr.ServiceInformerSynced = testutil.AlwaysReady
@@ -347,14 +358,23 @@ func TestRun(t *testing.T) {
347358
},
348359
},
349360
)
361+
// Prepare the kube-batch clientset and controller for the test.
362+
kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
363+
Host: "",
364+
ContentConfig: rest.ContentConfig{
365+
GroupVersion: &v1.SchemeGroupVersion,
366+
},
367+
},
368+
)
369+
350370
config := &rest.Config{
351371
Host: "",
352372
ContentConfig: rest.ContentConfig{
353373
GroupVersion: &v1beta1.SchemeGroupVersion,
354374
},
355375
}
356376
jobClientSet := jobclientset.NewForConfigOrDie(config)
357-
ctr, _, _ := newPyTorchController(config, kubeClientSet, jobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
377+
ctr, _, _ := newPyTorchController(config, kubeClientSet, kubeBatchClientSet, jobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
358378
ctr.jobInformerSynced = testutil.AlwaysReady
359379
ctr.PodInformerSynced = testutil.AlwaysReady
360380
ctr.ServiceInformerSynced = testutil.AlwaysReady
@@ -380,12 +400,21 @@ func TestSyncPdb(t *testing.T) {
380400
GroupVersion: &v1beta1.SchemeGroupVersion,
381401
},
382402
}
403+
// Prepare the kube-batch clientset and controller for the test.
404+
kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
405+
Host: "",
406+
ContentConfig: rest.ContentConfig{
407+
GroupVersion: &v1.SchemeGroupVersion,
408+
},
409+
},
410+
)
411+
383412
jobClientSet := jobclientset.NewForConfigOrDie(config)
384413
kubeClientSet := fake.NewSimpleClientset()
385414
option := options.ServerOption{
386415
EnableGangScheduling: true,
387416
}
388-
ctr, _, _ := newPyTorchController(config, kubeClientSet, jobClientSet, controller.NoResyncPeriodFunc, option)
417+
ctr, _, _ := newPyTorchController(config, kubeClientSet, kubeBatchClientSet, jobClientSet, controller.NoResyncPeriodFunc, option)
389418

390419
type testCase struct {
391420
job *v1beta1.PyTorchJob

0 commit comments

Comments
 (0)