Skip to content

Commit 0b623e4

Browse files
authored
[flink] Upgrade sink connector to new SinkV2 API (#205)
1 parent c98971d commit 0b623e4

File tree

9 files changed

+388
-166
lines changed

9 files changed

+388
-166
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.connector.flink.sink;
18+
19+
import com.alibaba.fluss.annotation.Internal;
20+
import com.alibaba.fluss.config.Configuration;
21+
import com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter;
22+
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
23+
import com.alibaba.fluss.connector.flink.sink.writer.UpsertSinkWriter;
24+
import com.alibaba.fluss.metadata.TablePath;
25+
26+
import org.apache.flink.api.connector.sink2.Sink;
27+
import org.apache.flink.api.connector.sink2.SinkWriter;
28+
import org.apache.flink.api.connector.sink2.WriterInitContext;
29+
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
30+
import org.apache.flink.table.data.RowData;
31+
import org.apache.flink.table.types.logical.RowType;
32+
33+
import javax.annotation.Nullable;
34+
35+
import java.io.IOException;
36+
import java.io.Serializable;
37+
38+
/** Flink sink for Fluss. */
39+
class FlinkSink implements Sink<RowData> {
40+
41+
private static final long serialVersionUID = 1L;
42+
43+
private final SinkWriterBuilder<? extends FlinkSinkWriter> builder;
44+
45+
FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter> builder) {
46+
this.builder = builder;
47+
}
48+
49+
@Deprecated
50+
@Override
51+
public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
52+
FlinkSinkWriter flinkSinkWriter = builder.createWriter();
53+
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
54+
return flinkSinkWriter;
55+
}
56+
57+
@Override
58+
public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException {
59+
FlinkSinkWriter flinkSinkWriter = builder.createWriter();
60+
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
61+
return flinkSinkWriter;
62+
}
63+
64+
@Internal
65+
interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
66+
W createWriter();
67+
}
68+
69+
@Internal
70+
static class AppendSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWriter> {
71+
72+
private static final long serialVersionUID = 1L;
73+
74+
private final TablePath tablePath;
75+
private final Configuration flussConfig;
76+
private final RowType tableRowType;
77+
private final boolean ignoreDelete;
78+
79+
public AppendSinkWriterBuilder(
80+
TablePath tablePath,
81+
Configuration flussConfig,
82+
RowType tableRowType,
83+
boolean ignoreDelete) {
84+
this.tablePath = tablePath;
85+
this.flussConfig = flussConfig;
86+
this.tableRowType = tableRowType;
87+
this.ignoreDelete = ignoreDelete;
88+
}
89+
90+
@Override
91+
public AppendSinkWriter createWriter() {
92+
return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete);
93+
}
94+
}
95+
96+
@Internal
97+
static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWriter> {
98+
99+
private static final long serialVersionUID = 1L;
100+
101+
private final TablePath tablePath;
102+
private final Configuration flussConfig;
103+
private final RowType tableRowType;
104+
private final @Nullable int[] targetColumnIndexes;
105+
private final boolean ignoreDelete;
106+
107+
UpsertSinkWriterBuilder(
108+
TablePath tablePath,
109+
Configuration flussConfig,
110+
RowType tableRowType,
111+
@Nullable int[] targetColumnIndexes,
112+
boolean ignoreDelete) {
113+
this.tablePath = tablePath;
114+
this.flussConfig = flussConfig;
115+
this.tableRowType = tableRowType;
116+
this.targetColumnIndexes = targetColumnIndexes;
117+
this.ignoreDelete = ignoreDelete;
118+
}
119+
120+
@Override
121+
public UpsertSinkWriter createWriter() {
122+
return new UpsertSinkWriter(
123+
tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
124+
}
125+
}
126+
}

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.connector.flink.sink;
1818

1919
import com.alibaba.fluss.config.Configuration;
20+
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
2021
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
2122
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
2223
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -29,7 +30,7 @@
2930
import org.apache.flink.table.connector.ChangelogMode;
3031
import org.apache.flink.table.connector.RowLevelModificationScanContext;
3132
import org.apache.flink.table.connector.sink.DynamicTableSink;
32-
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
33+
import org.apache.flink.table.connector.sink.SinkV2Provider;
3334
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
3435
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
3536
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
@@ -160,18 +161,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
160161
// else, it's full update, ignore the given target columns as we don't care the order
161162
}
162163

163-
FlinkSinkFunction sinkFunction =
164-
primaryKeyIndexes.length > 0
165-
? new UpsertSinkFunction(
164+
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> flinkSinkWriterBuilder =
165+
(primaryKeyIndexes.length > 0)
166+
? new FlinkSink.UpsertSinkWriterBuilder(
166167
tablePath,
167168
flussConfig,
168169
tableRowType,
169170
targetColumnIndexes,
170171
ignoreDelete)
171-
: new AppendSinkFunction(
172+
: new FlinkSink.AppendSinkWriterBuilder(
172173
tablePath, flussConfig, tableRowType, ignoreDelete);
173174

174-
return SinkFunctionProvider.of(sinkFunction);
175+
FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);
176+
177+
return SinkV2Provider.of(flinkSink);
175178
}
176179

177180
private List<String> columns(int[] columnIndexes) {

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java renamed to fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/AppendSinkWriter.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,26 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.alibaba.fluss.connector.flink.sink;
17+
package com.alibaba.fluss.connector.flink.sink.writer;
1818

1919
import com.alibaba.fluss.client.table.writer.AppendWriter;
2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.metadata.TablePath;
2222
import com.alibaba.fluss.row.InternalRow;
2323

24+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2425
import org.apache.flink.table.types.logical.RowType;
2526
import org.apache.flink.types.RowKind;
2627

2728
import java.io.IOException;
2829
import java.util.concurrent.CompletableFuture;
2930

30-
/** An append only sink for fluss log table. */
31-
class AppendSinkFunction extends FlinkSinkFunction {
32-
33-
private static final long serialVersionUID = 1L;
31+
/** An append only sink writer for fluss log table. */
32+
public class AppendSinkWriter extends FlinkSinkWriter {
3433

3534
private transient AppendWriter appendWriter;
3635

37-
AppendSinkFunction(
36+
public AppendSinkWriter(
3837
TablePath tablePath,
3938
Configuration flussConfig,
4039
RowType tableRowType,
@@ -43,8 +42,8 @@ class AppendSinkFunction extends FlinkSinkFunction {
4342
}
4443

4544
@Override
46-
public void open(org.apache.flink.configuration.Configuration config) {
47-
super.open(config);
45+
public void initialize(SinkWriterMetricGroup metricGroup) {
46+
super.initialize(metricGroup);
4847
appendWriter = table.newAppend().createWriter();
4948
LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName());
5049
}
@@ -55,13 +54,8 @@ CompletableFuture<?> writeRow(RowKind rowKind, InternalRow internalRow) {
5554
}
5655

5756
@Override
58-
void flush() throws IOException {
57+
public void flush(boolean endOfInput) throws IOException {
5958
appendWriter.flush();
6059
checkAsyncException();
6160
}
62-
63-
@Override
64-
public void close() throws Exception {
65-
super.close();
66-
}
6761
}

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java renamed to fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/FlinkSinkWriter.java

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.alibaba.fluss.connector.flink.sink;
17+
package com.alibaba.fluss.connector.flink.sink.writer;
1818

1919
import com.alibaba.fluss.client.Connection;
2020
import com.alibaba.fluss.client.ConnectionFactory;
@@ -30,14 +30,9 @@
3030
import com.alibaba.fluss.metrics.MetricNames;
3131
import com.alibaba.fluss.row.InternalRow;
3232

33+
import org.apache.flink.api.connector.sink2.SinkWriter;
3334
import org.apache.flink.metrics.Counter;
3435
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
35-
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
36-
import org.apache.flink.runtime.state.FunctionInitializationContext;
37-
import org.apache.flink.runtime.state.FunctionSnapshotContext;
38-
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
39-
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
40-
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
4136
import org.apache.flink.table.api.ValidationException;
4237
import org.apache.flink.table.data.RowData;
4338
import org.apache.flink.table.types.logical.RowType;
@@ -48,16 +43,13 @@
4843
import javax.annotation.Nullable;
4944

5045
import java.io.IOException;
51-
import java.io.Serializable;
5246
import java.util.Collections;
5347
import java.util.concurrent.CompletableFuture;
5448

55-
/** Flink's {@link SinkFunction} implementation for Fluss. */
56-
abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
57-
implements CheckpointedFunction, Serializable {
49+
/** Base class for Flink {@link SinkWriter} implementations in Fluss. */
50+
public abstract class FlinkSinkWriter implements SinkWriter<RowData> {
5851

59-
private static final long serialVersionUID = 1L;
60-
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkFunction.class);
52+
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class);
6153

6254
private final TablePath tablePath;
6355
private final Configuration flussConfig;
@@ -76,15 +68,15 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
7668
private transient Counter numRecordsOutErrorsCounter;
7769
private volatile Throwable asyncWriterException;
7870

79-
public FlinkSinkFunction(
71+
public FlinkSinkWriter(
8072
TablePath tablePath,
8173
Configuration flussConfig,
8274
RowType tableRowType,
8375
boolean ignoreDelete) {
8476
this(tablePath, flussConfig, tableRowType, null, ignoreDelete);
8577
}
8678

87-
public FlinkSinkFunction(
79+
public FlinkSinkWriter(
8880
TablePath tablePath,
8981
Configuration flussConfig,
9082
RowType tableRowType,
@@ -97,14 +89,13 @@ public FlinkSinkFunction(
9789
this.ignoreDelete = ignoreDelete;
9890
}
9991

100-
@Override
101-
public void open(org.apache.flink.configuration.Configuration config) {
92+
public void initialize(SinkWriterMetricGroup metricGroup) {
10293
LOG.info(
10394
"Opening Fluss {}, database: {} and table: {}",
10495
this.getClass().getSimpleName(),
10596
tablePath.getDatabaseName(),
10697
tablePath.getTableName());
107-
metricGroup = InternalSinkWriterMetricGroup.wrap(getRuntimeContext().getMetricGroup());
98+
this.metricGroup = metricGroup;
10899
flinkMetricRegistry =
109100
new FlinkMetricRegistry(
110101
metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
@@ -122,7 +113,7 @@ protected void initMetrics() {
122113
}
123114

124115
@Override
125-
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
116+
public void write(RowData value, Context context) throws IOException, InterruptedException {
126117
checkAsyncException();
127118
if (ignoreDelete
128119
&& (value.getRowKind() == RowKind.UPDATE_BEFORE
@@ -144,26 +135,12 @@ public void invoke(RowData value, SinkFunction.Context context) throws IOExcepti
144135
}
145136

146137
@Override
147-
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws IOException {
148-
flush();
149-
}
150-
151-
@Override
152-
public void initializeState(FunctionInitializationContext functionInitializationContext) {}
153-
154-
@Override
155-
public void finish() throws IOException {
156-
flush();
157-
}
158-
159-
abstract void flush() throws IOException;
138+
public abstract void flush(boolean endOfInput) throws IOException, InterruptedException;
160139

161140
abstract CompletableFuture<?> writeRow(RowKind rowKind, InternalRow internalRow);
162141

163142
@Override
164143
public void close() throws Exception {
165-
super.close();
166-
167144
try {
168145
if (table != null) {
169146
table.close();
@@ -194,8 +171,8 @@ public void close() throws Exception {
194171
}
195172

196173
private void sanityCheck(TableInfo flussTableInfo) {
197-
// when it's UpsertSinkFunction, it means it has primary key got from Flink's metadata
198-
boolean hasPrimaryKey = this instanceof UpsertSinkFunction;
174+
// when it's UpsertSinkWriter, it means it has primary key got from Flink's metadata
175+
boolean hasPrimaryKey = this instanceof UpsertSinkWriter;
199176
if (flussTableInfo.hasPrimaryKey() != hasPrimaryKey) {
200177
throw new ValidationException(
201178
String.format(

0 commit comments

Comments
 (0)