Skip to content

Commit df0d11d

Browse files
authored
[Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method (#9070)
1 parent a047cab commit df0d11d

File tree

8 files changed

+135
-20
lines changed

8 files changed

+135
-20
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232
*/
3333
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
3434

35-
/** init sink aggregated committer */
35+
/**
36+
* init sink aggregated committer, this method will be called not once. Each retry will call
37+
* this.
38+
*/
3639
default void init() {};
3740

3841
/** Re-commit message to third party data receiver, The method need to achieve idempotency. */

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,16 @@ private void initResourceManager() {
5353
for (String tableIdentifier : aggCommitters.keySet()) {
5454
SinkAggregatedCommitter<?, ?> aggCommitter = aggCommitters.get(tableIdentifier);
5555
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) {
56-
return;
56+
break;
5757
}
5858
resourceManager =
5959
((SupportMultiTableSinkAggregatedCommitter<?>) aggCommitter)
6060
.initMultiTableResourceManager(aggCommitters.size(), 1);
6161
break;
6262
}
63-
if (resourceManager != null) {
64-
for (String tableIdentifier : aggCommitters.keySet()) {
65-
SinkAggregatedCommitter<?, ?> aggCommitter = aggCommitters.get(tableIdentifier);
66-
aggCommitter.init();
63+
for (SinkAggregatedCommitter<?, ?> aggCommitter : aggCommitters.values()) {
64+
aggCommitter.init();
65+
if (resourceManager != null) {
6766
((SupportMultiTableSinkAggregatedCommitter<?>) aggCommitter)
6867
.setMultiTableResourceManager(resourceManager, 0);
6968
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.sink.multitablesink;
19+
20+
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
public class MultiTableSinkAggregatedCommitterTest {
34+
35+
@Test
36+
void testInitBeInvoked() throws IOException {
37+
Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new HashMap<>();
38+
List<String> methodInvoked = new ArrayList<>();
39+
aggCommitters.put(
40+
"table1",
41+
new SinkAggregatedCommitter<Object, Object>() {
42+
43+
@Override
44+
public void init() {
45+
methodInvoked.add("init");
46+
}
47+
48+
@Override
49+
public List<Object> commit(List<Object> aggregatedCommitInfo)
50+
throws IOException {
51+
return Collections.emptyList();
52+
}
53+
54+
@Override
55+
public Object combine(List<Object> commitInfos) {
56+
return null;
57+
}
58+
59+
@Override
60+
public void abort(List<Object> aggregatedCommitInfo) throws Exception {}
61+
62+
@Override
63+
public void close() throws IOException {
64+
methodInvoked.add("close");
65+
}
66+
});
67+
MultiTableSinkAggregatedCommitter committer =
68+
new MultiTableSinkAggregatedCommitter(aggCommitters);
69+
committer.init();
70+
committer.close();
71+
Assertions.assertIterableEquals(Arrays.asList("init", "close"), methodInvoked);
72+
}
73+
}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,24 @@ public class ClickhouseFileSinkAggCommitter
4141
implements SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
4242

4343
private transient ClickhouseProxy proxy;
44-
private final ClickhouseTable clickhouseTable;
44+
private ClickhouseTable clickhouseTable;
4545

4646
private final FileReaderOption fileReaderOption;
4747

4848
public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
4949
fileReaderOption = readerOption;
50-
proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
50+
}
51+
52+
@Override
53+
public void init() {
54+
proxy =
55+
new ClickhouseProxy(
56+
fileReaderOption.getShardMetadata().getDefaultShard().getNode());
5157
clickhouseTable =
5258
proxy.getClickhouseTable(
5359
proxy.getClickhouseConnection(),
54-
readerOption.getShardMetadata().getDatabase(),
55-
readerOption.getShardMetadata().getTable());
60+
fileReaderOption.getShardMetadata().getDatabase(),
61+
fileReaderOption.getShardMetadata().getTable());
5662
}
5763

5864
@Override

seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import org.apache.seatunnel.api.common.JobContext;
2121
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2222
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
23+
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
2324
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2425
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
2526
import org.apache.seatunnel.api.sink.SinkCommitter;
2627
import org.apache.seatunnel.api.sink.SinkWriter;
28+
import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
2729
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2830
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2931
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
@@ -108,16 +110,30 @@ private static void runWithContext(
108110
}
109111

110112
Optional<? extends SinkCommitter<?>> sinkCommitter = sink.createCommitter();
111-
Optional<? extends SinkAggregatedCommitter<?, ?>> aggregatedCommitter =
113+
Optional<? extends SinkAggregatedCommitter<?, ?>> aggregatedCommitterOptional =
112114
sink.createAggregatedCommitter();
113115

114116
if (!commitInfos.isEmpty()) {
115-
if (aggregatedCommitter.isPresent()) {
117+
if (aggregatedCommitterOptional.isPresent()) {
118+
SinkAggregatedCommitter<?, ?> aggregatedCommitter =
119+
aggregatedCommitterOptional.get();
120+
MultiTableResourceManager resourceManager = null;
121+
if (aggregatedCommitter instanceof SupportMultiTableSinkAggregatedCommitter) {
122+
resourceManager =
123+
((SupportMultiTableSinkAggregatedCommitter<?>) aggregatedCommitter)
124+
.initMultiTableResourceManager(1, 1);
125+
}
126+
aggregatedCommitter.init();
127+
if (resourceManager != null) {
128+
((SupportMultiTableSinkAggregatedCommitter<?>) aggregatedCommitter)
129+
.setMultiTableResourceManager(resourceManager, 0);
130+
}
131+
116132
Object aggregatedCommitInfoT =
117-
((SinkAggregatedCommitter) aggregatedCommitter.get()).combine(commitInfos);
118-
((SinkAggregatedCommitter) aggregatedCommitter.get())
133+
((SinkAggregatedCommitter) aggregatedCommitter).combine(commitInfos);
134+
((SinkAggregatedCommitter) aggregatedCommitter)
119135
.commit(Collections.singletonList(aggregatedCommitInfoT));
120-
aggregatedCommitter.get().close();
136+
aggregatedCommitter.close();
121137
} else if (sinkCommitter.isPresent()) {
122138
((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
123139
} else {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java

+6
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,14 @@
3333
public class FileSinkAggregatedCommitter
3434
implements SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
3535
protected HadoopFileSystemProxy hadoopFileSystemProxy;
36+
private final HadoopConf hadoopConf;
3637

3738
public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
39+
this.hadoopConf = hadoopConf;
40+
}
41+
42+
@Override
43+
public void init() {
3844
this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
3945
}
4046

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,18 @@
3333
public class IcebergAggregatedCommitter
3434
implements SinkAggregatedCommitter<IcebergCommitInfo, IcebergAggregatedCommitInfo> {
3535

36-
private final IcebergTableLoader tableLoader;
37-
private final IcebergFilesCommitter filesCommitter;
36+
private IcebergTableLoader tableLoader;
37+
private IcebergFilesCommitter filesCommitter;
38+
private final IcebergSinkConfig config;
39+
private final CatalogTable catalogTable;
3840

3941
public IcebergAggregatedCommitter(IcebergSinkConfig config, CatalogTable catalogTable) {
42+
this.config = config;
43+
this.catalogTable = catalogTable;
44+
}
45+
46+
@Override
47+
public void init() {
4048
this.tableLoader = IcebergTableLoader.create(config, catalogTable);
4149
this.filesCommitter = IcebergFilesCommitter.of(config, tableLoader);
4250
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,19 @@
3939
public class JdbcSinkAggregatedCommitter
4040
implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
4141

42-
private final XaFacade xaFacade;
43-
private final XaGroupOps xaGroupOps;
42+
private XaFacade xaFacade;
43+
private XaGroupOps xaGroupOps;
4444
private final JdbcSinkConfig jdbcSinkConfig;
4545

4646
public JdbcSinkAggregatedCommitter(JdbcSinkConfig jdbcSinkConfig) {
47+
this.jdbcSinkConfig = jdbcSinkConfig;
48+
}
49+
50+
@Override
51+
public void init() {
4752
this.xaFacade =
4853
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
4954
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
50-
this.jdbcSinkConfig = jdbcSinkConfig;
5155
}
5256

5357
private void tryOpen() throws IOException {

0 commit comments

Comments
 (0)