Skip to content
This repository was archived by the owner on Sep 2, 2022. It is now read-only.

feat: added affinity, nodeSelector, & tolerations to job spec #423

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,19 @@ type JobSpec struct {
// More info: https://kubernetes.io/docs/concepts/workloads/pods/init-containers/
InitContainers []corev1.Container `json:"initContainers,omitempty"`

// Selector which must match a node's labels for the Job pod to be
// scheduled on that node.
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
NodeSelector map[string]string `json:"nodeSelector,omitempty"`

// Tolerations that allow the Job pod to run on tainted nodes in the cluster.
// More info: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

// Node affinity, inter-pod affinity and anti-affinity rules for the Job pod.
// More info: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity
Affinity *corev1.Affinity `json:"affinity,omitempty"`

// Restart policy when the job fails, "Never" or "FromSavepointOnFailure",
// default: "Never".
//
Expand Down
35 changes: 34 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 14 additions & 11 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func getDesiredClusterState(
return model.DesiredClusterState{}
}
return model.DesiredClusterState{
ConfigMap: getDesiredConfigMap(cluster),
ConfigMap: getDesiredConfigMap(cluster),
JmStatefulSet: getDesiredJobManagerStatefulSet(cluster),
JmService: getDesiredJobManagerService(cluster),
JmIngress: getDesiredJobManagerIngress(cluster),
JmService: getDesiredJobManagerService(cluster),
JmIngress: getDesiredJobManagerIngress(cluster),
TmStatefulSet: getDesiredTaskManagerStatefulSet(cluster),
Job: getDesiredJob(observed),
Job: getDesiredJob(observed),
}
}

Expand Down Expand Up @@ -217,9 +217,9 @@ func getDesiredJobManagerStatefulSet(
Labels: statefulSetLabels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: jobManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: jobManagerStatefulSetName,
Replicas: jobManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: jobManagerStatefulSetName,
VolumeClaimTemplates: jobManagerSpec.VolumeClaimTemplates,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -516,11 +516,11 @@ func getDesiredTaskManagerStatefulSet(
Labels: statefulSetLabels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &taskManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: taskManagerStatefulSetName,
Replicas: &taskManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: taskManagerStatefulSetName,
VolumeClaimTemplates: taskManagerSpec.VolumeClaimTemplates,
PodManagementPolicy: "Parallel",
PodManagementPolicy: "Parallel",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Expand Down Expand Up @@ -726,6 +726,9 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
},
RestartPolicy: corev1.RestartPolicyNever,
Volumes: volumes,
NodeSelector: jobSpec.NodeSelector,
Tolerations: jobSpec.Tolerations,
Affinity: jobSpec.Affinity,
ImagePullSecrets: imageSpec.PullSecrets,
SecurityContext: securityContext,
ServiceAccountName: getServiceAccountName(serviceAccount),
Expand Down
59 changes: 57 additions & 2 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,57 @@ func TestGetDesiredClusterState(t *testing.T) {
Value: "toleration-value2",
},
}
var jobAffinity = *&corev1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "node-allow/flink-job",
Operator: v1.NodeSelectorOpNotIn,
Values: []string{"false"},
},
},
},
},
},
},
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"kafka-click-generator"},
},
},
},
Namespaces: []string{"default"},
TopologyKey: "kubernetes.io/hostname",
},
},
},
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "app",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"flink"},
},
},
},
Namespaces: []string{"default"},
TopologyKey: "kubernetes.io/hostname",
},
},
},
}
var userAndGroupId int64 = 9999
var securityContext = corev1.PodSecurityContext{
RunAsUser: &userAndGroupId,
Expand Down Expand Up @@ -162,6 +213,8 @@ func TestGetDesiredClusterState(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{
{Name: "cache-volume", MountPath: "/cache"},
},
Tolerations: tolerations,
Affinity: &jobAffinity,
InitContainers: []corev1.Container{
{
Name: "gcs-downloader",
Expand Down Expand Up @@ -574,8 +627,8 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: "flinkjobcluster-sample-taskmanager",
Replicas: &replicas,
ServiceName: "flinkjobcluster-sample-taskmanager",
PodManagementPolicy: "Parallel",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down Expand Up @@ -877,6 +930,8 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
Tolerations: tolerations,
Affinity: &jobAffinity,
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: &userAndGroupId,
RunAsGroup: &userAndGroupId,
Expand Down
10 changes: 10 additions & 0 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ FlinkCluster
|__ volumes
|__ volumeMounts
|__ initContainers
|__ nodeSelector
|__ tolerations
|__ affinity
|__ restartPolicy
|__ cleanupPolicy
|__ afterJobSucceeds
Expand Down Expand Up @@ -270,6 +273,13 @@ FlinkCluster
* **noLoggingToStdout** (optional): No logging output to STDOUT, default: false.
* **initContainers** (optional): Init containers of the Job pod.
See [more info](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) about init containers.
* **nodeSelector** (optional): Selector which must match a node's labels for the Job pod to be scheduled on that node.
See [more info](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/)
* **tolerations** (optional): Allows the Job pod to run on a tainted node
in the cluster.
See [more info](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/)
* **affinity** (optional): Allows the Job pod to set node affinity & inter-pod affinity & anti-affinity in the cluster.
See [more info](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity)
* **volumes** (optional): Volumes in the Job pod.
See [more info](https://kubernetes.io/docs/concepts/storage/volumes/) about volumes.
* **volumeMounts** (optional): Volume mounts in the Job containers. If there are no conflicts, these mounts will be
Expand Down