Skip to content

Commit 790150a

Browse files
authored
[Feature][Zeta] Support config job retry times in job config (#6690)
1 parent 37578e1 commit 790150a

File tree

9 files changed

+222
-139
lines changed

9 files changed

+222
-139
lines changed

Diff for: docs/en/concept/JobEnvConfig.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# JobEnvConfig
1+
# Job Env Config
22

33
This document describes env configuration information, the common parameters can be used in all engines. In order to better distinguish between engine parameters, the additional parameters of other engine need to carry a prefix.
44
In flink engine, we use `flink.` as the prefix. In the spark engine, we do not use any prefixes to modify parameters, because the official spark parameters themselves start with `spark.`
@@ -29,6 +29,10 @@ In `STREAMING` mode, checkpoints is required, if you do not set it, it will be o
2929

3030
This parameter configures the parallelism of source and sink.
3131

32+
### job.retry.times
33+
34+
Used to control the default retry times when a job fails. The default value is 3, and it only works in the Zeta engine.
35+
3236
### shade.identifier
3337

3438
Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored.

Diff for: seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ public interface EnvCommonOptions {
4444
.defaultValue(JobMode.BATCH)
4545
.withDescription("The job mode of this job, support Batch and Stream");
4646

47+
Option<Integer> JOB_RETRY_TIMES =
48+
Options.key("job.retry.times")
49+
.intType()
50+
.defaultValue(3)
51+
.withDescription("The retry times of this job");
52+
4753
Option<Long> CHECKPOINT_INTERVAL =
4854
Options.key("checkpoint.interval")
4955
.longType()

Diff for: seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java

+77-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.e2e.common.container.seatunnel;
1919

20+
import org.apache.seatunnel.common.utils.FileUtils;
2021
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
2122
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
2223
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -45,7 +46,9 @@
4546
import lombok.NoArgsConstructor;
4647
import lombok.extern.slf4j.Slf4j;
4748

49+
import java.io.File;
4850
import java.io.IOException;
51+
import java.net.URL;
4952
import java.nio.file.Paths;
5053
import java.util.ArrayList;
5154
import java.util.Arrays;
@@ -75,7 +78,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
7578

7679
@Override
7780
public void startUp() throws Exception {
78-
server =
81+
server = createSeaTunnelServer();
82+
}
83+
84+
private GenericContainer<?> createSeaTunnelServer() throws IOException, InterruptedException {
85+
GenericContainer<?> server =
7986
new GenericContainer<>(getDockerImage())
8087
.withNetwork(NETWORK)
8188
.withEnv("TZ", "UTC")
@@ -106,6 +113,75 @@ public void startUp() throws Exception {
106113
executeExtraCommands(server);
107114

108115
server.start();
116+
return server;
117+
}
118+
119+
protected GenericContainer<?> createSeaTunnelContainerWithFakeSourceAndInMemorySink(
120+
String configFilePath) throws IOException, InterruptedException {
121+
GenericContainer<?> server =
122+
new GenericContainer<>(getDockerImage())
123+
.withNetwork(NETWORK)
124+
.withEnv("TZ", "UTC")
125+
.withCommand(
126+
ContainerUtil.adaptPathForWin(
127+
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
128+
.withNetworkAliases("server")
129+
.withExposedPorts()
130+
.withLogConsumer(
131+
new Slf4jLogConsumer(
132+
DockerLoggerFactory.getLogger(
133+
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
134+
.waitingFor(Wait.forListeningPort());
135+
copySeaTunnelStarterToContainer(server);
136+
server.setPortBindings(Collections.singletonList("5801:5801"));
137+
server.setExposedPorts(Collections.singletonList(5801));
138+
139+
server.withCopyFileToContainer(
140+
MountableFile.forHostPath(
141+
PROJECT_ROOT_PATH
142+
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
143+
Paths.get(SEATUNNEL_HOME, "config").toString());
144+
145+
server.withCopyFileToContainer(
146+
MountableFile.forHostPath(configFilePath),
147+
Paths.get(SEATUNNEL_HOME, "config", "seatunnel.yaml").toString());
148+
149+
server.withCopyFileToContainer(
150+
MountableFile.forHostPath(
151+
PROJECT_ROOT_PATH
152+
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
153+
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
154+
155+
server.start();
156+
// execute extra commands
157+
executeExtraCommands(server);
158+
159+
File module = new File(PROJECT_ROOT_PATH + File.separator + getConnectorModulePath());
160+
List<File> connectorFiles =
161+
ContainerUtil.getConnectorFiles(
162+
module, Collections.singleton("connector-fake"), getConnectorNamePrefix());
163+
URL url =
164+
FileUtils.searchJarFiles(
165+
Paths.get(
166+
PROJECT_ROOT_PATH
167+
+ File.separator
168+
+ "seatunnel-e2e/seatunnel-e2e-common/target"))
169+
.stream()
170+
.filter(jar -> jar.toString().endsWith("-tests.jar"))
171+
.findFirst()
172+
.get();
173+
connectorFiles.add(new File(url.getFile()));
174+
connectorFiles.forEach(
175+
jar ->
176+
server.copyFileToContainer(
177+
MountableFile.forHostPath(jar.getAbsolutePath()),
178+
Paths.get(SEATUNNEL_HOME, "connectors", jar.getName()).toString()));
179+
server.copyFileToContainer(
180+
MountableFile.forHostPath(
181+
PROJECT_ROOT_PATH
182+
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties"),
183+
Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString());
184+
return server;
109185
}
110186

111187
@Override

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java

+15-61
Original file line numberDiff line numberDiff line change
@@ -17,87 +17,41 @@
1717

1818
package org.apache.seatunnel.engine.e2e;
1919

20-
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
21-
2220
import org.apache.commons.lang3.StringUtils;
2321

2422
import org.junit.jupiter.api.Assertions;
2523
import org.junit.jupiter.api.BeforeAll;
2624
import org.junit.jupiter.api.Test;
27-
import org.testcontainers.DockerClientFactory;
2825
import org.testcontainers.containers.Container;
29-
import org.testcontainers.containers.GenericContainer;
30-
import org.testcontainers.containers.output.Slf4jLogConsumer;
31-
import org.testcontainers.containers.wait.strategy.Wait;
32-
import org.testcontainers.images.PullPolicy;
33-
import org.testcontainers.utility.DockerLoggerFactory;
34-
import org.testcontainers.utility.MountableFile;
3526

3627
import java.io.IOException;
37-
import java.nio.file.Paths;
38-
import java.util.Arrays;
39-
import java.util.Collections;
40-
import java.util.stream.Collectors;
4128

4229
import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
4330

4431
public class JobClientJobProxyIT extends SeaTunnelContainer {
45-
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
46-
private static final String SERVER_SHELL = "seatunnel-cluster.sh";
4732

4833
@Override
4934
@BeforeAll
5035
public void startUp() throws Exception {
36+
// use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in container
5137
this.server =
52-
new GenericContainer<>(getDockerImage())
53-
.withNetwork(NETWORK)
54-
.withCommand(
55-
ContainerUtil.adaptPathForWin(
56-
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
57-
.withNetworkAliases("server")
58-
.withImagePullPolicy(PullPolicy.alwaysPull())
59-
.withExposedPorts()
60-
.withLogConsumer(
61-
new Slf4jLogConsumer(
62-
DockerLoggerFactory.getLogger(
63-
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
64-
.waitingFor(Wait.forListeningPort());
65-
copySeaTunnelStarterToContainer(server);
66-
server.setExposedPorts(Arrays.asList(5801));
67-
server.setPortBindings(Collections.singletonList("5801:5801"));
68-
server.withCopyFileToContainer(
69-
MountableFile.forHostPath(
38+
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
7039
PROJECT_ROOT_PATH
71-
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
72-
Paths.get(SEATUNNEL_HOME, "config").toString());
40+
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml");
41+
}
7342

74-
// use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in container
75-
server.withCopyFileToContainer(
76-
MountableFile.forHostPath(
77-
PROJECT_ROOT_PATH
78-
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
79-
Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
43+
@Test
44+
public void testJobRetryTimes() throws IOException, InterruptedException {
45+
Container.ExecResult execResult =
46+
executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf");
47+
Assertions.assertNotEquals(0, execResult.getExitCode());
48+
Assertions.assertTrue(server.getLogs().contains("Restore time 1, pipeline"));
49+
Assertions.assertFalse(server.getLogs().contains("Restore time 3, pipeline"));
8050

81-
server.withCopyFileToContainer(
82-
MountableFile.forHostPath(
83-
PROJECT_ROOT_PATH
84-
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
85-
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
86-
LOG.info(
87-
"find images: "
88-
+ DockerClientFactory.lazyClient().listImagesCmd().exec().stream()
89-
.map(
90-
image -> {
91-
if (image.getRepoTags() != null) {
92-
return image.getRepoTags()[0];
93-
} else {
94-
return image.getRepoDigests()[0];
95-
}
96-
})
97-
.collect(Collectors.joining(",")));
98-
server.start();
99-
// execute extra commands
100-
executeExtraCommands(server);
51+
Container.ExecResult execResult2 =
52+
executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error.conf");
53+
Assertions.assertNotEquals(0, execResult2.getExitCode());
54+
Assertions.assertTrue(server.getLogs().contains("Restore time 3, pipeline"));
10155
}
10256

10357
@Test

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java

+2-72
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.seatunnel.engine.e2e.classloader;
1919

20-
import org.apache.seatunnel.common.utils.FileUtils;
2120
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
2221
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
2322
import org.apache.seatunnel.engine.server.rest.RestConstant;
@@ -28,20 +27,12 @@
2827
import org.junit.jupiter.api.BeforeEach;
2928
import org.junit.jupiter.api.Test;
3029
import org.testcontainers.containers.Container;
31-
import org.testcontainers.containers.GenericContainer;
32-
import org.testcontainers.containers.output.Slf4jLogConsumer;
33-
import org.testcontainers.containers.wait.strategy.Wait;
34-
import org.testcontainers.utility.DockerLoggerFactory;
35-
import org.testcontainers.utility.MountableFile;
3630

3731
import io.restassured.response.Response;
3832

39-
import java.io.File;
4033
import java.io.IOException;
41-
import java.net.URL;
4234
import java.nio.file.Path;
4335
import java.nio.file.Paths;
44-
import java.util.Collections;
4536
import java.util.List;
4637
import java.util.Map;
4738
import java.util.concurrent.TimeUnit;
@@ -189,71 +180,10 @@ private boolean containsDaemonThread() throws IOException, InterruptedException
189180
@BeforeEach
190181
public void startUp() throws Exception {
191182
server =
192-
new GenericContainer<>(getDockerImage())
193-
.withNetwork(NETWORK)
194-
.withEnv("TZ", "UTC")
195-
.withCommand(
196-
ContainerUtil.adaptPathForWin(
197-
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
198-
.withNetworkAliases("server")
199-
.withExposedPorts()
200-
.withLogConsumer(
201-
new Slf4jLogConsumer(
202-
DockerLoggerFactory.getLogger(
203-
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
204-
.waitingFor(Wait.forListeningPort());
205-
copySeaTunnelStarterToContainer(server);
206-
server.setExposedPorts(Collections.singletonList(5801));
207-
208-
server.withCopyFileToContainer(
209-
MountableFile.forHostPath(
210-
PROJECT_ROOT_PATH
211-
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
212-
Paths.get(SEATUNNEL_HOME, "config").toString());
213-
214-
server.withCopyFileToContainer(
215-
MountableFile.forHostPath(
183+
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
216184
PROJECT_ROOT_PATH
217185
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/"
218-
+ seatunnelConfigFileName()),
219-
Paths.get(SEATUNNEL_HOME, "config", "seatunnel.yaml").toString());
220-
221-
server.withCopyFileToContainer(
222-
MountableFile.forHostPath(
223-
PROJECT_ROOT_PATH
224-
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
225-
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
226-
227-
server.start();
228-
// execute extra commands
229-
executeExtraCommands(server);
230-
231-
File module = new File(PROJECT_ROOT_PATH + File.separator + getConnectorModulePath());
232-
List<File> connectorFiles =
233-
ContainerUtil.getConnectorFiles(
234-
module, Collections.singleton("connector-fake"), getConnectorNamePrefix());
235-
URL url =
236-
FileUtils.searchJarFiles(
237-
Paths.get(
238-
PROJECT_ROOT_PATH
239-
+ File.separator
240-
+ "seatunnel-e2e/seatunnel-e2e-common/target"))
241-
.stream()
242-
.filter(jar -> jar.toString().endsWith("-tests.jar"))
243-
.findFirst()
244-
.get();
245-
connectorFiles.add(new File(url.getFile()));
246-
connectorFiles.forEach(
247-
jar ->
248-
server.copyFileToContainer(
249-
MountableFile.forHostPath(jar.getAbsolutePath()),
250-
Paths.get(SEATUNNEL_HOME, "connectors", jar.getName()).toString()));
251-
252-
server.copyFileToContainer(
253-
MountableFile.forHostPath(
254-
PROJECT_ROOT_PATH
255-
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"),
256-
Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString());
186+
+ seatunnelConfigFileName());
257187
}
258188

259189
@AfterEach

Diff for: seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties renamed to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties

+1
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@
2323
# SeaTunnel Connector-V2
2424

2525
seatunnel.source.FakeSource = connector-fake
26+
seatunnel.sink.Console = connector-console
2627
seatunnel.sink.InMemory = seatunnel-e2e-common

0 commit comments

Comments
 (0)