Commit f22d4eb

[Improve][Connector-v2] Support checkpoint in batch mode for paimon sink (#8333)

1 parent 00c5aed commit f22d4eb

9 files changed: +203 −108 lines
Diff for: docs/en/connector-v2/sink/Paimon.md (+5)

@@ -47,6 +47,11 @@ libfb303-xxx.jar
 | paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
 | paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
 
+## Checkpoint in batch mode
+
+When you set `checkpoint.interval` to a value greater than 0 in batch mode, the paimon connector commits the data to the paimon table each time a checkpoint triggers, that is, after a certain number of records have been written. At that moment, the written data in paimon is visible.
+However, if you do not set `checkpoint.interval` in batch mode, the paimon sink connector commits the data only after all records are written. The written data in paimon is not visible until the batch task completes.
+
 ## Changelog
 You must configure the `changelog-producer=input` option to enable the changelog producer mode of the paimon table. If you use the auto-create table function of paimon sink, you can configure this property in `paimon.table.write-props`.
 
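To make the new doc section concrete, here is a minimal batch-mode job sketch that turns on checkpoint-triggered commits for the paimon sink. The interval, source shape, warehouse path, and table names are illustrative placeholders, not values from this commit:

```hocon
env {
  # BATCH job with a positive checkpoint interval: the paimon sink commits
  # (and makes visible) the rows written so far at every checkpoint instead
  # of only once when the whole batch finishes.
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 100000
    schema = {
      fields {
        pk_id = bigint
        name = string
      }
    }
  }
}

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "default"
    table = "st_test"
  }
}
```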

Diff for: docs/zh/connector-v2/sink/Paimon.md (+5)

@@ -46,6 +46,11 @@ libfb303-xxx.jar
 | paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
 | paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
 
+## 批模式下的checkpoint
+
+当您在批处理模式下将`checkpoint.interval`设置为大于0的值时,在写入一定数量的记录后checkpoint触发时,paimon连接器将把数据提交到paimon表。此时,写入的数据是可见的。
+但是,如果您没有在批处理模式下设置`checkpoint.interval`,则在写入所有记录之后,paimon sink连接器将提交数据。到批任务完成之前,写入的数据都是不可见的。
+
 ## 更新日志
 你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。
 

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java (+5 −6)

@@ -67,13 +67,13 @@ public class PaimonSink
 
     private JobContext jobContext;
 
-    private ReadonlyConfig readonlyConfig;
+    private final ReadonlyConfig readonlyConfig;
 
-    private PaimonSinkConfig paimonSinkConfig;
+    private final PaimonSinkConfig paimonSinkConfig;
 
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
-    private PaimonHadoopConfiguration paimonHadoopConfiguration;
+    private final PaimonHadoopConfiguration paimonHadoopConfiguration;
 
     public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
         this.readonlyConfig = readonlyConfig;
@@ -102,8 +102,7 @@ public PaimonSinkWriter createWriter(SinkWriter.Context context) throws IOExcept
     @Override
     public Optional<SinkAggregatedCommitter<PaimonCommitInfo, PaimonAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
-        return Optional.of(
-                new PaimonAggregatedCommitter(paimonTable, jobContext, paimonHadoopConfiguration));
+        return Optional.of(new PaimonAggregatedCommitter(paimonTable, paimonHadoopConfiguration));
     }
 
     @Override

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java (+11 −30)

@@ -39,7 +39,6 @@
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.handler.AlterPaimonTableSchemaEventHandler;
 import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.CoreOptions;
@@ -49,8 +48,6 @@
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
@@ -89,10 +86,6 @@ public class PaimonSinkWriter
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private final SinkWriter.Context context;
-
-    private final JobContext jobContext;
-
     private org.apache.seatunnel.api.table.catalog.TableSchema sourceTableSchema;
 
     private TableSchema sinkPaimonTableSchema;
@@ -133,8 +126,6 @@ public PaimonSinkWriter(
         }
         this.paimonSinkConfig = paimonSinkConfig;
         this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
-        this.context = context;
-        this.jobContext = jobContext;
         this.newTableWrite();
         BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
         this.dynamicBucket =
@@ -147,8 +138,8 @@ public PaimonSinkWriter(
             this.bucketAssigner =
                     new PaimonBucketAssigner(
                             paimonFileStoretable,
-                            this.context.getNumberOfParallelSubtasks(),
-                            this.context.getIndexOfSubtask());
+                            context.getNumberOfParallelSubtasks(),
+                            context.getIndexOfSubtask());
         }
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
@@ -181,14 +172,13 @@ public PaimonSinkWriter(
                             .map(PaimonSinkState::getCommittables)
                             .flatMap(List::stream)
                             .collect(Collectors.toList());
-            log.info("Trying to recommit states {}", commitables);
-            if (JobContextUtil.isBatchJob(jobContext)) {
-                log.debug("Trying to recommit states batch mode");
-                ((BatchTableCommit) tableCommit).commit(commitables);
-            } else {
-                log.debug("Trying to recommit states streaming mode");
-                ((StreamTableCommit) tableCommit).commit(checkpointId, commitables);
+            // batch mode without checkpoint has no state to commit
+            if (commitables.isEmpty()) {
+                return;
             }
+            // streaming mode or batch mode with checkpoint need to recommit by stream api
+            log.info("Trying to recommit states {}", commitables);
+            ((StreamTableCommit) tableCommit).commit(checkpointId, commitables);
         } catch (Exception e) {
             throw new PaimonConnectorException(
                     PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -238,10 +228,7 @@ private void reOpenTableWrite() {
     }
 
     private void newTableWrite() {
-        this.tableWriteBuilder =
-                JobContextUtil.isBatchJob(jobContext)
-                        ? this.paimonFileStoretable.newBatchWriteBuilder()
-                        : this.paimonFileStoretable.newStreamWriteBuilder();
+        this.tableWriteBuilder = this.paimonFileStoretable.newStreamWriteBuilder();
         TableWrite oldTableWrite = this.tableWrite;
         this.tableWrite =
                 tableWriteBuilder
@@ -260,14 +247,8 @@ public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
     @Override
     public Optional<PaimonCommitInfo> prepareCommit(long checkpointId) throws IOException {
         try {
-            List<CommitMessage> fileCommittables;
-            if (JobContextUtil.isBatchJob(jobContext)) {
-                fileCommittables = ((BatchTableWrite) tableWrite).prepareCommit();
-            } else {
-                fileCommittables =
-                        ((StreamTableWrite) tableWrite)
-                                .prepareCommit(waitCompaction(), checkpointId);
-            }
+            List<CommitMessage> fileCommittables =
+                    ((StreamTableWrite) tableWrite).prepareCommit(waitCompaction(), checkpointId);
             committables.addAll(fileCommittables);
            return Optional.of(new PaimonCommitInfo(fileCommittables, checkpointId));
         } catch (Exception e) {
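For orientation, a minimal sketch of the Paimon stream-write path that the writer now uses in both streaming and batch mode. The helper class, `table`, and `rows` are assumptions for illustration, not the connector's own code:

```java
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;

import java.util.List;

class StreamWritePathSketch {
    // Write a batch of rows and commit them under the given checkpoint id.
    // `table` and `rows` are assumed inputs; error handling is elided.
    static void writeAndCommit(Table table, List<InternalRow> rows, long checkpointId)
            throws Exception {
        StreamWriteBuilder builder = table.newStreamWriteBuilder();
        try (StreamTableWrite write = builder.newWrite();
                StreamTableCommit commit = builder.newCommit()) {
            for (InternalRow row : rows) {
                write.write(row);
            }
            // prepareCommit(waitCompaction, commitIdentifier): the identifier is the
            // checkpoint id, which is what lets batch mode reuse the streaming path.
            List<CommitMessage> messages = write.prepareCommit(false, checkpointId);
            commit.commit(checkpointId, messages);
        }
    }
}
```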

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java (+12 −39)

@@ -17,17 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit;
 
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
 
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.TableCommit;
@@ -41,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
 
 /** Paimon connector aggregated committer class */
 @Slf4j
@@ -53,17 +49,9 @@ public class PaimonAggregatedCommitter
 
     private final WriteBuilder tableWriteBuilder;
 
-    private final JobContext jobContext;
-
     public PaimonAggregatedCommitter(
-            Table table,
-            JobContext jobContext,
-            PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this.jobContext = jobContext;
-        this.tableWriteBuilder =
-                JobContextUtil.isBatchJob(jobContext)
-                        ? table.newBatchWriteBuilder()
-                        : table.newStreamWriteBuilder();
+            Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
+        this.tableWriteBuilder = table.newStreamWriteBuilder();
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
 
@@ -73,31 +61,16 @@ public List<PaimonAggregatedCommitInfo> commit(
         try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
             PaimonSecurityContext.runSecured(
                     () -> {
-                        if (JobContextUtil.isBatchJob(jobContext)) {
-                            log.debug("Trying to commit states batch mode");
-                            List<CommitMessage> fileCommittables =
-                                    aggregatedCommitInfo.stream()
-                                            .flatMap(
-                                                    info ->
-                                                            info.getCommittablesMap().values()
-                                                                    .stream())
-                                            .flatMap(List::stream)
-                                            .collect(Collectors.toList());
-                            ((BatchTableCommit) tableCommit).commit(fileCommittables);
-                        } else {
-                            log.debug("Trying to commit states streaming mode");
-                            aggregatedCommitInfo.stream()
-                                    .flatMap(
-                                            paimonAggregatedCommitInfo ->
-                                                    paimonAggregatedCommitInfo.getCommittablesMap()
-                                                            .entrySet().stream())
-                                    .forEach(
-                                            entry ->
-                                                    ((StreamTableCommit) tableCommit)
-                                                            .commit(
-                                                                    entry.getKey(),
-                                                                    entry.getValue()));
-                        }
+                        log.debug("Trying to commit states streaming mode");
+                        aggregatedCommitInfo.stream()
+                                .flatMap(
+                                        paimonAggregatedCommitInfo ->
+                                                paimonAggregatedCommitInfo.getCommittablesMap()
+                                                        .entrySet().stream())
+                                .forEach(
+                                        entry ->
+                                                ((StreamTableCommit) tableCommit)
+                                                        .commit(entry.getKey(), entry.getValue()));
                         return null;
                     });
         } catch (Exception e) {
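And a compact sketch of the committer's replay pattern, assuming an aggregated `Map<Long, List<CommitMessage>>` keyed by checkpoint id, mirroring `getCommittablesMap()` above; the helper class and method name are hypothetical:

```java
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerCheckpointCommitSketch {
    // Replay each checkpoint's commit messages through the stream commit API,
    // in ascending checkpoint order (hence the TreeMap copy).
    static void commitAll(StreamTableCommit commit, Map<Long, List<CommitMessage>> committables) {
        new TreeMap<>(committables).forEach(commit::commit);
    }
}
```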

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java (−32)

This file was deleted.

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java (+11 −1)

@@ -122,11 +122,21 @@ protected boolean isIssueWeAlreadyKnow(String threadName) {
     }
 
     @Test
-    public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception {
+    public void testFakeCDCSinkPaimonWithS3Filesystem() throws Exception {
         Container.ExecResult execResult = executeJob("/fake_to_paimon_with_s3.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
         Container.ExecResult readResult = executeJob("/paimon_with_s3_to_assert.conf");
         Assertions.assertEquals(0, readResult.getExitCode());
     }
+
+    @Test
+    public void testFakeCDCSinkPaimonWithCheckpointInBatchModeWithS3Filesystem() throws Exception {
+        Container.ExecResult execResult =
+                executeJob("/fake_to_paimon_with_s3_with_checkpoint.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Container.ExecResult readResult = executeJob("/fake_2_paimon_with_s3_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
 }
Diff for: fake_2_paimon_with_s3_to_assert.conf (new file, +91)

@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace12"
+    table = "st_test"
+    paimon.hadoop.conf = {
+      fs.s3a.access-key=minio
+      fs.s3a.secret-key=miniominio
+      fs.s3a.endpoint="http://minio:9000"
+      fs.s3a.path.style.access=true
+      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5000
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 1
+            },
+            {
+              rule_type = MAX
+              rule_value = 100000
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
