Skip to content

Commit 1e1ab10

Browse files
author
luops
committed
Merge branch 'refs/heads/dev' into connector-activemq-source
2 parents d94be4f + bde2350 commit 1e1ab10

File tree

15 files changed

+350
-154
lines changed

15 files changed

+350
-154
lines changed

.dlc.json

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
},
2424
{
2525
"pattern": "^https://twitter.com/ASFSeaTunnel"
26+
},
27+
{
28+
"pattern": "^https://github.com/apache/seatunnel/commit/"
2629
}
2730
],
2831
"timeout": "10s",

config/log4j2.properties

+2-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ appender.routing.name = routingAppender
6565
appender.routing.type = Routing
6666
appender.routing.purge.type = IdlePurgePolicy
6767
appender.routing.purge.timeToLive = 60
68+
appender.routing.purge.checkInterval = 1
6869
appender.routing.route.type = Routes
6970
appender.routing.route.pattern = $${ctx:ST-JID}
7071
appender.routing.route.system.type = Route
@@ -100,4 +101,4 @@ appender.file.strategy.action.condition.nested_condition.type = IfAny
100101
appender.file.strategy.action.condition.nested_condition.lastModify.type = IfLastModified
101102
appender.file.strategy.action.condition.nested_condition.lastModify.age = ${file_ttl}
102103
appender.file.strategy.action.condition.nested_condition.fileCount.type = IfAccumulatedFileCount
103-
appender.file.strategy.action.condition.nested_condition.fileCount.exceeds = ${file_count}
104+
appender.file.strategy.action.condition.nested_condition.fileCount.exceeds = ${file_count}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java

+55-46
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
22-
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2322
import org.apache.seatunnel.api.source.Collector;
2423
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2524
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -39,6 +38,10 @@
3938
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
4039
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
4140

41+
import org.apache.commons.csv.CSVFormat;
42+
import org.apache.commons.csv.CSVParser;
43+
import org.apache.commons.csv.CSVRecord;
44+
4245
import io.airlift.compress.lzo.LzopCodec;
4346
import lombok.extern.slf4j.Slf4j;
4447

@@ -47,12 +50,13 @@
4750
import java.io.InputStream;
4851
import java.io.InputStreamReader;
4952
import java.nio.charset.StandardCharsets;
53+
import java.util.HashMap;
5054
import java.util.Map;
5155
import java.util.Optional;
5256

5357
@Slf4j
5458
public class CsvReadStrategy extends AbstractReadStrategy {
55-
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
59+
private CsvDeserializationSchema deserializationSchema;
5660
private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
5761
private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
5862
private DateTimeUtils.Formatter datetimeFormat =
@@ -62,6 +66,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
6266
private CsvLineProcessor processor;
6367
private int[] indexes;
6468
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
69+
private CatalogTable inputCatalogTable;
6570

6671
@Override
6772
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
@@ -96,51 +101,54 @@ public void readProcess(
96101
break;
97102
}
98103

104+
CSVFormat csvFormat = CSVFormat.DEFAULT;
99105
try (BufferedReader reader =
100-
new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
101-
reader.lines()
102-
.skip(skipHeaderNumber)
103-
.forEach(
104-
line -> {
105-
try {
106-
SeaTunnelRow seaTunnelRow =
107-
deserializationSchema.deserialize(
108-
line.getBytes(StandardCharsets.UTF_8));
109-
if (!readColumns.isEmpty()) {
110-
// need column projection
111-
Object[] fields;
112-
if (isMergePartition) {
113-
fields =
114-
new Object
115-
[readColumns.size()
116-
+ partitionsMap.size()];
117-
} else {
118-
fields = new Object[readColumns.size()];
119-
}
120-
for (int i = 0; i < indexes.length; i++) {
121-
fields[i] = seaTunnelRow.getField(indexes[i]);
122-
}
123-
seaTunnelRow = new SeaTunnelRow(fields);
124-
}
125-
if (isMergePartition) {
126-
int index = seaTunnelRowType.getTotalFields();
127-
for (String value : partitionsMap.values()) {
128-
seaTunnelRow.setField(index++, value);
129-
}
130-
}
131-
seaTunnelRow.setTableId(tableId);
132-
output.collect(seaTunnelRow);
133-
} catch (IOException e) {
134-
String errorMsg =
135-
String.format(
136-
"Deserialize this data [%s] failed, please check the origin data",
137-
line);
138-
throw new FileConnectorException(
139-
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
140-
errorMsg,
141-
e);
142-
}
143-
});
106+
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
107+
CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
108+
for (int i = 0; i < skipHeaderNumber; i++) {
109+
if (reader.readLine() == null) {
110+
throw new IOException(
111+
String.format(
112+
"File [%s] has fewer lines than expected to skip.",
113+
currentFileName));
114+
}
115+
}
116+
// read lines
117+
for (CSVRecord csvRecord : csvParser) {
118+
HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
119+
for (int i = 0; i < inputCatalogTable.getTableSchema().getColumns().size(); i++) {
120+
fieldIdValueMap.put(i, csvRecord.get(i));
121+
}
122+
SeaTunnelRow seaTunnelRow = deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
123+
if (!readColumns.isEmpty()) {
124+
// need column projection
125+
Object[] fields;
126+
if (isMergePartition) {
127+
fields = new Object[readColumns.size() + partitionsMap.size()];
128+
} else {
129+
fields = new Object[readColumns.size()];
130+
}
131+
for (int i = 0; i < indexes.length; i++) {
132+
fields[i] = seaTunnelRow.getField(indexes[i]);
133+
}
134+
seaTunnelRow = new SeaTunnelRow(fields);
135+
}
136+
if (isMergePartition) {
137+
int index = seaTunnelRowType.getTotalFields();
138+
for (String value : partitionsMap.values()) {
139+
seaTunnelRow.setField(index++, value);
140+
}
141+
}
142+
seaTunnelRow.setTableId(tableId);
143+
output.collect(seaTunnelRow);
144+
}
145+
} catch (IOException e) {
146+
String errorMsg =
147+
String.format(
148+
"Deserialize this file [%s] failed, please check the origin data",
149+
currentFileName);
150+
throw new FileConnectorException(
151+
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e);
144152
}
145153
}
146154

@@ -177,6 +185,7 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
177185
@Override
178186
public void setCatalogTable(CatalogTable catalogTable) {
179187
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
188+
this.inputCatalogTable = catalogTable;
180189
SeaTunnelRowType userDefinedRowTypeWithPartition =
181190
mergePartitionTypes(fileNames.get(0), rowType);
182191
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public class MilvusSource
4242
private final ReadonlyConfig config;
4343
private final Map<TablePath, CatalogTable> sourceTables;
4444

45-
public MilvusSource(ReadonlyConfig sourceConfing) {
46-
this.config = sourceConfing;
47-
MilvusConvertUtils milvusConvertUtils = new MilvusConvertUtils(sourceConfing);
45+
public MilvusSource(ReadonlyConfig sourceConfig) {
46+
this.config = sourceConfig;
47+
MilvusConvertUtils milvusConvertUtils = new MilvusConvertUtils(sourceConfig);
4848
this.sourceTables = milvusConvertUtils.getSourceTables();
4949
}
5050

@@ -66,15 +66,15 @@ public SourceReader<SeaTunnelRow, MilvusSourceSplit> createReader(
6666
@Override
6767
public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> createEnumerator(
6868
SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws Exception {
69-
return new MilvusSourceSplitEnumertor(context, config, sourceTables, null);
69+
return new MilvusSourceSplitEnumerator(context, config, sourceTables, null);
7070
}
7171

7272
@Override
7373
public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumerator(
7474
SourceSplitEnumerator.Context<MilvusSourceSplit> context,
7575
MilvusSourceState checkpointState)
7676
throws Exception {
77-
return new MilvusSourceSplitEnumertor(context, config, sourceTables, checkpointState);
77+
return new MilvusSourceSplitEnumerator(context, config, sourceTables, checkpointState);
7878
}
7979

8080
@Override

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,16 @@ private void pollNextData(MilvusSourceSplit split, Collector<SeaTunnelRow> outpu
162162
MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
163163
}
164164

165-
R<GetLoadStateResponse> loadStateResponse =
166-
client.getLoadState(
167-
GetLoadStateParam.newBuilder()
168-
.withDatabaseName(tablePath.getDatabaseName())
169-
.withCollectionName(tablePath.getTableName())
170-
.build());
165+
GetLoadStateParam.Builder loadStateParam =
166+
GetLoadStateParam.newBuilder()
167+
.withDatabaseName(tablePath.getDatabaseName())
168+
.withCollectionName(tablePath.getTableName());
169+
170+
if (StringUtils.isNotEmpty(partitionName)) {
171+
loadStateParam.withPartitionNames(Collections.singletonList(partitionName));
172+
}
173+
174+
R<GetLoadStateResponse> loadStateResponse = client.getLoadState(loadStateParam.build());
171175
if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
172176
throw new MilvusConnectorException(
173177
MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import java.util.concurrent.ConcurrentLinkedQueue;
4949

5050
@Slf4j
51-
public class MilvusSourceSplitEnumertor
51+
public class MilvusSourceSplitEnumerator
5252
implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
5353

5454
private final Map<TablePath, CatalogTable> tables;
@@ -60,7 +60,7 @@ public class MilvusSourceSplitEnumertor
6060

6161
private final ReadonlyConfig config;
6262

63-
public MilvusSourceSplitEnumertor(
63+
public MilvusSourceSplitEnumerator(
6464
Context<MilvusSourceSplit> context,
6565
ReadonlyConfig config,
6666
Map<TablePath, CatalogTable> sourceTables,

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public class StarRocksStreamLoadVisitor {
5252
private static final String RESULT_FAILED = "Fail";
5353
private static final String RESULT_SUCCESS = "Success";
5454
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
55-
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
56-
private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
55+
private static final String LABEL_STATE_VISIBLE = "VISIBLE";
56+
private static final String LABEL_STATE_COMMITTED = "COMMITTED";
5757
private static final String RESULT_LABEL_PREPARE = "PREPARE";
5858
private static final String RESULT_LABEL_ABORTED = "ABORTED";
5959
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
@@ -225,8 +225,8 @@ private void checkLabelState(String host, String label) throws IOException {
225225
}
226226
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
227227
switch (labelState) {
228-
case LAEBL_STATE_VISIBLE:
229-
case LAEBL_STATE_COMMITTED:
228+
case LABEL_STATE_VISIBLE:
229+
case LABEL_STATE_COMMITTED:
230230
return;
231231
case RESULT_LABEL_PREPARE:
232232
continue;

seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogTest.java

-83
This file was deleted.

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,11 @@ public class LocalFileIT extends TestSuiteBase {
286286
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
287287
container);
288288

289+
ContainerUtil.copyFileIntoContainers(
290+
"/csv/break_line.csv",
291+
"/seatunnel/read/csv/break_line/break_line.csv",
292+
container);
293+
289294
ContainerUtil.copyFileIntoContainers(
290295
"/text/e2e_null_format.txt",
291296
"/seatunnel/read/e2e_null_format/e2e_null_format.txt",
@@ -300,6 +305,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
300305
TestHelper helper = new TestHelper(container);
301306
helper.execute("/csv/fake_to_local_csv.conf");
302307
helper.execute("/csv/local_csv_to_assert.conf");
308+
helper.execute("/csv/breakline_csv_to_assert.conf");
303309
helper.execute("/excel/fake_to_local_excel.conf");
304310
helper.execute("/excel/local_excel_to_assert.conf");
305311
helper.execute("/excel/local_excel_projection_to_assert.conf");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
20,"harry
2+
potter"
3+
21,"tom"

0 commit comments

Comments
 (0)