Skip to content

Commit 4922bfe

Browse files
authored
[connector/flink] Introduce DataStream Sink API for Flink (#908)
1 parent 09ea497 commit 4922bfe

File tree

4 files changed

+827
-0
lines changed

4 files changed

+827
-0
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.sink;
18+
19+
import com.alibaba.fluss.annotation.PublicEvolving;
20+
import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
21+
22+
/**
23+
* FlussSink is a specialized Flink sink for writing data to Fluss.
24+
*
25+
* <p>This class extends {@link FlinkSink} and provides a builder for constructing Fluss sink
26+
* instances with custom configurations. It is intended to be used as the main entry point for
27+
* integrating Fluss as a sink in Flink data pipelines.
28+
*
29+
* @param <InputT> the type of input elements accepted by the sink
30+
* @since 0.7
31+
*/
32+
@PublicEvolving
33+
public class FlussSink<InputT> extends FlinkSink<InputT> {
34+
private static final long serialVersionUID = 1L;
35+
36+
/**
37+
* Constructs a FlussSink with the given SinkWriterBuilder.
38+
*
39+
* @param builder the builder used to create the sink writer
40+
*/
41+
FlussSink(SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> builder) {
42+
super(builder);
43+
}
44+
45+
/**
46+
* Creates a new {@link FlussSinkBuilder} instance for building a FlussSink.
47+
*
48+
* @param <T> the type of input elements
49+
* @return a new FlussSinkBuilder instance
50+
*/
51+
public static <T> FlussSinkBuilder<T> builder() {
52+
return new FlussSinkBuilder<>();
53+
}
54+
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.sink;
18+
19+
import com.alibaba.fluss.annotation.PublicEvolving;
20+
import com.alibaba.fluss.client.Connection;
21+
import com.alibaba.fluss.client.ConnectionFactory;
22+
import com.alibaba.fluss.client.admin.Admin;
23+
import com.alibaba.fluss.config.ConfigOptions;
24+
import com.alibaba.fluss.config.Configuration;
25+
import com.alibaba.fluss.flink.sink.serializer.FlussSerializationSchema;
26+
import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
27+
import com.alibaba.fluss.metadata.DataLakeFormat;
28+
import com.alibaba.fluss.metadata.TableInfo;
29+
import com.alibaba.fluss.metadata.TablePath;
30+
31+
import org.apache.flink.table.types.logical.RowType;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.util.HashMap;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.concurrent.ExecutionException;
39+
40+
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkRowType;
41+
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
42+
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
43+
44+
/**
45+
* Builder for creating and configuring Fluss sink connectors for Apache Flink.
46+
*
47+
* <p>The builder supports automatic schema inference from POJO classes using reflection and
48+
* provides options for customizing data conversion logic through custom converters.
49+
*
50+
* <p>Example usage:
51+
*
52+
* <pre>{@code
53+
* FlinkSink<Order> sink = new FlussSinkBuilder<Order>()
54+
* .setBootstrapServers(bootstrapServers)
55+
* .setTable(tableName)
56+
* .setDatabase(databaseName)
57+
* .setRowType(orderRowType)
58+
* .setSerializationSchema(new OrderSerializationSchema())
59+
* .build())
60+
* }</pre>
61+
*
62+
* @param <InputT>> The input type of records to be written to Fluss
63+
* @since 0.7
64+
*/
65+
@PublicEvolving
66+
public class FlussSinkBuilder<InputT> {
67+
private static final Logger LOG = LoggerFactory.getLogger(FlussSinkBuilder.class);
68+
69+
private String bootstrapServers;
70+
private String database;
71+
private String tableName;
72+
private final Map<String, String> configOptions = new HashMap<>();
73+
private FlussSerializationSchema<InputT> serializationSchema;
74+
private boolean shuffleByBucketId = true;
75+
76+
/** Set the bootstrap server for the sink. */
77+
public FlussSinkBuilder<InputT> setBootstrapServers(String bootstrapServers) {
78+
this.bootstrapServers = bootstrapServers;
79+
return this;
80+
}
81+
82+
/** Set the database for the sink. */
83+
public FlussSinkBuilder<InputT> setDatabase(String database) {
84+
this.database = database;
85+
return this;
86+
}
87+
88+
/** Set the table name for the sink. */
89+
public FlussSinkBuilder<InputT> setTable(String table) {
90+
this.tableName = table;
91+
return this;
92+
}
93+
94+
/** Set shuffle by bucket id. */
95+
public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
96+
this.shuffleByBucketId = shuffleByBucketId;
97+
return this;
98+
}
99+
100+
/** Set a configuration option. */
101+
public FlussSinkBuilder<InputT> setOption(String key, String value) {
102+
configOptions.put(key, value);
103+
return this;
104+
}
105+
106+
/** Set multiple configuration options. */
107+
public FlussSinkBuilder<InputT> setOptions(Map<String, String> options) {
108+
configOptions.putAll(options);
109+
return this;
110+
}
111+
112+
/** Set a FlussSerializationSchema. */
113+
public FlussSinkBuilder<InputT> setSerializationSchema(
114+
FlussSerializationSchema<InputT> serializationSchema) {
115+
this.serializationSchema = serializationSchema;
116+
return this;
117+
}
118+
119+
/** Build the FlussSink. */
120+
public FlussSink<InputT> build() {
121+
validateConfiguration();
122+
123+
Configuration flussConfig = Configuration.fromMap(configOptions);
124+
125+
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> writerBuilder;
126+
127+
TablePath tablePath = new TablePath(database, tableName);
128+
flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
129+
130+
TableInfo tableInfo;
131+
try (Connection connection = ConnectionFactory.createConnection(flussConfig);
132+
Admin admin = connection.getAdmin()) {
133+
try {
134+
tableInfo = admin.getTableInfo(tablePath).get();
135+
} catch (InterruptedException e) {
136+
Thread.currentThread().interrupt();
137+
throw new RuntimeException("Interrupted while getting table info", e);
138+
} catch (ExecutionException e) {
139+
throw new RuntimeException("Failed to get table info", e);
140+
}
141+
} catch (Exception e) {
142+
throw new RuntimeException(
143+
"Failed to initialize FlussSource admin connection: " + e.getMessage(), e);
144+
}
145+
int numBucket = tableInfo.getNumBuckets();
146+
List<String> bucketKeys = tableInfo.getBucketKeys();
147+
List<String> partitionKeys = tableInfo.getPartitionKeys();
148+
RowType tableRowType = toFlinkRowType(tableInfo.getRowType());
149+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
150+
151+
boolean isUpsert = tableInfo.hasPrimaryKey();
152+
153+
if (isUpsert) {
154+
LOG.info("Initializing Fluss upsert sink writer ...");
155+
writerBuilder =
156+
new FlinkSink.UpsertSinkWriterBuilder<>(
157+
tablePath,
158+
flussConfig,
159+
tableRowType,
160+
null, // not support partialUpdateColumns yet
161+
numBucket,
162+
bucketKeys,
163+
partitionKeys,
164+
lakeFormat,
165+
shuffleByBucketId,
166+
serializationSchema);
167+
} else {
168+
LOG.info("Initializing Fluss append sink writer ...");
169+
writerBuilder =
170+
new FlinkSink.AppendSinkWriterBuilder<>(
171+
tablePath,
172+
flussConfig,
173+
tableRowType,
174+
numBucket,
175+
bucketKeys,
176+
partitionKeys,
177+
lakeFormat,
178+
shuffleByBucketId,
179+
serializationSchema);
180+
}
181+
182+
return new FlussSink<>(writerBuilder);
183+
}
184+
185+
private void validateConfiguration() {
186+
checkNotNull(bootstrapServers, "BootstrapServers is required but not provided.");
187+
checkNotNull(serializationSchema, "SerializationSchema is required but not provided.");
188+
189+
checkNotNull(database, "Database is required but not provided.");
190+
checkArgument(!database.isEmpty(), "Database cannot be empty.");
191+
192+
checkNotNull(tableName, "Table name is required but not provided.");
193+
checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
194+
}
195+
}

0 commit comments

Comments
 (0)