Skip to content

Commit c685863

Browse files
committed
address comments
1 parent 45422fe commit c685863

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlussSinkBuilder.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.alibaba.fluss.client.Connection;
2020
import com.alibaba.fluss.client.ConnectionFactory;
2121
import com.alibaba.fluss.client.admin.Admin;
22+
import com.alibaba.fluss.config.ConfigOptions;
2223
import com.alibaba.fluss.config.Configuration;
2324
import com.alibaba.fluss.flink.sink.serializer.FlussSerializationSchema;
2425
import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -53,6 +54,7 @@
5354
* FlinkSink<Order> sink = new FlussSinkBuilder<Order>()
5455
* .setBootstrapServers(bootstrapServers)
5556
* .setTable(tableName)
57+
* .setDatabase("testDb")
5658
* .setRowType(orderRowType)
5759
* .setSerializationSchema(new OrderSerializationSchema())
5860
* .build())
@@ -111,17 +113,15 @@ public FlussSinkBuilder<InputT> setPartialUpdateColumns(int[] partialUpdateColum
111113
return this;
112114
}
113115

114-
/** Set target column indexes. */
115-
public FlussSinkBuilder<InputT> setDataLakeFormat(DataLakeFormat lakeFormat) {
116-
this.lakeFormat = lakeFormat;
117-
return this;
118-
}
116+
// /** Set target column indexes. */
117+
// public FlussSinkBuilder<InputT> setDataLakeFormat(DataLakeFormat lakeFormat) {
118+
// this.lakeFormat = lakeFormat;
119+
// return this;
120+
// }
119121

120122
/** Set shuffle by bucket id. */
121123
public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
122-
if (!shuffleByBucketId) {
123-
this.shuffleByBucketId = false;
124-
}
124+
this.shuffleByBucketId = shuffleByBucketId;
125125
return this;
126126
}
127127

@@ -157,15 +157,15 @@ public FlussSinkBuilder<InputT> setSerializationSchema(
157157
}
158158

159159
/** Build the FlussSink. */
160-
public FlinkSink<InputT> build() {
160+
public FlussSink<InputT> build() {
161161
validateConfiguration();
162162

163163
Configuration flussConfig = Configuration.fromMap(configOptions);
164164

165165
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> writerBuilder;
166166

167167
TablePath tablePath = new TablePath(database, tableName);
168-
flussConfig.setString("bootstrap.servers", bootstrapServers);
168+
flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
169169

170170
TableInfo tableInfo;
171171
try (Connection connection = ConnectionFactory.createConnection(flussConfig);
@@ -185,6 +185,10 @@ public FlinkSink<InputT> build() {
185185
int numBucket = tableInfo.getNumBuckets();
186186
List<String> bucketKeys = tableInfo.getBucketKeys();
187187
List<String> partitionKeys = tableInfo.getPartitionKeys();
188+
tableInfo
189+
.getTableConfig()
190+
.getDataLakeFormat()
191+
.ifPresent(format -> lakeFormat = format);
188192

189193
if (isUpsert) {
190194
LOG.info("Initializing Fluss upsert sink writer ...");

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlussSinkBuilderTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737

3838
/** Tests for {@link FlussSinkBuilder} configuration and argument handling. */
3939
class FlussSinkBuilderTest {
40-
private String bootstrapServers = "localhost:9123";
41-
private String databaseName = "testDb";
42-
private String tableName = "testTable";
40+
private final String bootstrapServers = "localhost:9123";
41+
private final String databaseName = "testDb";
42+
private final String tableName = "testTable";
4343

4444
private FlussSinkBuilder<Order> builder;
4545
private RowType orderRowType;
@@ -190,8 +190,7 @@ void testDataLakeFormat() throws Exception {
190190
assertThat(lakeFormat).isNull();
191191

192192
// Test setting format
193-
builder.setDataLakeFormat(DataLakeFormat.PAIMON);
194-
lakeFormat = getFieldValue(builder, "lakeFormat");
193+
lakeFormat = getFieldValue(builder, null);
195194
assertThat(lakeFormat).isEqualTo(DataLakeFormat.PAIMON);
196195
}
197196

@@ -279,7 +278,6 @@ void testFluentChaining() {
279278
.useUpsert()
280279
.setOption("key1", "value1")
281280
.setOptions(new HashMap<>())
282-
.setDataLakeFormat(DataLakeFormat.PAIMON)
283281
.setShuffleByBucketId(false);
284282

285283
// Verify the builder instance is returned

0 commit comments

Comments
 (0)