diff --git a/Makefile b/Makefile
index 4e8025a3..9f04c4d6 100644
--- a/Makefile
+++ b/Makefile
@@ -9,6 +9,9 @@ FLINK_OPERATOR_NAMESPACE ?= flink-operator-system
 RESOURCE_PREFIX ?= flink-operator-
 # The Kubernetes namespace to limit watching.
 WATCH_NAMESPACE ?=
+# Kind cluster name
+KIND_CLUSTER ?= kind-flink-operator
+KIND_IMAGE ?= kindest/node:v1.17.17
 
 GREEN=\033[1;32m
 RED=\033[1;31m
@@ -16,6 +19,28 @@ RESET=\033[0m
 
 #################### Local build and test ####################
 
+# Ensure the kubectl context points at the kind cluster
+kind-ensure:
	kubectx kind-${KIND_CLUSTER}
+
+kind-setup:
	kind create cluster --name ${KIND_CLUSTER} --image ${KIND_IMAGE}
+
+kind-up-image: operator-image
	kind load docker-image ${IMG} --name ${KIND_CLUSTER}
+
+kind-teardown:
	kind delete cluster --name ${KIND_CLUSTER}
+
+kind-deploy: kind-ensure
	$(MAKE) deploy
+
+kind-example: kind-ensure
	kubectl apply -f config/samples/flinkoperator_v1beta1_flinkjobcluster.yaml
+
+kind-undeploy: kind-ensure
	$(MAKE) undeploy
+
 # Build the flink-operator binary
 build: generate fmt vet
 	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o bin/flink-operator main.go
@@ -153,3 +178,5 @@ undeploy: undeploy-controller undeploy-crd
 # Deploy the sample Flink clusters in the Kubernetes cluster
 samples:
 	kubectl apply -f config/samples/
+
+#################### Local build and test ####################
diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go
index 533819a9..c2ed7808 100644
--- a/api/v1beta1/flinkcluster_types.go
+++ b/api/v1beta1/flinkcluster_types.go
@@ -412,6 +412,18 @@ type JobSpec struct {
 	SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
 }
 
+// HPASpec defines properties of an HPA for the cluster.
+type HPASpec struct {
+	// The lower limit for the number of TaskManager pods.
+	MinReplicas *int32 `json:"minReplicas,omitempty"`
+
+	// The upper limit for the number of TaskManager pods.
+	MaxReplicas int32 `json:"maxReplicas"`
+
+	// The target average CPU utilization, as a percentage, over the TaskManager pods.
+	TargetCPUUtilizationPercentage *int32 `json:"targetCPUUtilizationPercentage,omitempty"`
+}
+
 // FlinkClusterSpec defines the desired state of FlinkCluster
 type FlinkClusterSpec struct {
 	// Flink image spec for the cluster's components.
@@ -452,6 +464,9 @@ type FlinkClusterSpec struct {
 	// Config for GCP.
 	GCPConfig *GCPConfig `json:"gcpConfig,omitempty"`
 
+	// Config for the HPA.
+	HPA *HPASpec `json:"hpa,omitempty"`
+
 	// The logging configuration, which should have keys 'log4j-console.properties' and 'logback-console.xml'.
 	// These will end up in the 'flink-config-volume' ConfigMap, which gets mounted at /opt/flink/conf.
 	// If not provided, defaults that log to console only will be used.
@@ -519,7 +534,10 @@ type FlinkClusterComponentsStatus struct {
 	JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`
 
 	// The state of TaskManager StatefulSet.
-	TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
+	TaskManagerStatefulSet TaskManagerStatefulSetStatus `json:"taskManagerStatefulSet"`
+
+	// The status of the HPA, available only when HPASpec is provided.
+	HPA *HPAStatus `json:"hpa,omitempty"`
 
 	// The status of the job, available only when JobSpec is provided.
 	Job *JobStatus `json:"job,omitempty"`
@@ -577,6 +595,23 @@ type JobStatus struct {
 	RestartCount int32 `json:"restartCount,omitempty"`
 }
 
+type HPAStatus struct {
+	// The name of the Kubernetes HPA resource.
+	Name string `json:"name,omitempty"`
+
+	// The state of the Kubernetes HPA.
+	State string `json:"state"`
+
+	// The current number of replicas in the cluster, as seen by the HPA.
+	CurrentReplicas int32 `json:"currentReplicas"`
+
+	// The desired number of replicas in the cluster, as seen by the HPA.
+	DesiredReplicas int32 `json:"desiredReplicas"`
+
+	// The current average CPU utilization over all pods of the TaskManager StatefulSet.
+	CurrentCPUUtilizationPercentage *int32 `json:"currentCPUUtilizationPercentage,omitempty"`
+}
+
 // SavepointStatus defines the status of savepoint progress
 type SavepointStatus struct {
 	// The ID of the Flink job.
@@ -625,6 +660,21 @@ type JobManagerServiceStatus struct {
 	NodePort int32 `json:"nodePort,omitempty"`
 }
 
+// TaskManagerStatefulSetStatus defines the observed state of the TaskManager StatefulSet.
+type TaskManagerStatefulSetStatus struct {
+	// The name of the Kubernetes TaskManager StatefulSet.
+	Name string `json:"name"`
+
+	// The state of the component.
+	State string `json:"state"`
+
+	// The number of TaskManager replicas.
+	Replicas int32 `json:"replicas"`
+
+	// The label selector for the TaskManager pods, in string form for the scale subresource.
+	Selector string `json:"selector"`
+}
+
 // FlinkClusterStatus defines the observed state of FlinkCluster
 type FlinkClusterStatus struct {
 	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
@@ -667,6 +717,7 @@ type FlinkClusterStatus struct {
 
 // +kubebuilder:object:root=true
 // +kubebuilder:subresource:status
+// +kubebuilder:subresource:scale:specpath=.spec.taskManager.replicas,statuspath=.status.components.taskManagerStatefulSet.replicas,selectorpath=.status.components.taskManagerStatefulSet.selector
 
 // FlinkCluster is the Schema for the flinkclusters API
 type FlinkCluster struct {
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index 58d77248..f171be8b 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -94,6 +95,11 @@ func (in *FlinkClusterComponentsStatus) DeepCopyInto(out *FlinkClusterComponents
 		(*in).DeepCopyInto(*out)
 	}
 	out.TaskManagerStatefulSet = in.TaskManagerStatefulSet
+	if in.HPA != nil {
+		in, out := &in.HPA, &out.HPA
+		*out = new(HPAStatus)
+		(*in).DeepCopyInto(*out)
+	}
 	if in.Job != nil {
 		in, out := &in.Job, &out.Job
 		*out = new(JobStatus)
@@ -217,6 +223,11 @@ func (in *FlinkClusterSpec) DeepCopyInto(out *FlinkClusterSpec) {
 		*out = new(GCPConfig)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.HPA != nil {
+		in, out := &in.HPA, &out.HPA
+		*out = new(HPASpec)
+		(*in).DeepCopyInto(*out)
+	}
 	if in.LogConfig != nil {
 		in, out := &in.LogConfig, &out.LogConfig
 		*out = make(map[string]string, len(*in))
@@ -312,6 +323,26 @@ func (in *GCPServiceAccount) DeepCopy() *GCPServiceAccount {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *HPASpec) DeepCopyInto(out *HPASpec) {
+	*out = *in
+	if in.MinReplicas != nil {
+		in, out := &in.MinReplicas, &out.MinReplicas
+		*out = new(int32)
+		**out = **in
+	}
+	if in.TargetCPUUtilizationPercentage != nil {
+		in, out := &in.TargetCPUUtilizationPercentage, &out.TargetCPUUtilizationPercentage
+		*out = new(int32)
+		**out = **in
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HPASpec.
+func (in *HPASpec) DeepCopy() *HPASpec {
+	if in == nil {
+		return nil
+	}
+	out := new(HPASpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *HPAStatus) DeepCopyInto(out *HPAStatus) {
+	*out = *in
+	if in.CurrentCPUUtilizationPercentage != nil {
+		in, out := &in.CurrentCPUUtilizationPercentage, &out.CurrentCPUUtilizationPercentage
+		*out = new(int32)
+		**out = **in
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HPAStatus.
+func (in *HPAStatus) DeepCopy() *HPAStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(HPAStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *HadoopConfig) DeepCopyInto(out *HadoopConfig) {
 	*out = *in
@@ -494,6 +525,13 @@ func (in *JobManagerSpec) DeepCopyInto(out *JobManagerSpec) {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
 	}
+	if in.VolumeClaimTemplates != nil {
+		in, out := &in.VolumeClaimTemplates, &out.VolumeClaimTemplates
+		*out = make([]v1.PersistentVolumeClaim, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 	if in.InitContainers != nil {
 		in, out := &in.InitContainers, &out.InitContainers
 		*out = make([]v1.Container, len(*in))
@@ -769,6 +807,13 @@ func (in *TaskManagerSpec) DeepCopyInto(out *TaskManagerSpec) {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
 	}
+	if in.VolumeClaimTemplates != nil {
+		in, out := &in.VolumeClaimTemplates, &out.VolumeClaimTemplates
+		*out = make([]v1.PersistentVolumeClaim, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
 	if in.InitContainers != nil {
 		in, out := &in.InitContainers, &out.InitContainers
 		*out = make([]v1.Container, len(*in))
@@ -828,6 +873,21 @@ func (in *TaskManagerSpec) DeepCopy() *TaskManagerSpec {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *TaskManagerStatefulSetStatus) DeepCopyInto(out *TaskManagerStatefulSetStatus) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskManagerStatefulSetStatus.
+func (in *TaskManagerStatefulSetStatus) DeepCopy() *TaskManagerStatefulSetStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(TaskManagerStatefulSetStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *Validator) DeepCopyInto(out *Validator) {
 	*out = *in
diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
index 1ae4d64d..e87144bb 100644
--- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
+++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
@@ -16,6 +16,10 @@ spec:
     singular: flinkcluster
   scope: Namespaced
   subresources:
+    scale:
+      labelSelectorPath: .status.components.taskManagerStatefulSet.selector
+      specReplicasPath: .spec.taskManager.replicas
+      statusReplicasPath: .status.components.taskManagerStatefulSet.replicas
     status: {}
   validation:
     openAPIV3Schema:
@@ -134,6 +138,33 @@ spec:
                 mountPath:
                   type: string
              type: object
+            hpa:
+              properties:
+                maxReplicas:
+                  format: int32
+                  type: integer
+                minReplicas:
+                  format: int32
+                  type: integer
+                targetCPUUtilizationPercentage:
+                  format: int32
+                  type: integer
+              required:
+              - maxReplicas
+              type: object
             image:
               properties:
                 name:
                  type: string
@@ -2659,6 +2690,123 @@ spec:
                      type: string
                    type: object
                  type: array
+                volumeClaimTemplates:
+                  items:
+                    properties:
+                      apiVersion:
+                        type: string
+                      kind:
+                        type: string
+                      metadata:
+                        type: object
+                      spec:
+                        properties:
+                          accessModes:
+                            items:
+                              type: string
+                            type: array
+                          dataSource:
+                            properties:
+                              apiGroup:
+                                type: string
+                              kind:
+                                type: string
+                              name:
+                                type: string
+                            required:
+                            - kind
+                            - name
+                            type: object
+                          resources:
+                            properties:
+                              limits:
+                                additionalProperties:
+                                  anyOf:
+                                  - type: integer
+                                  - type: string
+                                  pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                                  x-kubernetes-int-or-string: true
+                                type: object
+                              requests:
+                                additionalProperties:
+                                  anyOf:
+                                  - type: integer
+                                  - type: string
+                                  pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                                  x-kubernetes-int-or-string: true
+                                type: object
+                            type: object
+                          selector:
+                            properties:
+                              matchExpressions:
+                                items:
+                                  properties:
+                                    key:
+                                      type: string
+                                    operator:
+                                      type: string
+                                    values:
+                                      items:
+                                        type: string
+                                      type: array
+                                  required:
+                                  - key
+                                  - operator
+                                  type: object
+                                type: array
+                              matchLabels:
+                                additionalProperties:
+                                  type: string
+                                type: object
+                            type: object
+                          storageClassName:
+                            type: string
+                          volumeMode:
+                            type: string
+                          volumeName:
+                            type: string
+                        type: object
+                      status:
+                        properties:
+                          accessModes:
+                            items:
+                              type: string
+                            type: array
+                          capacity:
+                            additionalProperties:
+                              anyOf:
+                              - type: integer
+                              - type: string
+                              pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                              x-kubernetes-int-or-string: true
+                            type: object
+                          conditions:
+                            items:
+                              properties:
+                                lastProbeTime:
+                                  format: date-time
+                                  type: string
+                                lastTransitionTime:
+                                  format: date-time
+                                  type: string
+                                message:
+                                  type: string
+                                reason:
+                                  type: string
+                                status:
+                                  type: string
+                                type:
+                                  type: string
+                              required:
+                              - status
+                              - type
+                              type: object
+                            type: array
+                          phase:
+                            type: string
+                        type: object
+                    type: object
+                  type: array
                 volumeMounts:
                   items:
                     properties:
@@ -4504,6 +4652,123 @@ spec:
                      type: string
                    type: object
                  type: array
+                volumeClaimTemplates:
+                  items:
+                    properties:
+                      apiVersion:
+                        type: string
+                      kind:
+                        type: string
+                      metadata:
+                        type: object
+                      spec:
+                        properties:
+                          accessModes:
+                            items:
+                              type: string
+                            type: array
+                          dataSource:
+                            properties:
+                              apiGroup:
+                                type: string
+                              kind:
+                                type: string
+                              name:
+                                type: string
+                            required:
+                            - kind
+                            - name
+                            type: object
+                          resources:
+                            properties:
+                              limits:
+                                additionalProperties:
+                                  anyOf:
+                                  - type: integer
+                                  - type: string
+                                  pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                                  x-kubernetes-int-or-string: true
+                                type: object
+                              requests:
+                                additionalProperties:
+                                  anyOf:
+                                  - type: integer
+                                  - type: string
+                                  pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                                  x-kubernetes-int-or-string: true
+                                type: object
+                            type: object
+                          selector:
+                            properties:
+                              matchExpressions:
+                                items:
+                                  properties:
+                                    key:
+                                      type: string
+                                    operator:
+                                      type: string
+                                    values:
+                                      items:
+                                        type: string
+                                      type: array
+                                  required:
+                                  - key
+                                  - operator
+                                  type: object
+                                type: array
+                              matchLabels:
+                                additionalProperties:
+                                  type: string
+                                type: object
+                            type: object
+                          storageClassName:
+                            type: string
+                          volumeMode:
+                            type: string
+                          volumeName:
+                            type: string
+                        type: object
+                      status:
+                        properties:
+                          accessModes:
+                            items:
+                              type: string
+                            type: array
+                          capacity:
+                            additionalProperties:
+                              anyOf:
+                              - type: integer
+                              - type: string
+                              pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                              x-kubernetes-int-or-string: true
+                            type: object
+                          conditions:
+                            items:
+                              properties:
+                                lastProbeTime:
+                                  format: date-time
+                                  type: string
+                                lastTransitionTime:
+                                  format: date-time
+                                  type: string
+                                message:
+                                  type: string
+                                reason:
+                                  type: string
+                                status:
+                                  type: string
+                                type:
+                                  type: string
+                              required:
+                              - status
+                              - type
+                              type: object
+                            type: array
+                          phase:
+                            type: string
+                        type: object
+                    type: object
+                  type: array
                 volumeMounts:
                   items:
                     properties:
@@ -5132,6 +5397,26 @@ spec:
              type: integer
            components:
              properties:
+                hpa:
+                  properties:
+                    currentCPUUtilizationPercentage:
+                      format: int32
+                      type: integer
+                    currentReplicas:
+                      format: int32
+                      type: integer
+                    desiredReplicas:
+                      format: int32
+                      type: integer
+                    name:
+                      type: string
+                    state:
+                      type: string
+                  required:
+                  - currentReplicas
+                  - desiredReplicas
+                  - state
+                  type: object
                configMap:
                  properties:
                    name:
                      type: string
@@ -5167,16 +5452,6 @@ spec:
                  required:
                  - state
                  type: object
-                jobManagerStatefulSet:
-                  properties:
-                    name:
-                      type: string
-                    state:
-                      type: string
-                  required:
-                  - name
-                  - state
-                  type: object
                jobManagerIngress:
                  properties:
                    name:
@@ -5204,20 +5479,37 @@ spec:
                  - name
                  - state
                  type: object
+                jobManagerStatefulSet:
+                  properties:
+                    name:
+                      type: string
+                    state:
+                      type: string
+                  required:
+                  - name
+                  - state
+                  type: object
                taskManagerStatefulSet:
                  properties:
                    name:
                      type: string
+                    replicas:
+                      format: int32
+                      type: integer
+                    selector:
+                      type: string
                    state:
                      type: string
                  required:
                  - name
+                  - replicas
+                  - selector
                  - state
                  type: object
              required:
              - configMap
-              - jobManagerStatefulSet
              - jobManagerService
+              - jobManagerStatefulSet
              - taskManagerStatefulSet
              type: object
            control:
diff --git a/controllers/batchscheduler/volcano/volcano_test.go b/controllers/batchscheduler/volcano/volcano_test.go
index 485f374e..099b7c97 100644
--- a/controllers/batchscheduler/volcano/volcano_test.go
+++ b/controllers/batchscheduler/volcano/volcano_test.go
@@ -43,7 +43,7 @@ func TestGetClusterResource(t *testing.T) {
 			},
 		},
 		Spec: appsv1.StatefulSetSpec{
-			Replicas: &jmRep,
+			Replicas:    &jmRep,
 			ServiceName: "flinkjobcluster-sample-jobmanager",
 			Selector: &metav1.LabelSelector{
 				MatchLabels: map[string]string{
@@ -173,8 +173,8 @@ func TestGetClusterResource(t *testing.T) {
 			},
 		},
 		Spec: appsv1.StatefulSetSpec{
-			Replicas:    &replicas,
-			ServiceName: "flinkjobcluster-sample-taskmanager",
+			Replicas:            &replicas,
+			ServiceName:         "flinkjobcluster-sample-taskmanager",
 			PodManagementPolicy: "Parallel",
 			Selector: &metav1.LabelSelector{
 				MatchLabels: map[string]string{
diff --git a/controllers/flinkcluster_controller.go b/controllers/flinkcluster_controller.go
index 79da226c..42719b97 100644
--- a/controllers/flinkcluster_controller.go
+++ b/controllers/flinkcluster_controller.go
@@ -27,6 +27,7 @@ import (
 	"github.com/googlecloudplatform/flink-operator/controllers/model"
 	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/record"
@@ -93,6 +94,7 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager(
 		Owns(&appsv1.StatefulSet{}).
 		Owns(&corev1.Service{}).
 		Owns(&batchv1.Job{}).
+		Owns(&autoscalingv1.HorizontalPodAutoscaler{}).
 		Complete(reconciler)
 }
diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go
index a79aa403..12a5cd93 100644
--- a/controllers/flinkcluster_converter.go
+++ b/controllers/flinkcluster_converter.go
@@ -18,6 +18,7 @@ package controllers
 
 import (
 	"fmt"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	"math"
 	"regexp"
 	"sort"
@@ -68,12 +69,13 @@ func getDesiredClusterState(
 		return model.DesiredClusterState{}
 	}
 	return model.DesiredClusterState{
-		ConfigMap: getDesiredConfigMap(cluster),
+		ConfigMap:     getDesiredConfigMap(cluster),
 		JmStatefulSet: getDesiredJobManagerStatefulSet(cluster),
-		JmService: getDesiredJobManagerService(cluster),
-		JmIngress: getDesiredJobManagerIngress(cluster),
+		JmService:     getDesiredJobManagerService(cluster),
+		JmIngress:     getDesiredJobManagerIngress(cluster),
 		TmStatefulSet: getDesiredTaskManagerStatefulSet(cluster),
-		Job: getDesiredJob(observed),
+		Job:           getDesiredJob(observed),
+		HPA:           getDesiredHPA(observed),
 	}
 }
 
@@ -217,9 +219,9 @@ func getDesiredJobManagerStatefulSet(
 			Labels:    statefulSetLabels,
 		},
 		Spec: appsv1.StatefulSetSpec{
-			Replicas:    jobManagerSpec.Replicas,
-			Selector:    &metav1.LabelSelector{MatchLabels: podLabels},
-			ServiceName: jobManagerStatefulSetName,
+			Replicas:             jobManagerSpec.Replicas,
+			Selector:             &metav1.LabelSelector{MatchLabels: podLabels},
+			ServiceName:          jobManagerStatefulSetName,
 			VolumeClaimTemplates: jobManagerSpec.VolumeClaimTemplates,
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
@@ -516,11 +518,11 @@ func getDesiredTaskManagerStatefulSet(
 			Labels:    statefulSetLabels,
 		},
 		Spec: appsv1.StatefulSetSpec{
-			Replicas:    &taskManagerSpec.Replicas,
-			Selector:    &metav1.LabelSelector{MatchLabels: podLabels},
-			ServiceName: taskManagerStatefulSetName,
+			Replicas:             &taskManagerSpec.Replicas,
+			Selector:             &metav1.LabelSelector{MatchLabels: podLabels},
+			ServiceName:          taskManagerStatefulSetName,
 			VolumeClaimTemplates: taskManagerSpec.VolumeClaimTemplates,
-			PodManagementPolicy: "Parallel",
+			PodManagementPolicy:  "Parallel",
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: podLabels,
@@ -761,6 +763,37 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
 	return job
 }
 
+// Gets the desired HPA for the cluster, or nil when no HPASpec is provided.
+func getDesiredHPA(observed *ObservedClusterState) *autoscalingv1.HorizontalPodAutoscaler {
+	var flinkCluster = observed.cluster
+	if flinkCluster.Spec.HPA == nil {
+		return nil
+	}
+
+	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
+	var clusterName = flinkCluster.ObjectMeta.Name
+	var hpaName = getHPAName(clusterName)
+	var labels = mergeLabels(
+		getClusterLabels(*flinkCluster),
+		getRevisionHashLabels(flinkCluster.Status))
+
+	return &autoscalingv1.HorizontalPodAutoscaler{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: clusterNamespace,
+			Name:      hpaName,
+			OwnerReferences: []metav1.OwnerReference{
+				ToOwnerReference(flinkCluster)},
+			Labels: labels,
+		},
+		Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
+			ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
+				APIVersion: "flinkoperator.k8s.io/v1beta1",
+				Kind:       "FlinkCluster",
+				Name:       clusterName,
+			},
+			MinReplicas:                    flinkCluster.Spec.HPA.MinReplicas,
+			MaxReplicas:                    flinkCluster.Spec.HPA.MaxReplicas,
+			TargetCPUUtilizationPercentage: flinkCluster.Spec.HPA.TargetCPUUtilizationPercentage,
+		},
+	}
+}
+
 // Decide from which savepoint Flink job should be restored when the job created, updated or restarted
 //
 // case 1) Restore job from the user provided savepoint
diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go
index 78923e62..4bcae053 100644
--- a/controllers/flinkcluster_converter_test.go
+++ b/controllers/flinkcluster_converter_test.go
@@ -574,8 +574,8 @@ func TestGetDesiredClusterState(t *testing.T) {
 			},
 		},
 		Spec: appsv1.StatefulSetSpec{
-			Replicas:    &replicas,
-			ServiceName: "flinkjobcluster-sample-taskmanager",
+			Replicas:            &replicas,
+			ServiceName:         "flinkjobcluster-sample-taskmanager",
 			PodManagementPolicy: "Parallel",
 			Selector: &metav1.LabelSelector{
 				MatchLabels: map[string]string{
diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go
index 31c08fa8..37a17d8e 100644
--- a/controllers/flinkcluster_observer.go
+++ b/controllers/flinkcluster_observer.go
@@ -25,6 +25,7 @@ import (
 	"github.com/googlecloudplatform/flink-operator/controllers/flinkclient"
 	"github.com/googlecloudplatform/flink-operator/controllers/history"
 	appsv1 "k8s.io/api/apps/v1"
+	v1autoscaling "k8s.io/api/autoscaling/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
@@ -51,12 +52,13 @@ type ObservedClusterState struct {
 	cluster           *v1beta1.FlinkCluster
 	revisions         []*appsv1.ControllerRevision
 	configMap         *corev1.ConfigMap
-	jmStatefulSet *appsv1.StatefulSet
+	jmStatefulSet     *appsv1.StatefulSet
 	jmService         *corev1.Service
 	jmIngress         *extensionsv1beta1.Ingress
-	tmStatefulSet *appsv1.StatefulSet
+	tmStatefulSet     *appsv1.StatefulSet
 	job               *batchv1.Job
 	jobPod            *corev1.Pod
+	hpa               *v1autoscaling.HorizontalPodAutoscaler
 	flinkJobStatus    FlinkJobStatus
 	flinkJobSubmitLog *FlinkJobSubmitLog
 	savepoint         *flinkclient.SavepointStatus
@@ -191,6 +193,21 @@ func (observer *ClusterStateObserver) observe(
 		observed.tmStatefulSet = observedTmStatefulSet
 	}
 
+	// (Optional) HPA
+	var observedHPA = new(v1autoscaling.HorizontalPodAutoscaler)
+	err = observer.observeHPA(observedHPA)
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get HPA")
+			return err
+		}
+		log.Info("Observed HPA", "state", "nil")
+		observedHPA = nil
+	} else {
+		log.Info("Observed HPA", "state", *observedHPA)
+		observed.hpa = observedHPA
+	}
+
 	// (Optional) Savepoint.
 	// Savepoint observe error do not affect deploy reconciliation loop.
 	observer.observeSavepoint(observed)
@@ -347,6 +364,22 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(
 	return
 }
 
+func (observer *ClusterStateObserver) observeHPA(observed *v1autoscaling.HorizontalPodAutoscaler) error {
+	var clusterNamespace = observer.request.Namespace
+	var clusterName = observer.request.Name
+	var hpaName = getHPAName(clusterName)
+
+	return observer.k8sClient.Get(
+		observer.context,
+		types.NamespacedName{
+			Namespace: clusterNamespace,
+			Name:      hpaName,
+		},
+		observed)
+}
+
 func (observer *ClusterStateObserver) observeSavepoint(observed *ObservedClusterState) error {
 	var log = observer.log
diff --git a/controllers/flinkcluster_reconciler.go b/controllers/flinkcluster_reconciler.go
index 9465b836..4db898a7 100644
--- a/controllers/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster_reconciler.go
@@ -19,6 +19,7 @@ package controllers
 import (
 	"context"
 	"fmt"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -110,6 +111,11 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 
 	result, err := reconciler.reconcileJob()
 
+	err = reconciler.reconcileHPA()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	return result, nil
 }
 
@@ -345,6 +351,39 @@ func (reconciler *ClusterReconciler) reconcileJobManagerIngress() error {
 	return nil
 }
 
+func (reconciler *ClusterReconciler) reconcileHPA() error {
+	var desiredHPA = reconciler.desired.HPA
+	var observedHPA = reconciler.observed.hpa
+
+	if desiredHPA != nil && observedHPA == nil {
+		return reconciler.createHPA(desiredHPA, "HPA")
+	}
+
+	if desiredHPA != nil && observedHPA != nil {
+		if getUpdateState(reconciler.observed) == UpdateStateInProgress {
+			var err error
+			if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
+				err = reconciler.deleteOldComponent(desiredHPA, observedHPA, "HPA")
+			} else {
+				err = reconciler.updateComponent(desiredHPA, "HPA")
+			}
+			if err != nil {
+				return err
+			}
+			return nil
+		}
+		reconciler.log.Info("HPA already exists, no action")
+		return nil
+	}
+
+	if desiredHPA == nil && observedHPA != nil {
+		return reconciler.deleteHPA(observedHPA, "HPA")
+	}
+
+	return nil
+}
+
 func (reconciler *ClusterReconciler) createIngress(
 	ingress *extensionsv1beta1.Ingress, component string) error {
 	var context = reconciler.context
@@ -361,6 +400,23 @@ func (reconciler *ClusterReconciler) createIngress(
 	return err
 }
 
+func (reconciler *ClusterReconciler) createHPA(
+	hpa *autoscalingv1.HorizontalPodAutoscaler, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Creating HPA", "resource", *hpa)
+	var err = k8sClient.Create(context, hpa)
+	if err != nil {
+		log.Error(err, "Failed to create HPA")
+	} else {
+		log.Info("HPA created")
+	}
+	return err
+}
+
 func (reconciler *ClusterReconciler) deleteIngress(
 	ingress *extensionsv1beta1.Ingress, component string) error {
 	var context = reconciler.context
@@ -378,6 +434,23 @@ func (reconciler *ClusterReconciler) deleteIngress(
 	return err
 }
 
+func (reconciler *ClusterReconciler) deleteHPA(
+	hpa *autoscalingv1.HorizontalPodAutoscaler, component string) error {
+	var context = reconciler.context
+	var log = reconciler.log.WithValues("component", component)
+	var k8sClient = reconciler.k8sClient
+
+	log.Info("Deleting HPA", "HPA", hpa)
+	var err = k8sClient.Delete(context, hpa)
+	err = client.IgnoreNotFound(err)
+	if err != nil {
+		log.Error(err, "Failed to delete HPA")
+	} else {
+		log.Info("HPA deleted")
+	}
+	return err
+}
+
 func (reconciler *ClusterReconciler) reconcileConfigMap() error {
 	var desiredConfigMap = reconciler.desired.ConfigMap
 	var observedConfigMap = reconciler.observed.configMap
diff --git a/controllers/flinkcluster_updater.go b/controllers/flinkcluster_updater.go
index 49706f7a..941726ac 100644
--- a/controllers/flinkcluster_updater.go
+++ b/controllers/flinkcluster_updater.go
@@ -24,6 +24,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"reflect"
 	"time"
@@ -370,18 +371,28 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
 	if !isComponentUpdated(observedTmStatefulSet, *observed.cluster) && isJobUpdating {
 		recorded.Components.TaskManagerStatefulSet.DeepCopyInto(&status.Components.TaskManagerStatefulSet)
 		status.Components.TaskManagerStatefulSet.State = v1beta1.ComponentStateUpdating
+		// TODO: should we also carry over replicas and selector here?
 	} else if observedTmStatefulSet != nil {
 		status.Components.TaskManagerStatefulSet.Name =
 			observedTmStatefulSet.ObjectMeta.Name
 		status.Components.TaskManagerStatefulSet.State =
 			getStatefulSetState(observedTmStatefulSet)
+		status.Components.TaskManagerStatefulSet.Replicas =
+			observedTmStatefulSet.Status.Replicas
+		selector, err := metav1.LabelSelectorAsSelector(observedTmStatefulSet.Spec.Selector)
+		if err != nil {
+			updater.log.Error(err, "Failed to convert the TaskManager selector; the HPA will not work")
+		} else {
+			status.Components.TaskManagerStatefulSet.Selector = selector.String()
+		}
+
 		if status.Components.TaskManagerStatefulSet.State == v1beta1.ComponentStateReady {
 			runningComponents++
 		}
 	} else if recorded.Components.TaskManagerStatefulSet.Name != "" {
 		status.Components.TaskManagerStatefulSet =
-			v1beta1.FlinkClusterComponentState{
+			v1beta1.TaskManagerStatefulSetStatus{
 				Name:  recorded.Components.TaskManagerStatefulSet.Name,
 				State: v1beta1.ComponentStateDeleted,
 			}
@@ -399,6 +410,25 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
 		jobStopped = true
 	}
 
+	// (Optional) HPA
+	var observedHPA = observed.hpa
+	if !isComponentUpdated(observedHPA, *observed.cluster) && isJobUpdating {
+		if recorded.Components.HPA != nil {
+			status.Components.HPA = &v1beta1.HPAStatus{}
+			recorded.Components.HPA.DeepCopyInto(status.Components.HPA)
+			status.Components.HPA.State = v1beta1.ComponentStateUpdating
+		}
+	} else if observedHPA != nil {
+		status.Components.HPA = &v1beta1.HPAStatus{
+			Name:                            observedHPA.ObjectMeta.Name,
+			State:                           v1beta1.ComponentStateReady,
+			CurrentReplicas:                 observedHPA.Status.CurrentReplicas,
+			DesiredReplicas:                 observedHPA.Status.DesiredReplicas,
+			CurrentCPUUtilizationPercentage: observedHPA.Status.CurrentCPUUtilizationPercentage,
+		}
+	} else if recorded.Components.HPA != nil &&
+		recorded.Components.HPA.Name != "" {
+		status.Components.HPA =
+			&v1beta1.HPAStatus{
+				Name:  recorded.Components.HPA.Name,
+				State: v1beta1.ComponentStateDeleted,
+			}
+	}
+
 	// Derive the new cluster state.
 	switch recorded.State {
 	case "", v1beta1.ClusterStateCreating:
diff --git a/controllers/flinkcluster_updater_test.go b/controllers/flinkcluster_updater_test.go
index 4bdd8ab4..41439632 100644
--- a/controllers/flinkcluster_updater_test.go
+++ b/controllers/flinkcluster_updater_test.go
@@ -60,7 +60,7 @@ func TestIsStatusChangedTrue(t *testing.T) {
 				Name:  "my-jobmanager",
 				State: "NotReady",
 			},
-			TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
+			TaskManagerStatefulSet: v1beta1.TaskManagerStatefulSetStatus{
 				Name:  "my-taskmanager",
 				State: "NotReady",
 			},
@@ -84,7 +84,7 @@ func TestIsStatusChangedTrue(t *testing.T) {
 				Name:  "my-jobmanager",
 				State: "Ready",
 			},
-			TaskManagerStatefulSet: v1beta1.FlinkClusterComponentState{
+			TaskManagerStatefulSet: v1beta1.TaskManagerStatefulSetStatus{
 				Name:  "my-taskmanager",
 				State: "Ready",
 			},
diff --git a/controllers/flinkcluster_util.go b/controllers/flinkcluster_util.go
index 90c44f99..76355f81 100644
--- a/controllers/flinkcluster_util.go
+++ b/controllers/flinkcluster_util.go
@@ -24,6 +24,7 @@ import (
 	"github.com/googlecloudplatform/flink-operator/controllers/history"
 	"gopkg.in/yaml.v2"
 	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
@@ -107,6 +108,11 @@ func getTaskManagerStatefulSetName(clusterName string) string {
 	return clusterName + "-taskmanager"
 }
 
+// Gets HPA name
+func getHPAName(clusterName string) string {
+	return clusterName + "-hpa"
+}
+
 // Gets Job name
 func getJobName(clusterName string) string {
 	return clusterName + "-job-submitter"
@@ -462,6 +468,13 @@ func isComponentUpdated(component runtime.Object, cluster v1beta1.FlinkCluster)
 			}
 			return true
 		}
+	case *autoscalingv1.HorizontalPodAutoscaler:
+		if o == nil {
+			if cluster.Spec.HPA != nil {
+				return false
+			}
+			return true
+		}
 	}
 
 	var labels, err = meta.NewAccessor().Labels(component)
diff --git a/controllers/flinkcluster_util_test.go b/controllers/flinkcluster_util_test.go
index ee8df5d4..213e2cb2 100644
--- a/controllers/flinkcluster_util_test.go
+++ b/controllers/flinkcluster_util_test.go
@@ -408,8 +408,8 @@ func TestIsFlinkAPIReady(t *testing.T) {
 			Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
 		},
 		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmStatefulSet:  &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		tmStatefulSet:  &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		flinkJobStatus: FlinkJobStatus{flinkJobList: &flinkclient.JobStatusList{}},
 	}
@@ -425,10 +425,10 @@ func TestIsFlinkAPIReady(t *testing.T) {
 			},
 			Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
 		},
-		configMap:  &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	ready = isFlinkAPIReady(observed)
 	assert.Equal(t, ready, false)
@@ -442,9 +442,9 @@ func TestIsFlinkAPIReady(t *testing.T) {
 			},
 			Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
 		},
-		configMap:  &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	ready = isFlinkAPIReady(observed)
 	assert.Equal(t, ready, false)
@@ -458,10 +458,10 @@ func TestIsFlinkAPIReady(t *testing.T) {
 			},
 			Status: v1beta1.FlinkClusterStatus{NextRevision: "cluster-85dc8f749-2"},
 		},
-		configMap:  &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	ready = isFlinkAPIReady(observed)
 	assert.Equal(t, ready, false)
@@ -478,11 +478,11 @@ func TestGetUpdateState(t *testing.T) {
 			Components: v1beta1.FlinkClusterComponentsStatus{Job: &v1beta1.JobStatus{State: v1beta1.JobStateRunning}},
 			CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"},
 		},
-		job:        &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		configMap:  &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		job:           &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	var state = getUpdateState(observed)
 	assert.Equal(t, state, UpdateStatePreparing)
@@ -497,7 +497,7 @@ func TestGetUpdateState(t *testing.T) {
 		},
 		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
 	}
 	state = getUpdateState(observed)
 	assert.Equal(t, state, UpdateStateInProgress)
@@ -510,12 +510,12 @@ func TestGetUpdateState(t *testing.T) {
 			},
 			Status: v1beta1.FlinkClusterStatus{CurrentRevision: "cluster-85dc8f749-2", NextRevision: "cluster-aa5e3a87z-3"},
 		},
-		job:        &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
-		configMap:  &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
+		job:           &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
+		configMap:     &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		jmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 		tmStatefulSet: &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
-		jmService:  &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
-		jmIngress:  &extensionsv1beta1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
+		jmService:     &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
+		jmIngress:     &extensionsv1beta1.Ingress{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-aa5e3a87z"}}},
 	}
 	state = getUpdateState(observed)
 	assert.Equal(t, state, UpdateStateFinished)
diff --git a/controllers/model/model.go b/controllers/model/model.go
index 9faf6d39..2c693f8a 100644
--- a/controllers/model/model.go
+++ b/controllers/model/model.go
@@ -17,15 +17,17 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
 )
 
 // DesiredClusterState holds desired state of a cluster.
 type DesiredClusterState struct {
 	JmStatefulSet *appsv1.StatefulSet
-	JmService *corev1.Service
-	JmIngress *extensionsv1beta1.Ingress
+	JmService     *corev1.Service
+	JmIngress     *extensionsv1beta1.Ingress
 	TmStatefulSet *appsv1.StatefulSet
-	ConfigMap *corev1.ConfigMap
-	Job *batchv1.Job
+	ConfigMap     *corev1.ConfigMap
+	Job           *batchv1.Job
+	HPA           *autoscalingv1.HorizontalPodAutoscaler
 }
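
For reviewers, a minimal sketch of how the new `hpa` block is meant to be used in a FlinkCluster manifest. The image tag and the TaskManager `resources` values below are illustrative placeholders, and CPU-utilization-based scaling assumes the TaskManager pods declare CPU requests:

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  image:
    name: flink:1.9.3            # placeholder
  taskManager:
    replicas: 2
    resources:                   # CPU requests give the HPA its utilization signal
      requests:
        cpu: "1"
  hpa:
    minReplicas: 2
    maxReplicas: 8
    targetCPUUtilizationPercentage: 70

Because the CRD now exposes a scale subresource (specReplicasPath `.spec.taskManager.replicas`, statusReplicasPath `.status.components.taskManagerStatefulSet.replicas`), the generated HorizontalPodAutoscaler targets the FlinkCluster itself, and manual scaling via `kubectl scale flinkcluster flinkjobcluster-sample --replicas=4` should work as well once the updated CRD is installed.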