Skip to content

Commit 1578a29

Browse files
authored
[FLINK-37470] Improve JobManager Deployment / Pod error handling
1 parent cda493e commit 1578a29

File tree

9 files changed

+407
-132
lines changed

9 files changed

+407
-132
lines changed

Diff for: flink-kubernetes-operator/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ under the License.
184184
</exclusions>
185185
</dependency>
186186

187+
<dependency>
188+
<groupId>io.fabric8</groupId>
189+
<artifactId>kube-api-test-client-inject</artifactId>
190+
<version>${fabric8.version}</version>
191+
<scope>test</scope>
192+
</dependency>
193+
187194
<dependency>
188195
<groupId>com.squareup.okhttp3</groupId>
189196
<artifactId>mockwebserver</artifactId>

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717

1818
package org.apache.flink.kubernetes.operator.exception;
1919

20-
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
20+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
21+
22+
import io.fabric8.kubernetes.api.model.ContainerStatus;
2123
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
2224

25+
import java.util.Optional;
26+
import java.util.Set;
27+
2328
/** Exception to signal terminal deployment failure. */
2429
public class DeploymentFailedException extends RuntimeException {
2530

26-
public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
27-
public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
28-
public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
31+
public static final Set<String> CONTAINER_ERROR_REASONS =
32+
ImmutableSet.of(
33+
"CrashLoopBackOff",
34+
"ImagePullBackOff",
35+
"ErrImagePull",
36+
"RunContainerError",
37+
"CreateContainerConfigError",
38+
"OOMKilled");
2939

3040
private static final long serialVersionUID = -1070179896083579221L;
3141

@@ -36,11 +46,6 @@ public DeploymentFailedException(DeploymentCondition deployCondition) {
3646
this.reason = deployCondition.getReason();
3747
}
3848

39-
public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
40-
super(stateWaiting.getMessage());
41-
this.reason = stateWaiting.getReason();
42-
}
43-
4449
public DeploymentFailedException(String message, String reason) {
4550
super(message);
4651
this.reason = reason;
@@ -49,4 +54,24 @@ public DeploymentFailedException(String message, String reason) {
4954
public String getReason() {
5055
return reason;
5156
}
57+
58+
public static DeploymentFailedException forContainerStatus(ContainerStatus status) {
59+
var waiting = status.getState().getWaiting();
60+
var lastState = status.getLastState();
61+
String message = null;
62+
if ("CrashLoopBackOff".equals(waiting.getReason())
63+
&& lastState != null
64+
&& lastState.getTerminated() != null) {
65+
message =
66+
Optional.ofNullable(lastState.getTerminated().getMessage())
67+
.map(err -> "CrashLoop - " + err)
68+
.orElse(null);
69+
}
70+
71+
if (message == null) {
72+
message = waiting.getMessage();
73+
}
74+
return new DeploymentFailedException(
75+
String.format("[%s] %s", status.getName(), message), waiting.getReason());
76+
}
5277
}

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

+22-15
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
3131
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3232
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
33+
import org.apache.flink.kubernetes.operator.utils.EventUtils;
3334
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
3435

35-
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
3636
import io.fabric8.kubernetes.api.model.ContainerStatus;
3737
import io.fabric8.kubernetes.api.model.Pod;
3838
import io.fabric8.kubernetes.api.model.PodList;
@@ -45,7 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Optional;
48-
import java.util.Set;
48+
import java.util.stream.Stream;
4949

5050
/** Base observer for session and application clusters. */
5151
public abstract class AbstractFlinkDeploymentObserver
@@ -134,7 +134,7 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
134134
try {
135135
checkFailedCreate(status);
136136
// checking the pod is expensive; only do it when the deployment isn't ready
137-
checkContainerBackoff(ctx);
137+
checkContainerErrors(ctx);
138138
} catch (DeploymentFailedException dfe) {
139139
// throw only when not already in error status to allow for spec update
140140
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
@@ -171,21 +171,28 @@ private void checkFailedCreate(DeploymentStatus status) {
171171
}
172172
}
173173

174-
private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> ctx) {
174+
private void checkContainerErrors(FlinkResourceContext<FlinkDeployment> ctx) {
175175
PodList jmPods =
176176
ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
177177
for (Pod pod : jmPods.getItems()) {
178-
for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
179-
ContainerStateWaiting csw = cs.getState().getWaiting();
180-
if (csw != null
181-
&& Set.of(
182-
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
183-
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
184-
DeploymentFailedException.REASON_ERR_IMAGE_PULL)
185-
.contains(csw.getReason())) {
186-
throw new DeploymentFailedException(csw);
187-
}
188-
}
178+
var podStatus = pod.getStatus();
179+
Stream.concat(
180+
podStatus.getContainerStatuses().stream(),
181+
podStatus.getInitContainerStatuses().stream())
182+
.forEach(AbstractFlinkDeploymentObserver::checkContainerError);
183+
184+
// No obvious errors were found, check for volume mount issues
185+
EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), pod);
186+
}
187+
}
188+
189+
private static void checkContainerError(ContainerStatus cs) {
190+
if (cs.getState() == null || cs.getState().getWaiting() == null) {
191+
return;
192+
}
193+
if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
194+
cs.getState().getWaiting().getReason())) {
195+
throw DeploymentFailedException.forContainerStatus(cs);
189196
}
190197
}
191198

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

+85-8
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
22+
2023
import io.fabric8.kubernetes.api.model.Event;
2124
import io.fabric8.kubernetes.api.model.EventBuilder;
2225
import io.fabric8.kubernetes.api.model.HasMetadata;
2326
import io.fabric8.kubernetes.api.model.ObjectMeta;
24-
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
27+
import io.fabric8.kubernetes.api.model.ObjectReference;
28+
import io.fabric8.kubernetes.api.model.Pod;
29+
import io.fabric8.kubernetes.api.model.PodCondition;
2530
import io.fabric8.kubernetes.client.KubernetesClient;
2631
import io.fabric8.kubernetes.client.KubernetesClientException;
2732
import org.slf4j.Logger;
@@ -32,10 +37,14 @@
3237
import java.net.HttpURLConnection;
3338
import java.time.Duration;
3439
import java.time.Instant;
40+
import java.util.ArrayList;
41+
import java.util.List;
3542
import java.util.Map;
3643
import java.util.Optional;
3744
import java.util.function.Consumer;
45+
import java.util.function.Function;
3846
import java.util.function.Predicate;
47+
import java.util.stream.Collectors;
3948

4049
/**
4150
* The util to generate an event for the target resource. It is copied from
@@ -182,13 +191,7 @@ private static Event buildEvent(
182191
String eventName) {
183192
return new EventBuilder()
184193
.withApiVersion("v1")
185-
.withInvolvedObject(
186-
new ObjectReferenceBuilder()
187-
.withKind(target.getKind())
188-
.withUid(target.getMetadata().getUid())
189-
.withName(target.getMetadata().getName())
190-
.withNamespace(target.getMetadata().getNamespace())
191-
.build())
194+
.withInvolvedObject(getObjectReference(target))
192195
.withType(type.name())
193196
.withReason(reason)
194197
.withFirstTimestamp(Instant.now().toString())
@@ -235,4 +238,78 @@ private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Eve
235238
}
236239
return Optional.empty();
237240
}
241+
242+
private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
243+
var ref = getObjectReference(pod);
244+
245+
var eventList =
246+
client.v1()
247+
.events()
248+
.inNamespace(pod.getMetadata().getNamespace())
249+
.withInvolvedObject(ref)
250+
.list();
251+
252+
if (eventList == null) {
253+
return new ArrayList<>();
254+
}
255+
256+
var items = eventList.getItems();
257+
if (items == null) {
258+
return new ArrayList<>();
259+
}
260+
return items;
261+
}
262+
263+
@VisibleForTesting
264+
protected static ObjectReference getObjectReference(HasMetadata resource) {
265+
var ref = new ObjectReference();
266+
ref.setApiVersion(resource.getApiVersion());
267+
ref.setKind(resource.getKind());
268+
ref.setName(resource.getMetadata().getName());
269+
ref.setNamespace(resource.getMetadata().getNamespace());
270+
ref.setUid(resource.getMetadata().getUid());
271+
return ref;
272+
}
273+
274+
/**
275+
* Check that pod is stuck during volume mount stage and throw {@link DeploymentFailedException}
276+
* with the right reason message if that's the case.
277+
*
278+
* @param client Kubernetes client
279+
* @param pod Pod to be checked
280+
*/
281+
public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) {
282+
var conditions = pod.getStatus().getConditions();
283+
if (conditions == null) {
284+
return;
285+
}
286+
var conditionMap =
287+
conditions.stream()
288+
.collect(Collectors.toMap(PodCondition::getType, Function.identity()));
289+
290+
// We use PodReadyToStartContainers if available otherwise use Initialized, but it's only
291+
// there k8s 1.29+
292+
boolean failedInitialization =
293+
checkStatusWasAlways(
294+
pod,
295+
conditionMap.getOrDefault(
296+
"PodReadyToStartContainers", conditionMap.get("Initialized")),
297+
"False");
298+
299+
boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False");
300+
301+
if (notReady && failedInitialization) {
302+
getPodEvents(client, pod).stream()
303+
.filter(e -> e.getReason().equals("FailedMount"))
304+
.findAny()
305+
.ifPresent(
306+
e -> {
307+
throw new DeploymentFailedException(e.getMessage(), e.getReason());
308+
});
309+
}
310+
}
311+
312+
private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) {
313+
return condition != null && condition.getStatus().equals(status);
314+
}
238315
}

Diff for: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils {
9797
public static PodList createFailedPodList(String crashLoopMessage, String reason) {
9898
ContainerStatus cs =
9999
new ContainerStatusBuilder()
100+
.withName("c1")
101+
.withNewState()
102+
.withNewWaiting()
103+
.withReason(reason)
104+
.withMessage(crashLoopMessage)
105+
.endWaiting()
106+
.endState()
107+
.build();
108+
109+
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
110+
pod.setStatus(new PodStatusBuilder().withContainerStatuses(cs).build());
111+
return new PodListBuilder().withItems(pod).build();
112+
}
113+
114+
public static PodList createFailedInitContainerPodList(String crashLoopMessage, String reason) {
115+
ContainerStatus cs =
116+
new ContainerStatusBuilder()
117+
.withName("c1")
100118
.withNewState()
101119
.withNewWaiting()
102120
.withReason(reason)
@@ -108,7 +126,8 @@ public static PodList createFailedPodList(String crashLoopMessage, String reason
108126
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
109127
pod.setStatus(
110128
new PodStatusBuilder()
111-
.withContainerStatuses(Collections.singletonList(cs))
129+
.withContainerStatuses(new ContainerStatusBuilder().withReady().build())
130+
.withInitContainerStatuses(cs)
112131
.build());
113132
return new PodListBuilder().withItems(pod).build();
114133
}

0 commit comments

Comments
 (0)