Skip to content

Commit 57758bb

Browse files
committed
[FLINK-34524] Scale down JM deployment to 0 before deletion
1 parent b8f22bf commit 57758bb

File tree

12 files changed

+333
-185
lines changed

12 files changed

+333
-185
lines changed

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {
106106
var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);
107107
try {
108108
observerFactory.getOrCreate(flinkApp).observe(ctx);
109-
} catch (DeploymentFailedException dfe) {
109+
} catch (Exception err) {
110110
// ignore during cleanup
111111
}
112112
statusRecorder.removeCachedStatus(flinkApp);

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ private void deleteJmThatNeverStarted(
139139
deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
140140
flinkService.deleteClusterDeployment(
141141
deployment.getMetadata(), deployment.getStatus(), deployConfig, false);
142-
flinkService.waitForClusterShutdown(deployConfig);
143-
LOG.info("Deleted jobmanager deployment that never started.");
142+
LOG.info("Deleted application cluster that never started.");
144143
}
145144

146145
@Override
@@ -170,10 +169,9 @@ public void deploy(
170169

171170
if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
172171
Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
173-
LOG.info("Deleting deployment with terminated application before new deployment");
172+
LOG.info("Deleting cluster with terminated application before new deployment");
174173
flinkService.deleteClusterDeployment(
175174
relatedResource.getMetadata(), status, deployConfig, !requireHaMetadata);
176-
flinkService.waitForClusterShutdown(deployConfig);
177175
statusRecorder.patchAndCacheStatus(relatedResource, ctx.getKubernetesClient());
178176
}
179177

@@ -238,7 +236,6 @@ protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkDeployment> ctx)
238236
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
239237
flinkService.deleteClusterDeployment(
240238
ctx.getResource().getMetadata(), ctx.getResource().getStatus(), conf, false);
241-
flinkService.waitForClusterShutdown(conf);
242239
}
243240

244241
// Workaround for https://issues.apache.org/jira/browse/FLINK-27569

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ private void deleteSessionCluster(FlinkResourceContext<FlinkDeployment> ctx) {
8585
ctx.getFlinkService()
8686
.deleteClusterDeployment(
8787
deployment.getMetadata(), deployment.getStatus(), conf, false);
88-
ctx.getFlinkService().waitForClusterShutdown(ctx.getObserveConfig());
8988
}
9089

9190
@Override

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

+79-65
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,12 @@
101101
import io.fabric8.kubernetes.api.model.DeletionPropagation;
102102
import io.fabric8.kubernetes.api.model.ObjectMeta;
103103
import io.fabric8.kubernetes.api.model.PodList;
104+
import io.fabric8.kubernetes.api.model.apps.Deployment;
104105
import io.fabric8.kubernetes.client.KubernetesClient;
105-
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
106+
import io.fabric8.kubernetes.client.KubernetesClientException;
107+
import io.fabric8.kubernetes.client.dsl.Resource;
108+
import io.fabric8.kubernetes.client.dsl.Waitable;
109+
import lombok.SneakyThrows;
106110
import org.apache.commons.lang3.ObjectUtils;
107111
import org.slf4j.Logger;
108112
import org.slf4j.LoggerFactory;
@@ -112,13 +116,15 @@
112116
import java.io.File;
113117
import java.io.FileOutputStream;
114118
import java.io.IOException;
119+
import java.net.HttpURLConnection;
115120
import java.net.InetSocketAddress;
116121
import java.net.MalformedURLException;
117122
import java.net.Socket;
118123
import java.net.SocketAddress;
119124
import java.net.URL;
120125
import java.nio.file.Files;
121126
import java.nio.file.Paths;
127+
import java.time.Duration;
122128
import java.util.Arrays;
123129
import java.util.Collection;
124130
import java.util.Collections;
@@ -127,6 +133,7 @@
127133
import java.util.Map;
128134
import java.util.Objects;
129135
import java.util.Optional;
136+
import java.util.concurrent.Callable;
130137
import java.util.concurrent.CompletableFuture;
131138
import java.util.concurrent.ExecutorService;
132139
import java.util.concurrent.TimeUnit;
@@ -399,7 +406,6 @@ protected void cancelJob(
399406
// Unless we leave the jm around after savepoint, we should wait until it has finished
400407
// shutting down
401408
if (deleteClusterAfterSavepoint || upgradeMode != UpgradeMode.SAVEPOINT) {
402-
waitForClusterShutdown(conf);
403409
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
404410
}
405411
}
@@ -755,14 +761,6 @@ public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
755761
return getJmPodList(namespace, clusterId);
756762
}
757763

758-
@Override
759-
public void waitForClusterShutdown(Configuration conf) {
760-
waitForClusterShutdown(
761-
conf.getString(KubernetesConfigOptions.NAMESPACE),
762-
conf.getString(KubernetesConfigOptions.CLUSTER_ID),
763-
operatorConfig.getFlinkShutdownClusterTimeout().toSeconds());
764-
}
765-
766764
@Override
767765
public RestClusterClient<String> getClusterClient(Configuration conf) throws Exception {
768766
final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
@@ -899,50 +897,20 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws
899897
}
900898
}
901899

902-
/** Returns a list of Kubernetes Deployment names for given cluster. */
903-
protected abstract List<String> getDeploymentNames(String namespace, String clusterId);
904-
905-
/** Wait until the FLink cluster has completely shut down. */
906-
protected void waitForClusterShutdown(
907-
String namespace, String clusterId, long shutdownTimeout) {
908-
long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
909-
LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout);
910-
911-
for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
912-
long deploymentTimeout = timeoutAt - System.currentTimeMillis();
913-
914-
if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) {
915-
LOG.error(
916-
"Failed to shut down cluster {} (deployment {}) in {} seconds, proceeding...",
917-
clusterId,
918-
deploymentName,
919-
shutdownTimeout);
920-
return;
921-
}
922-
}
923-
}
924-
925-
/** Wait until Deployment is removed, return false if timed out, otherwise return true. */
900+
/** Wait until Deployment is removed, return remaining timeout. */
926901
@VisibleForTesting
927-
boolean waitForDeploymentToBeRemoved(String namespace, String deploymentName, long timeout) {
928-
LOG.info(
929-
"Waiting for Deployment {} to shut down with {} seconds timeout...",
930-
deploymentName,
931-
timeout / 1000);
932-
933-
try {
934-
kubernetesClient
935-
.apps()
936-
.deployments()
937-
.inNamespace(namespace)
938-
.withName(deploymentName)
939-
.waitUntilCondition(Objects::isNull, timeout, TimeUnit.MILLISECONDS);
940-
941-
LOG.info("Deployment {} successfully shut down", deploymentName);
942-
} catch (KubernetesClientTimeoutException e) {
943-
return false;
944-
}
945-
return true;
902+
protected Duration deleteDeploymentBlocking(
903+
String name,
904+
Resource<Deployment> deployment,
905+
DeletionPropagation propagation,
906+
Duration timeout) {
907+
return deleteBlocking(
908+
String.format("Deleting %s Deployment", name),
909+
() -> {
910+
deployment.withPropagationPolicy(propagation).delete();
911+
return deployment;
912+
},
913+
timeout);
946914
}
947915

948916
private static List<JobStatusMessage> toJobStatusMessage(
@@ -1050,33 +1018,35 @@ public final void deleteClusterDeployment(
10501018
Configuration conf,
10511019
boolean deleteHaData) {
10521020

1021+
var namespace = meta.getNamespace();
1022+
var clusterId = meta.getName();
1023+
10531024
var deletionPropagation = operatorConfig.getDeletionPropagation();
10541025
LOG.info("Deleting cluster with {} propagation", deletionPropagation);
1055-
deleteClusterInternal(meta, conf, deleteHaData, deletionPropagation);
1026+
deleteClusterInternal(namespace, clusterId, conf, deletionPropagation);
1027+
if (deleteHaData) {
1028+
deleteHAData(namespace, clusterId, conf);
1029+
} else {
1030+
LOG.info("Keeping HA metadata for last-state restore");
1031+
}
10561032
updateStatusAfterClusterDeletion(status);
10571033
}
10581034

10591035
/**
1060-
* Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
1061-
* allows deleting the native kubernetes HA resources as well.
1036+
* Delete Flink kubernetes cluster by deleting the kubernetes resources directly.
10621037
*
1063-
* @param meta ObjectMeta of the deployment
1038+
* @param namespace Namespace
1039+
* @param clusterId ClusterId
10641040
* @param conf Configuration of the Flink application
1065-
* @param deleteHaData Flag to indicate whether k8s or Zookeeper HA metadata should be removed
1066-
* as well
10671041
* @param deletionPropagation Resource deletion propagation policy
10681042
*/
10691043
protected abstract void deleteClusterInternal(
1070-
ObjectMeta meta,
1044+
String namespace,
1045+
String clusterId,
10711046
Configuration conf,
1072-
boolean deleteHaData,
10731047
DeletionPropagation deletionPropagation);
10741048

10751049
protected void deleteHAData(String namespace, String clusterId, Configuration conf) {
1076-
// We need to wait for cluster shutdown otherwise HA data might be recreated
1077-
waitForClusterShutdown(
1078-
namespace, clusterId, operatorConfig.getFlinkShutdownClusterTimeout().toSeconds());
1079-
10801050
if (FlinkUtils.isKubernetesHAActivated(conf)) {
10811051
LOG.info("Deleting Kubernetes HA metadata");
10821052
FlinkUtils.deleteKubernetesHAMetadata(clusterId, namespace, kubernetesClient);
@@ -1134,4 +1104,48 @@ private Configuration getOperatorRestConfig(Configuration origConfig) throws IOE
11341104
});
11351105
return conf;
11361106
}
1107+
1108+
/**
1109+
* Generic blocking delete operation implementation for triggering and waiting for removal of
1110+
* the selected resources. By returning the remaining timeout we allow chaining multiple delete
1111+
* operations under a single timeout setting easily.
1112+
*
1113+
* @param operation Name of the operation for logging
1114+
* @param delete Call that should trigger the async deletion and return the resource to be
1115+
* watched
1116+
* @param timeout Timeout for the operation
1117+
* @return Remaining timeout after deletion.
1118+
*/
1119+
@SneakyThrows
1120+
protected static Duration deleteBlocking(
1121+
String operation, Callable<Waitable> delete, Duration timeout) {
1122+
LOG.info("{} with {} seconds timeout...", operation, timeout.toSeconds());
1123+
long start = System.currentTimeMillis();
1124+
1125+
Waitable deleted = null;
1126+
try {
1127+
deleted = delete.call();
1128+
} catch (KubernetesClientException kce) {
1129+
// During the deletion we need to throw other types of errors
1130+
if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
1131+
throw kce;
1132+
}
1133+
}
1134+
1135+
if (deleted != null) {
1136+
try {
1137+
deleted.waitUntilCondition(
1138+
Objects::isNull, timeout.toMillis(), TimeUnit.MILLISECONDS);
1139+
LOG.info("Completed {}", operation);
1140+
} catch (KubernetesClientException kce) {
1141+
// We completely ignore not found errors and simply log others
1142+
if (kce.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
1143+
LOG.warn("Error while " + operation, kce);
1144+
}
1145+
}
1146+
}
1147+
1148+
long elapsedMillis = System.currentTimeMillis() - start;
1149+
return Duration.ofMillis(Math.max(0, timeout.toMillis() - elapsedMillis));
1150+
}
11371151
}

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

-2
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ void triggerCheckpoint(
118118

119119
PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
120120

121-
void waitForClusterShutdown(Configuration conf);
122-
123121
ScalingResult scale(FlinkResourceContext<?> resourceContext, Configuration deployConfig)
124122
throws Exception;
125123

0 commit comments

Comments
 (0)