Skip to content

Commit 2da3bdc

Browse files
committed
Addressed PR feedback
1 parent 78d0196 commit 2da3bdc

File tree

6 files changed

+48
-37
lines changed

6 files changed

+48
-37
lines changed

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.api.connector.sink2.Sink;
2727
import org.apache.flink.api.connector.sink2.SinkWriter;
2828
import org.apache.flink.api.connector.sink2.WriterInitContext;
29+
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
2930
import org.apache.flink.table.data.RowData;
3031
import org.apache.flink.table.types.logical.RowType;
3132

@@ -48,14 +49,15 @@ class FlinkSink implements Sink<RowData> {
4849
@Deprecated
4950
@Override
5051
public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
51-
throw new UnsupportedOperationException(
52-
"Not supported. Use FlinkSink#createWriter(WriterInitContext context)");
52+
FlinkSinkWriter flinkSinkWriter = builder.createWriter();
53+
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
54+
return flinkSinkWriter;
5355
}
5456

5557
@Override
5658
public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException {
5759
FlinkSinkWriter flinkSinkWriter = builder.createWriter();
58-
flinkSinkWriter.initialize(context);
60+
flinkSinkWriter.initialize(InternalSinkWriterMetricGroup.wrap(context.metricGroup()));
5961
return flinkSinkWriter;
6062
}
6163

@@ -72,17 +74,22 @@ static class AppendSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWrit
7274
private final TablePath tablePath;
7375
private final Configuration flussConfig;
7476
private final RowType tableRowType;
77+
private final boolean ignoreDelete;
7578

7679
public AppendSinkWriterBuilder(
77-
TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
80+
TablePath tablePath,
81+
Configuration flussConfig,
82+
RowType tableRowType,
83+
boolean ignoreDelete) {
7884
this.tablePath = tablePath;
7985
this.flussConfig = flussConfig;
8086
this.tableRowType = tableRowType;
87+
this.ignoreDelete = ignoreDelete;
8188
}
8289

8390
@Override
8491
public AppendSinkWriter createWriter() {
85-
return new AppendSinkWriter(tablePath, flussConfig, tableRowType);
92+
return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete);
8693
}
8794
}
8895

@@ -95,21 +102,25 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWrit
95102
private final Configuration flussConfig;
96103
private final RowType tableRowType;
97104
private final @Nullable int[] targetColumnIndexes;
105+
private final boolean ignoreDelete;
98106

99107
UpsertSinkWriterBuilder(
100108
TablePath tablePath,
101109
Configuration flussConfig,
102110
RowType tableRowType,
103-
@Nullable int[] targetColumnIndexes) {
111+
@Nullable int[] targetColumnIndexes,
112+
boolean ignoreDelete) {
104113
this.tablePath = tablePath;
105114
this.flussConfig = flussConfig;
106115
this.tableRowType = tableRowType;
107116
this.targetColumnIndexes = targetColumnIndexes;
117+
this.ignoreDelete = ignoreDelete;
108118
}
109119

110120
@Override
111121
public UpsertSinkWriter createWriter() {
112-
return new UpsertSinkWriter(tablePath, flussConfig, tableRowType, targetColumnIndexes);
122+
return new UpsertSinkWriter(
123+
tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
113124
}
114125
}
115126
}

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
164164
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> flinkSinkWriterBuilder =
165165
(primaryKeyIndexes.length > 0)
166166
? new FlinkSink.UpsertSinkWriterBuilder(
167-
tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete)
167+
tablePath,
168+
flussConfig,
169+
tableRowType,
170+
targetColumnIndexes,
171+
ignoreDelete)
168172
: new FlinkSink.AppendSinkWriterBuilder(
169-
tablePath, flussConfig, tableRowType, ignoreDelete);
173+
tablePath, flussConfig, tableRowType, ignoreDelete);
170174

171175
FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);
172176

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/AppendSinkWriter.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.alibaba.fluss.metadata.TablePath;
2222
import com.alibaba.fluss.row.InternalRow;
2323

24-
import org.apache.flink.api.connector.sink2.WriterInitContext;
24+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2525
import org.apache.flink.table.types.logical.RowType;
2626
import org.apache.flink.types.RowKind;
2727

@@ -33,13 +33,17 @@ public class AppendSinkWriter extends FlinkSinkWriter {
3333

3434
private transient AppendWriter appendWriter;
3535

36-
public AppendWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType, boolean ignoreDelete) {
36+
public AppendSinkWriter(
37+
TablePath tablePath,
38+
Configuration flussConfig,
39+
RowType tableRowType,
40+
boolean ignoreDelete) {
3741
super(tablePath, flussConfig, tableRowType, ignoreDelete);
3842
}
3943

4044
@Override
41-
public void initialize(WriterInitContext context) {
42-
super.initialize(context);
45+
public void initialize(SinkWriterMetricGroup metricGroup) {
46+
super.initialize(metricGroup);
4347
appendWriter = table.newAppend().createWriter();
4448
LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName());
4549
}
@@ -49,11 +53,6 @@ CompletableFuture<?> writeRow(RowKind rowKind, InternalRow internalRow) {
4953
return appendWriter.append(internalRow);
5054
}
5155

52-
@Override
53-
FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() {
54-
return FlinkRowToFlussRowConverter.create(tableRowType);
55-
}
56-
5756
@Override
5857
public void flush(boolean endOfInput) throws IOException {
5958
appendWriter.flush();

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/FlinkSinkWriter.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131
import com.alibaba.fluss.row.InternalRow;
3232

3333
import org.apache.flink.api.connector.sink2.SinkWriter;
34-
import org.apache.flink.api.connector.sink2.WriterInitContext;
3534
import org.apache.flink.metrics.Counter;
3635
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
37-
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
3836
import org.apache.flink.table.api.ValidationException;
3937
import org.apache.flink.table.data.RowData;
4038
import org.apache.flink.table.types.logical.RowType;
@@ -73,7 +71,8 @@ public abstract class FlinkSinkWriter implements SinkWriter<RowData> {
7371
public FlinkSinkWriter(
7472
TablePath tablePath,
7573
Configuration flussConfig,
76-
RowType tableRowType) {
74+
RowType tableRowType,
75+
boolean ignoreDelete) {
7776
this(tablePath, flussConfig, tableRowType, null, ignoreDelete);
7877
}
7978

@@ -90,13 +89,13 @@ public FlinkSinkWriter(
9089
this.ignoreDelete = ignoreDelete;
9190
}
9291

93-
public void initialize(WriterInitContext context) {
92+
public void initialize(SinkWriterMetricGroup metricGroup) {
9493
LOG.info(
9594
"Opening Fluss {}, database: {} and table: {}",
9695
this.getClass().getSimpleName(),
9796
tablePath.getDatabaseName(),
9897
tablePath.getTableName());
99-
metricGroup = InternalSinkWriterMetricGroup.wrap(context.metricGroup());
98+
this.metricGroup = metricGroup;
10099
flinkMetricRegistry =
101100
new FlinkMetricRegistry(
102101
metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
@@ -142,8 +141,6 @@ public void write(RowData value, Context context) throws IOException, Interrupte
142141

143142
@Override
144143
public void close() throws Exception {
145-
super.close();
146-
147144
try {
148145
if (table != null) {
149146
table.close();
@@ -174,7 +171,7 @@ public void close() throws Exception {
174171
}
175172

176173
private void sanityCheck(TableInfo flussTableInfo) {
177-
// when it's UpsertSinkFunction, it means it has primary key got from Flink's metadata
174+
// when it's UpsertSinkWriter, it means it has primary key got from Flink's metadata
178175
boolean hasPrimaryKey = this instanceof UpsertSinkWriter;
179176
if (flussTableInfo.hasPrimaryKey() != hasPrimaryKey) {
180177
throw new ValidationException(

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/writer/UpsertSinkWriter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package com.alibaba.fluss.connector.flink.sink.writer;
1818

19-
import com.alibaba.fluss.client.table.writer.UpsertWrite;
19+
import com.alibaba.fluss.client.table.writer.Upsert;
2020
import com.alibaba.fluss.client.table.writer.UpsertWriter;
2121
import com.alibaba.fluss.config.Configuration;
22-
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
2322
import com.alibaba.fluss.metadata.TablePath;
2423
import com.alibaba.fluss.row.InternalRow;
2524

26-
import org.apache.flink.api.connector.sink2.WriterInitContext;
25+
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2726
import org.apache.flink.table.types.logical.RowType;
2827
import org.apache.flink.types.RowKind;
2928

@@ -47,8 +46,8 @@ public UpsertSinkWriter(
4746
}
4847

4948
@Override
50-
public void initialize(WriterInitContext context) {
51-
super.initialize(context);
49+
public void initialize(SinkWriterMetricGroup metricGroup) {
50+
super.initialize(metricGroup);
5251
Upsert upsert = table.newUpsert();
5352
if (targetColumnIndexes != null) {
5453
upsert = upsert.partialUpdate(targetColumnIndexes);

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkWriterTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ void testSinkMetrics(String clientId) throws Exception {
8383
Arrays.asList(
8484
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
8585
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
86-
FlinkSinkFunction flinkSinkFunction =
87-
new AppendSinkFunction(tablePath, flussConf, rowType, false);
86+
FlinkSinkWriter flinkSinkWriter =
87+
new AppendSinkWriter(tablePath, flussConf, rowType, false);
88+
8889
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
8990
new InterceptingOperatorMetricGroup();
9091
MockStreamingRuntimeContext mockStreamingRuntimeContext =
@@ -97,10 +98,10 @@ public OperatorMetricGroup getMetricGroup() {
9798
MockWriterInitContext mockWriterInitContext =
9899
new MockWriterInitContext(mockStreamingRuntimeContext);
99100

100-
flinkSinkFunction.initialize(mockWriterInitContext);
101-
flinkSinkFunction.write(
101+
flinkSinkWriter.initialize(mockWriterInitContext.metricGroup());
102+
flinkSinkWriter.write(
102103
GenericRowData.of(1, StringData.fromString("a")), new MockSinkWriterContext());
103-
flinkSinkFunction.flush(false);
104+
flinkSinkWriter.flush(false);
104105

105106
Metric currentSendTime = interceptingOperatorMetricGroup.get(MetricNames.CURRENT_SEND_TIME);
106107
assertThat(currentSendTime).isInstanceOf(Gauge.class);
@@ -110,7 +111,7 @@ public OperatorMetricGroup getMetricGroup() {
110111
assertThat(numRecordSend).isInstanceOf(Counter.class);
111112
assertThat(((Counter) numRecordSend).getCount()).isGreaterThan(0);
112113

113-
flinkSinkFunction.close();
114+
flinkSinkWriter.close();
114115
}
115116

116117
static class MockSinkWriterContext implements SinkWriter.Context {

0 commit comments

Comments
 (0)