Skip to content

Commit af38fac

Browse files
authored
[Improve][Zeta] Remove misleading exception log when job be canceled (#8988)
1 parent fc942a5 commit af38fac

File tree

7 files changed

+113
-3
lines changed

7 files changed

+113
-3
lines changed

Diff for: seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ protected GenericContainer<?> createSeaTunnelContainerWithFakeSourceAndInMemoryS
159159
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
160160
.waitingFor(Wait.forLogMessage(".*received new worker register:.*", 1));
161161
copySeaTunnelStarterToContainer(server);
162-
server.setPortBindings(Collections.singletonList("5801:5801"));
163-
server.setExposedPorts(Collections.singletonList(5801));
162+
server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
163+
server.setExposedPorts(Arrays.asList(5801, 8080));
164164

165165
server.withCopyFileToContainer(
166166
MountableFile.forHostPath(

Diff for: seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public class InMemorySinkFactory
4040
public static final Option<Boolean> THROW_EXCEPTION =
4141
Options.key("throw_exception").booleanType().defaultValue(false);
4242

43+
public static final Option<Boolean> WRITER_SLEEP =
44+
Options.key("writer_sleep").booleanType().defaultValue(false);
45+
4346
public static final Option<Boolean> THROW_OUT_OF_MEMORY =
4447
Options.key("throw_out_of_memory").booleanType().defaultValue(false);
4548
public static final Option<Boolean> CHECKPOINT_SLEEP =
@@ -66,6 +69,7 @@ public OptionRule optionRule() {
6669
.optional(
6770
THROW_EXCEPTION,
6871
THROW_OUT_OF_MEMORY,
72+
WRITER_SLEEP,
6973
CHECKPOINT_SLEEP,
7074
THROW_EXCEPTION_OF_COMMITTER,
7175
ASSERT_OPTIONS_KEY,

Diff for: seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java

+8
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ public InMemorySinkWriter(ReadonlyConfig config) {
7979

8080
@Override
8181
public void write(SeaTunnelRow element) throws IOException {
82+
if (config.get(InMemorySinkFactory.WRITER_SLEEP)) {
83+
try {
84+
Thread.sleep(999999999L);
85+
} catch (InterruptedException e) {
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
8290
if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
8391
throw new OutOfMemoryError();
8492
}

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java

+44
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,16 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.testcontainers.containers.Container;
2626

27+
import lombok.extern.slf4j.Slf4j;
28+
2729
import java.io.IOException;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.TimeUnit;
2832

2933
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
34+
import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
3035

36+
@Slf4j
3137
public class JobClientJobProxyIT extends SeaTunnelEngineContainer {
3238

3339
@Override
@@ -73,6 +79,44 @@ public void testNoDuplicatedReleaseSlot() throws IOException, InterruptedExcepti
7379
server.getLogs().contains("wrong target release operation with job"));
7480
}
7581

82+
@Test
83+
public void testNoExceptionLogWhenCancelJob() throws IOException, InterruptedException {
84+
String jobId = String.valueOf(System.currentTimeMillis());
85+
CompletableFuture.runAsync(
86+
() -> {
87+
try {
88+
executeJob(
89+
"/stream_fakesource_to_inmemory_pending_row_in_queue.conf", jobId);
90+
} catch (Exception e) {
91+
log.error("Commit task exception :" + e.getMessage());
92+
throw new RuntimeException();
93+
}
94+
});
95+
96+
given().pollDelay(10, TimeUnit.SECONDS)
97+
.await()
98+
.pollDelay(5000L, TimeUnit.MILLISECONDS)
99+
.untilAsserted(
100+
() -> {
101+
Assertions.assertEquals("RUNNING", this.getJobStatus(jobId));
102+
});
103+
104+
String logBeforeCancel = this.getServerLogs();
105+
cancelJob(jobId);
106+
given().pollDelay(10, TimeUnit.SECONDS)
107+
.await()
108+
.pollDelay(5000L, TimeUnit.MILLISECONDS)
109+
.untilAsserted(
110+
() -> {
111+
Assertions.assertEquals("CANCELED", this.getJobStatus(jobId));
112+
});
113+
String logAfterCancel = this.getServerLogs().substring(logBeforeCancel.length());
114+
// in TaskExecutionService.BlockingWorker::run catch Throwable
115+
Assertions.assertFalse(logAfterCancel.contains("Exception in"), logAfterCancel);
116+
Assertions.assertEquals(
117+
4, StringUtils.countMatches(logAfterCancel, "Interrupted task"), logAfterCancel);
118+
}
119+
76120
@Test
77121
public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException {
78122
Container.ExecResult execResult =

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ seatunnel:
3333
max-retained: 3
3434
plugin-config:
3535
namespace: /tmp/seatunnel/checkpoint_snapshot/
36+
http:
37+
enable-http: true
38+
port: 8080
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
job.mode = "STREAMING"
23+
checkpoint.interval = 5000
24+
}
25+
26+
source {
27+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
28+
FakeSource {
29+
# More than TaskGroupWithIntermediateBlockingQueue::QUEUE_SIZE
30+
row.num = 9999
31+
parallelism = 1
32+
schema = {
33+
fields {
34+
c_int = int
35+
}
36+
}
37+
}
38+
}
39+
40+
transform {
41+
}
42+
43+
sink {
44+
InMemory {
45+
writer_sleep = true
46+
}
47+
}

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,11 @@ public void run() {
702702
taskGroupExecutionTracker.exception(e);
703703
}
704704
} catch (Throwable e) {
705-
logger.warning("Exception in " + t, e);
705+
if (taskGroupExecutionTracker.isCancel.get()) {
706+
logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t));
707+
} else {
708+
logger.warning("Exception in " + t, e);
709+
}
706710
taskGroupExecutionTracker.exception(e);
707711
} finally {
708712
taskGroupExecutionTracker.taskDone(t);

0 commit comments

Comments
 (0)