Skip to content

Commit 4293d58

Browse files
jiangzhogyfora
authored andcommitted
[FLINK-31860] FlinkDeployments never finalize when namespace is deleted
1 parent 4ec4b31 commit 4293d58

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

Diff for: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@
2323
import io.fabric8.kubernetes.api.model.ObjectMeta;
2424
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
2525
import io.fabric8.kubernetes.client.KubernetesClient;
26+
import io.fabric8.kubernetes.client.KubernetesClientException;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2629

2730
import javax.annotation.Nullable;
2831

32+
import java.net.HttpURLConnection;
2933
import java.time.Duration;
3034
import java.time.Instant;
3135
import java.util.Map;
36+
import java.util.Optional;
3237
import java.util.function.Consumer;
3338
import java.util.function.Predicate;
3439

@@ -37,6 +42,7 @@
3742
* https://github.com/EnMasseProject/enmasse/blob/master/k8s-api/src/main/java/io/enmasse/k8s/api/KubeEventLogger.java
3843
*/
3944
public class EventUtils {
45+
private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
4046

4147
public static String generateEventName(
4248
HasMetadata target,
@@ -108,7 +114,7 @@ public static boolean createIfNotExists(
108114
return false;
109115
} else {
110116
Event event = buildEvent(target, type, reason, message, component, eventName);
111-
eventListener.accept(client.resource(event).createOrReplace());
117+
createOrReplaceEvent(client, event).ifPresent(eventListener);
112118
return true;
113119
}
114120
}
@@ -139,7 +145,7 @@ public static boolean createOrUpdateEventWithLabels(
139145
} else {
140146
Event event = buildEvent(target, type, reason, message, component, eventName);
141147
setLabels(event, labels);
142-
eventListener.accept(client.resource(event).createOrReplace());
148+
createOrReplaceEvent(client, event).ifPresent(eventListener);
143149
return true;
144150
}
145151
}
@@ -154,7 +160,7 @@ private static void updatedEventWithLabels(
154160
existing.setCount(existing.getCount() + 1);
155161
existing.setMessage(message);
156162
setLabels(existing, labels);
157-
eventListener.accept(client.resource(existing).createOrReplace());
163+
createOrReplaceEvent(client, existing).ifPresent(eventListener);
158164
}
159165

160166
private static void setLabels(Event existing, @Nullable Map<String, String> labels) {
@@ -213,4 +219,20 @@ private static boolean labelCheck(
213219
|| (existing.getMetadata() != null
214220
&& dedupePredicate.test(existing.getMetadata().getLabels()));
215221
}
222+
223+
private static Optional<Event> createOrReplaceEvent(KubernetesClient client, Event event) {
224+
try {
225+
Event createdEvent = client.resource(event).createOrReplace();
226+
return Optional.of(createdEvent);
227+
} catch (KubernetesClientException e) {
228+
if (e.getCode() == HttpURLConnection.HTTP_FORBIDDEN) {
229+
// fail the reconcile when events cannot be delivered, unless FORBIDDEN
230+
// which can be a result of recoverable rbac issue, or namespace is terminating
231+
LOG.warn("Cannot create or update events, proceeding.", e);
232+
} else {
233+
throw e;
234+
}
235+
}
236+
return Optional.empty();
237+
}
216238
}

Diff for: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java

+47
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.kubernetes.operator.TestUtils;
2121

2222
import io.fabric8.kubernetes.api.model.Event;
23+
import io.fabric8.kubernetes.api.model.EventBuilder;
2324
import io.fabric8.kubernetes.client.KubernetesClient;
2425
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
2526
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -30,6 +31,7 @@
3031

3132
import javax.annotation.Nullable;
3233

34+
import java.net.HttpURLConnection;
3335
import java.time.Duration;
3436
import java.util.Map;
3537
import java.util.function.Consumer;
@@ -525,4 +527,49 @@ public void accept(Event event) {
525527
Assertions.assertEquals(1, event.getCount());
526528
Assertions.assertNull(eventConsumed);
527529
}
530+
531+
@Test
532+
public void testCreateOrReplaceEventOnDeletedNamespace() {
533+
var consumer =
534+
new Consumer<Event>() {
535+
@Override
536+
public void accept(Event event) {
537+
eventConsumed = event;
538+
}
539+
};
540+
var flinkApp = TestUtils.buildApplicationCluster();
541+
var reason = "Cleanup";
542+
var message = "message";
543+
var eventName =
544+
EventUtils.generateEventName(
545+
flinkApp,
546+
EventRecorder.Type.Warning,
547+
reason,
548+
message,
549+
EventRecorder.Component.Operator);
550+
551+
var namespaceName = flinkApp.getMetadata().getNamespace();
552+
553+
String eventCreatePath = String.format("/api/v1/namespaces/%s/events", namespaceName);
554+
555+
mockServer
556+
.expect()
557+
.post()
558+
.withPath(eventCreatePath)
559+
.andReturn(HttpURLConnection.HTTP_FORBIDDEN, new EventBuilder().build())
560+
.once();
561+
562+
Assertions.assertTrue(
563+
EventUtils.createOrUpdateEventWithInterval(
564+
kubernetesClient,
565+
flinkApp,
566+
EventRecorder.Type.Warning,
567+
reason,
568+
message,
569+
EventRecorder.Component.Operator,
570+
consumer,
571+
null,
572+
null));
573+
Assertions.assertNull(eventConsumed);
574+
}
528575
}

0 commit comments

Comments
 (0)