Skip to content

Commit 26c528a

Browse files
[Fix][Connector-V2][FTP] Fix FTP connector connection_mode is not effective (#7865)
1 parent 4406fbc commit 26c528a

File tree

6 files changed

+312
-15
lines changed

6 files changed

+312
-15
lines changed

seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2323
import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
2424

25-
import static org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;
25+
import static org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL;
2626

2727
public class FtpConfigOptions extends BaseSourceConfigOptions {
2828
public static final Option<String> FTP_PASSWORD =
@@ -42,6 +42,6 @@ public class FtpConfigOptions extends BaseSourceConfigOptions {
4242
public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
4343
Options.key("connection_mode")
4444
.enumType(FtpConnectionMode.class)
45-
.defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
45+
.defaultValue(ACTIVE_LOCAL)
4646
.withDescription("FTP server connection mode ");
4747
}

seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
public enum FtpConnectionMode {
2222

2323
/** ACTIVE_LOCAL_DATA_CONNECTION_MODE */
24-
ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),
24+
ACTIVE_LOCAL("active_local"),
2525

2626
/** PASSIVE_LOCAL_DATA_CONNECTION_MODE */
27-
PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");
27+
PASSIVE_LOCAL("passive_local");
2828

2929
private final String mode;
3030

@@ -38,7 +38,7 @@ public String getMode() {
3838

3939
public static FtpConnectionMode fromMode(String mode) {
4040
for (FtpConnectionMode ftpConnectionModeEnum : FtpConnectionMode.values()) {
41-
if (ftpConnectionModeEnum.getMode().equals(mode)) {
41+
if (ftpConnectionModeEnum.getMode().equals(mode.toLowerCase())) {
4242
return ftpConnectionModeEnum;
4343
}
4444
}

seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.apache.hadoop.net.NetUtils;
4141
import org.apache.hadoop.util.Progressable;
4242

43+
import lombok.extern.slf4j.Slf4j;
44+
4345
import java.io.FileNotFoundException;
4446
import java.io.IOException;
4547
import java.io.InputStream;
@@ -52,6 +54,7 @@
5254
*/
5355
@InterfaceAudience.Public
5456
@InterfaceStability.Stable
57+
@Slf4j
5558
public class SeaTunnelFTPFileSystem extends FileSystem {
5659
public static final Log LOG = LogFactory.getLog(SeaTunnelFTPFileSystem.class);
5760

@@ -156,10 +159,7 @@ private FTPClient connect() throws IOException {
156159
}
157160

158161
setFsFtpConnectionMode(
159-
client,
160-
conf.get(
161-
FS_FTP_CONNECTION_MODE,
162-
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));
162+
client, conf.get(FS_FTP_CONNECTION_MODE, FtpConnectionMode.ACTIVE_LOCAL.getMode()));
163163

164164
return client;
165165
}
@@ -172,13 +172,18 @@ private FTPClient connect() throws IOException {
172172
*/
173173
private void setFsFtpConnectionMode(FTPClient client, String mode) {
174174
switch (FtpConnectionMode.fromMode(mode)) {
175-
case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
176-
client.enterLocalActiveMode();
177-
break;
178-
case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
175+
case PASSIVE_LOCAL:
179176
client.enterLocalPassiveMode();
180177
break;
178+
case ACTIVE_LOCAL:
179+
client.enterLocalActiveMode();
180+
break;
181181
default:
182+
log.warn(
183+
"Unsupported FTP connection mode: " + mode,
184+
" Using default FTP connection mode: "
185+
+ FtpConnectionMode.ACTIVE_LOCAL.getMode());
186+
client.enterLocalActiveMode();
182187
break;
183188
}
184189
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java

+70-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import org.junit.jupiter.api.Assertions;
3232
import org.junit.jupiter.api.BeforeAll;
3333
import org.junit.jupiter.api.TestTemplate;
34+
import org.testcontainers.containers.Container;
3435
import org.testcontainers.containers.GenericContainer;
3536
import org.testcontainers.containers.output.Slf4jLogConsumer;
37+
import org.testcontainers.containers.wait.strategy.Wait;
3638
import org.testcontainers.lifecycle.Startables;
3739
import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
3840

@@ -42,10 +44,15 @@
4244

4345
import java.io.ByteArrayOutputStream;
4446
import java.io.IOException;
47+
import java.io.StringReader;
4548
import java.nio.charset.StandardCharsets;
4649
import java.util.ArrayList;
4750
import java.util.Collections;
4851
import java.util.List;
52+
import java.util.Properties;
53+
import java.util.function.BiFunction;
54+
import java.util.stream.Collectors;
55+
import java.util.stream.IntStream;
4956
import java.util.stream.Stream;
5057

5158
@DisabledOnContainer(
@@ -68,14 +75,30 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
6875

6976
private GenericContainer<?> ftpContainer;
7077

78+
private String ftpPassiveAddress;
79+
80+
private BiFunction<Integer, Integer, Integer[]> generateExposedPorts =
81+
(startPort, endPort) ->
82+
IntStream.rangeClosed(startPort, endPort).boxed().toArray(Integer[]::new);
83+
84+
private BiFunction<Integer, Integer, List<String>> generatePortBindings =
85+
(startPort, endPort) ->
86+
IntStream.rangeClosed(startPort, endPort)
87+
.mapToObj(i -> i + ":" + i)
88+
.collect(Collectors.toList());
89+
7190
@BeforeAll
7291
@Override
7392
public void startUp() throws Exception {
93+
int passiveStartPort = 30000;
94+
int passiveEndPort = 30004;
7495
ftpContainer =
7596
new GenericContainer<>(FTP_IMAGE)
7697
.withExposedPorts(FTP_PORT)
7798
.withNetwork(NETWORK)
7899
.withExposedPorts(FTP_PORT)
100+
.withExposedPorts(
101+
generateExposedPorts.apply(passiveStartPort, passiveEndPort))
79102
.withNetworkAliases(ftp_CONTAINER_HOST)
80103
.withEnv("FILE_OPEN_MODE", "0666")
81104
.withEnv("WRITE_ENABLE", "YES")
@@ -85,13 +108,31 @@ public void startUp() throws Exception {
85108
.withEnv("LOCAL_UMASK", "000")
86109
.withEnv("FTP_USER", USERNAME)
87110
.withEnv("FTP_PASS", PASSWORD)
88-
.withEnv("PASV_ADDRESS", "0.0.0.0")
111+
.withEnv("PASV_MIN_PORT", String.valueOf(passiveStartPort))
112+
.withEnv("PASV_MAX_PORT", String.valueOf(passiveEndPort))
89113
.withLogConsumer(new Slf4jLogConsumer(log))
114+
// Modify the strategy mode because the passive mode port does not need to
115+
// be checked here, it does not start with the FTP startup.
116+
.waitingFor(Wait.forLogMessage(".*", 1))
90117
.withPrivilegedMode(true);
91118

92-
ftpContainer.setPortBindings(Collections.singletonList("21:21"));
119+
List<String> portBind = new ArrayList<>();
120+
portBind.add("21:21");
121+
portBind.addAll(generatePortBindings.apply(passiveStartPort, passiveEndPort));
122+
123+
ftpContainer.setPortBindings(portBind);
93124
ftpContainer.start();
94125
Startables.deepStart(Stream.of(ftpContainer)).join();
126+
127+
// Get the passive mode address of the FTP container
128+
Properties properties = new Properties();
129+
properties.load(
130+
new StringReader(
131+
ftpContainer
132+
.execInContainer("sh", "-c", "cat /etc/vsftpd/vsftpd.conf")
133+
.getStdout()));
134+
ftpPassiveAddress = properties.getProperty("pasv_address");
135+
95136
log.info("ftp container started");
96137

97138
ContainerUtil.copyFileIntoContainers(
@@ -126,6 +167,33 @@ public void startUp() throws Exception {
126167
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
127168
}
128169

170+
@TestTemplate
171+
public void testFtpFileReadAndWriteForPassive(TestContainer container)
172+
throws IOException, InterruptedException {
173+
List<String> configParams = Collections.singletonList("ftpHost=" + ftpPassiveAddress);
174+
// Test passive mode
175+
assertJobExecution(
176+
container, "/text/ftp_file_text_to_assert_for_passive.conf", configParams);
177+
assertJobExecution(container, "/text/fake_to_ftp_file_text_for_passive.conf", configParams);
178+
179+
String homePath = "/home/vsftpd/seatunnel/tmp/seatunnel/passive_text";
180+
// test write ftp text file
181+
Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
182+
183+
// Confirm data is written correctly
184+
Container.ExecResult execResult =
185+
ftpContainer.execInContainer("sh", "-c", "awk 'END {print NR}' " + homePath + "/*");
186+
Assertions.assertEquals("15", execResult.getStdout().trim());
187+
188+
deleteFileFromContainer(homePath);
189+
}
190+
191+
private void assertJobExecution(TestContainer container, String configPath, List<String> params)
192+
throws IOException, InterruptedException {
193+
Container.ExecResult execResult = container.executeJob(configPath, params);
194+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
195+
}
196+
129197
@TestTemplate
130198
public void testFtpFileReadAndWrite(TestContainer container)
131199
throws IOException, InterruptedException {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
22+
# You can set spark configuration here
23+
spark.app.name = "SeaTunnel"
24+
spark.executor.instances = 1
25+
spark.executor.cores = 1
26+
spark.executor.memory = "1g"
27+
spark.master = local
28+
}
29+
30+
source {
31+
FakeSource {
32+
result_table_name = "ftp"
33+
row.num = 15
34+
schema = {
35+
fields {
36+
c_map = "map<string, string>"
37+
c_array = "array<int>"
38+
c_string = string
39+
c_boolean = boolean
40+
c_tinyint = tinyint
41+
c_smallint = smallint
42+
c_int = int
43+
c_bigint = bigint
44+
c_float = float
45+
c_double = double
46+
c_bytes = bytes
47+
c_date = date
48+
c_decimal = "decimal(38, 18)"
49+
c_timestamp = timestamp
50+
c_row = {
51+
c_map = "map<string, string>"
52+
c_array = "array<int>"
53+
c_string = string
54+
c_boolean = boolean
55+
c_tinyint = tinyint
56+
c_smallint = smallint
57+
c_int = int
58+
c_bigint = bigint
59+
c_float = float
60+
c_double = double
61+
c_bytes = bytes
62+
c_date = date
63+
c_decimal = "decimal(38, 18)"
64+
c_timestamp = timestamp
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
sink {
72+
FtpFile {
73+
host = ${ftpHost}
74+
port = 21
75+
user = seatunnel
76+
password = pass
77+
connection_mode = "passive_local"
78+
path = "/tmp/seatunnel/passive_text"
79+
source_table_name = "ftp"
80+
row_delimiter = "\n"
81+
partition_dir_expression = "${k0}=${v0}"
82+
is_partition_field_write_in_file = true
83+
file_name_expression = "${transactionId}_${now}"
84+
file_format_type = "text"
85+
filename_time_format = "yyyy.MM.dd"
86+
is_enable_transaction = true
87+
}
88+
}

0 commit comments

Comments
 (0)