Skip to content

Commit c284d30

Browse files
hawk9821Hisoka-X
andcommitted
[Feature][Connectors-v2] Paimon version upgrade to 1.0.1
Co-authored-by: Jia Fan <[email protected]>
1 parent 87e9355 commit c284d30

File tree

27 files changed

+485
-47
lines changed

27 files changed

+485
-47
lines changed

docs/en/connector-v2/sink/Paimon.md

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66

77
Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.
88

9+
### Comparison between SeaTunnel and Paimon versions
10+
11+
| Seatunnel Version | Paimon Version |
12+
|-------------------|------------------|
13+
| 2.3.2 - 2.3.3 | 0.4-SNAPSHOT |
14+
| 2.3.4 | 0.6-SNAPSHOT |
15+
| 2.3.5 - 2.3.9 | 0.7.0-incubating |
16+
| 2.3.10 | 1.0.1 |
17+
918
## Supported DataSource Info
1019

1120
| Datasource | Dependent | Maven |

docs/en/connector-v2/source/Paimon.md

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66

77
Read data from Apache Paimon.
88

9+
### Comparison between SeaTunnel and Paimon versions
10+
11+
| Seatunnel Version | Paimon Version |
12+
|-------------------|------------------|
13+
| 2.3.2 - 2.3.3 | 0.4-SNAPSHOT |
14+
| 2.3.4 | 0.6-SNAPSHOT |
15+
| 2.3.5 - 2.3.9 | 0.7.0-incubating |
16+
| 2.3.10 | 1.0.1 |
17+
918
## Key features
1019

1120
- [x] [batch](../../concept/connector-v2-features.md)

docs/zh/connector-v2/sink/Paimon.md

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@
66

77
Apache Paimon数据连接器。支持cdc写以及自动建表。
88

9+
### SeaTunnel与Paimon版本对照
10+
11+
| Seatunnel Version | Paimon Version |
12+
|-------------------|------------------|
13+
| 2.3.2 - 2.3.3 | 0.4-SNAPSHOT |
14+
| 2.3.4 | 0.6-SNAPSHOT |
15+
| 2.3.5 - 2.3.9 | 0.7.0-incubating |
16+
| 2.3.10 | 1.0.1 |
17+
918
## 支持的数据源信息
1019

1120
| 数据源 | 依赖 | Maven |

seatunnel-connectors-v2/connector-paimon/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<name>SeaTunnel : Connectors V2 : Paimon</name>
3131

3232
<properties>
33-
<paimon.version>0.7.0-incubating</paimon.version>
33+
<paimon.version>1.0.1</paimon.version>
3434
<hive.version>2.3.9</hive.version>
3535
<connector.name>connector.paimon</connector.name>
3636
</properties>

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
3939
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
4040

41+
import org.apache.paimon.CoreOptions;
4142
import org.apache.paimon.catalog.Identifier;
4243
import org.apache.paimon.schema.Schema;
4344
import org.apache.paimon.schema.SchemaChange;
@@ -51,6 +52,8 @@
5152

5253
import java.io.Closeable;
5354
import java.io.IOException;
55+
import java.util.ArrayList;
56+
import java.util.HashMap;
5457
import java.util.List;
5558
import java.util.Map;
5659
import java.util.Objects;
@@ -103,7 +106,8 @@ public String getDefaultDatabase() throws CatalogException {
103106

104107
@Override
105108
public boolean databaseExists(String databaseName) throws CatalogException {
106-
return catalog.databaseExists(databaseName);
109+
List<String> listDatabases = catalog.listDatabases();
110+
return listDatabases.contains(databaseName);
107111
}
108112

109113
@Override
@@ -123,7 +127,16 @@ public List<String> listTables(String databaseName)
123127

124128
@Override
125129
public boolean tableExists(TablePath tablePath) throws CatalogException {
126-
return catalog.tableExists(toIdentifier(tablePath));
130+
Identifier identifier = toIdentifier(tablePath);
131+
List<String> tables = new ArrayList<>();
132+
try {
133+
if (databaseExists(identifier.getDatabaseName())) {
134+
tables = catalog.listTables(identifier.getDatabaseName());
135+
}
136+
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
137+
return false;
138+
}
139+
return tables.contains(identifier.getTableName());
127140
}
128141

129142
@Override
@@ -216,7 +229,9 @@ private Schema buildPaimonSchema(@NonNull org.apache.paimon.schema.TableSchema s
216229
Schema.Builder builder = Schema.newBuilder();
217230
schema.fields()
218231
.forEach(field -> builder.column(field.name(), field.type(), field.description()));
219-
builder.options(schema.options());
232+
Map<String, String> options = new HashMap<>(schema.options());
233+
options.remove(CoreOptions.PATH.key());
234+
builder.options(options);
220235
builder.primaryKey(schema.primaryKeys());
221236
builder.partitionKeys(schema.partitionKeys());
222237
builder.comment(schema.comment());
@@ -296,7 +311,9 @@ private void resolveException(Exception e) {
296311
}
297312
} else if (cause instanceof RuntimeException) {
298313
String message = cause.getMessage();
299-
if (message.contains("Cannot define 'bucket-key' in unaware or dynamic bucket mode.")) {
314+
// https://github.com/apache/paimon/pull/3320/files#diff-d3e068ea8caf83d2371f0eaa1cbf3d02ff06e1c1cdceec5fab2e065cecd96230
315+
if (message.contains(
316+
"Cannot define 'bucket-key' with bucket -1, please specify a bucket number.")) {
300317
throw new PaimonConnectorException(
301318
PaimonConnectorErrorCode.WRITE_PROPS_BUCKET_KEY_ERROR, message);
302319
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,9 @@ public PaimonSinkWriter(
128128
this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
129129
this.newTableWrite();
130130
BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
131-
this.dynamicBucket =
132-
BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode;
131+
this.dynamicBucket = BucketMode.HASH_DYNAMIC == bucketMode;
133132
int bucket = ((FileStoreTable) paimonFileStoretable).coreOptions().bucket();
134-
if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
133+
if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) {
135134
log.warn("Append only table currently do not support dynamic bucket");
136135
}
137136
if (dynamicBucket) {

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java

+19-6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
3030
import org.apache.paimon.types.DataField;
3131
import org.apache.paimon.types.DataType;
32+
import org.apache.paimon.types.RowType;
33+
import org.apache.paimon.utils.Int2ShortHashMap;
3234

3335
import java.io.IOException;
3436
import java.util.List;
@@ -37,6 +39,8 @@
3739
import java.util.stream.Collectors;
3840
import java.util.stream.IntStream;
3941

42+
import static org.apache.paimon.crosspartition.IndexBootstrap.BUCKET_FIELD;
43+
4044
public class PaimonBucketAssigner {
4145

4246
private final RowPartitionKeyExtractor extractor;
@@ -45,31 +49,37 @@ public class PaimonBucketAssigner {
4549

4650
private final TableSchema schema;
4751

52+
private final Int2ShortHashMap hash2Bucket;
53+
4854
public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
4955
FileStoreTable fileStoreTable = (FileStoreTable) table;
5056
this.schema = fileStoreTable.schema();
5157
this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
52-
long dynamicBucketTargetRowNum =
53-
((FileStoreTable) table).coreOptions().dynamicBucketTargetRowNum();
58+
long dynamicBucketTargetRowNum = fileStoreTable.coreOptions().dynamicBucketTargetRowNum();
5459
this.simpleHashBucketAssigner =
5560
new SimpleHashBucketAssigner(numAssigners, assignId, dynamicBucketTargetRowNum);
61+
this.hash2Bucket = new Int2ShortHashMap();
5662
loadBucketIndex(fileStoreTable, numAssigners, assignId);
5763
}
5864

5965
private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, int assignId) {
6066
IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
67+
RowType indexRowType = IndexBootstrap.bootstrapType(schema);
68+
int bucketFieldIndex = indexRowType.getFieldIndex(BUCKET_FIELD);
6169
List<String> fieldNames = schema.fieldNames();
6270
Map<String, Integer> fieldIndexMap =
6371
IntStream.range(0, fieldNames.size())
6472
.boxed()
6573
.collect(Collectors.toMap(fieldNames::get, Function.identity()));
6674
List<DataField> primaryKeys = schema.primaryKeysFields();
75+
6776
try (RecordReader<InternalRow> recordReader =
6877
indexBootstrap.bootstrap(numAssigners, assignId)) {
6978
RecordReaderIterator<InternalRow> readerIterator =
7079
new RecordReaderIterator<>(recordReader);
7180
while (readerIterator.hasNext()) {
7281
InternalRow row = readerIterator.next();
82+
int rowBucket = row.getInt(bucketFieldIndex);
7383
GenericRow binaryRow = new GenericRow(fieldNames.size());
7484
for (int i = 0; i < primaryKeys.size(); i++) {
7585
String name = primaryKeys.get(i).name();
@@ -78,16 +88,19 @@ private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, in
7888
fieldIndexMap.get(name),
7989
InternalRow.createFieldGetter(type, i).getFieldOrNull(row));
8090
}
81-
assign(binaryRow);
91+
hash2Bucket.put(
92+
extractor.trimmedPrimaryKey(binaryRow).hashCode(), (short) rowBucket);
8293
}
8394
} catch (IOException e) {
8495
throw new RuntimeException(e);
8596
}
8697
}
8798

8899
public int assign(InternalRow rowData) {
89-
int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
90-
return Math.abs(
91-
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
100+
int rowHashCode = extractor.trimmedPrimaryKey(rowData).hashCode();
101+
if (hash2Bucket.containsKey(rowHashCode)) {
102+
return hash2Bucket.get(rowHashCode);
103+
}
104+
return this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), rowHashCode);
92105
}
93106
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
7878
if (Objects.nonNull(split)) {
7979
// read logic
8080
try (final RecordReader<InternalRow> reader =
81-
tableRead.executeFilter().createReader(split.getSplit())) {
82-
final RecordReaderIterator<InternalRow> rowIterator =
83-
new RecordReaderIterator<>(reader);
81+
tableRead.executeFilter().createReader(split.getSplit());
82+
final RecordReaderIterator<InternalRow> rowIterator =
83+
new RecordReaderIterator<>(reader)) {
8484
while (rowIterator.hasNext()) {
8585
final InternalRow row = rowIterator.next();
8686
final SeaTunnelRow seaTunnelRow =

seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public void before() throws Exception {
5858
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
5959
catalog.createDatabase(DATABASE_NAME, true);
6060
Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
61-
if (!catalog.tableExists(identifier)) {
61+
List<String> tables = catalog.listTables(DATABASE_NAME);
62+
if (!tables.contains(identifier.getTableName())) {
6263
Schema.Builder schemaBuilder = Schema.newBuilder();
6364
schemaBuilder.column("id", DataTypes.INT(), "primary Key");
6465
schemaBuilder.column("name", DataTypes.STRING(), "name");

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.e2e.common.TestSuiteBase;
2121
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
22+
import org.apache.seatunnel.e2e.common.container.EngineType;
2223
import org.apache.seatunnel.e2e.common.container.TestContainer;
2324
import org.apache.seatunnel.e2e.common.container.TestContainerId;
2425
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -34,8 +35,8 @@
3435
import java.nio.file.Path;
3536

3637
@DisabledOnContainer(
37-
value = TestContainerId.FLINK_1_13,
38-
disabledReason = "Paimon does not support flink 1.13")
38+
value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4},
39+
disabledReason = "Paimon does not support flink 1.13 and spark 2.x")
3940
public class PaimonIT extends TestSuiteBase {
4041

4142
@TestContainerExtension
@@ -49,6 +50,11 @@ public class PaimonIT extends TestSuiteBase {
4950
};
5051

5152
@TestTemplate
53+
@DisabledOnContainer(
54+
type = EngineType.FLINK,
55+
value = {},
56+
disabledReason =
57+
"this case use Local file system, e2e's flink is a distributed environment.")
5258
public void testWriteAndReadPaimon(TestContainer container)
5359
throws IOException, InterruptedException {
5460
Container.ExecResult textWriteResult = container.executeJob("/fake_to_paimon.conf");

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class PaimonRecordWithFullType {
3636
public int[] c_array;
3737
public BinaryString c_string;
3838
public boolean c_boolean;
39-
public short c_tinyint;
39+
public byte c_tinyint;
4040
public short c_smallint;
4141
public int c_int;
4242
public long c_bigint;

0 commit comments

Comments
 (0)