Commit 975fe6b

address comments
1 parent e718d3c

3 files changed (+28, -84 lines)

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlussSinkBuilder.java

Lines changed: 14 additions & 28 deletions
@@ -19,6 +19,7 @@
 import com.alibaba.fluss.client.Connection;
 import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.flink.sink.serializer.FlussSerializationSchema;
 import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -52,6 +53,7 @@
  * FlinkSink<Order> sink = new FlussSinkBuilder<Order>()
  *     .setBootstrapServers(bootstrapServers)
  *     .setTable(tableName)
+ *     .setDatabase(databaseName)
  *     .setRowType(orderRowType)
  *     .setSerializationSchema(new OrderSerializationSchema())
  *     .build())
@@ -68,7 +70,7 @@ public class FlussSinkBuilder<InputT> {
     private RowType tableRowType;
     private boolean ignoreDelete;
     private int[] partialUpdateColumns;
-    private boolean isUpsert;
+    // private boolean isUpsert;
     private final Map<String, String> configOptions = new HashMap<>();
     private FlussSerializationSchema<InputT> serializationSchema;
     private boolean shuffleByBucketId = true;
@@ -110,29 +112,9 @@ public FlussSinkBuilder<InputT> setPartialUpdateColumns(int[] partialUpdateColum
         return this;
     }
 
-    /** Set target column indexes. */
-    public FlussSinkBuilder<InputT> setDataLakeFormat(DataLakeFormat lakeFormat) {
-        this.lakeFormat = lakeFormat;
-        return this;
-    }
-
     /** Set shuffle by bucket id. */
     public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
-        if (!shuffleByBucketId) {
-            this.shuffleByBucketId = false;
-        }
-        return this;
-    }
-
-    /** Configure this sink to use upsert semantics. */
-    public FlussSinkBuilder<InputT> useUpsert() {
-        this.isUpsert = true;
-        return this;
-    }
-
-    /** Configure this sink to use append-only semantics. */
-    public FlussSinkBuilder<InputT> useAppend() {
-        this.isUpsert = false;
+        this.shuffleByBucketId = shuffleByBucketId;
         return this;
     }
 
@@ -156,15 +138,15 @@ public FlussSinkBuilder<InputT> setSerializationSchema(
     }
 
     /** Build the FlussSink. */
-    public FlinkSink<InputT> build() {
+    public FlussSink<InputT> build() {
         validateConfiguration();
 
         Configuration flussConfig = Configuration.fromMap(configOptions);
 
         FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> writerBuilder;
 
         TablePath tablePath = new TablePath(database, tableName);
-        flussConfig.setString("bootstrap.servers", bootstrapServers);
+        flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
 
         TableInfo tableInfo;
         try (Connection connection = ConnectionFactory.createConnection(flussConfig);
@@ -185,6 +167,14 @@ public FlinkSink<InputT> build() {
         List<String> bucketKeys = tableInfo.getBucketKeys();
         List<String> partitionKeys = tableInfo.getPartitionKeys();
 
+        this.lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
+
+        boolean isUpsert = tableInfo.hasPrimaryKey();
+
+        checkArgument(
+                isUpsert && partialUpdateColumns == null,
+                "Partial updates are not supported in append mode.");
+
         if (isUpsert) {
             LOG.info("Initializing Fluss upsert sink writer ...");
             writerBuilder =
@@ -227,9 +217,5 @@ private void validateConfiguration() {
 
         checkNotNull(tableName, "Table name is required but not provided.");
         checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
-
-        checkArgument(
-                isUpsert && partialUpdateColumns == null,
-                "Partial updates are not supported in append mode.");
     }
 }
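
With useUpsert()/useAppend() and setDataLakeFormat(...) removed, build() now resolves both the write mode and the lake format from the target table's metadata: a primary-key table gets an upsert writer, a log table an append writer. A minimal usage sketch of the resulting API, assuming the Order POJO and OrderSerializationSchema from the class javadoc plus a hypothetical "orders" table in a hypothetical "fluss" database:

    // Sketch only: the write mode is inferred from the table, not declared on the builder.
    FlussSink<Order> sink =
            new FlussSinkBuilder<Order>()
                    .setBootstrapServers("localhost:9123")
                    .setDatabase("fluss")   // hypothetical database name
                    .setTable("orders")     // hypothetical table name
                    .setRowType(orderRowType)
                    .setSerializationSchema(new OrderSerializationSchema())
                    .build();

    // If "orders" was created with a primary key, the sink upserts; otherwise it appends.
    orderStream.sinkTo(sink);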

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlussSinkBuilderTest.java

Lines changed: 2 additions & 36 deletions
@@ -158,19 +158,6 @@ void testConfigOptions() throws Exception {
                 .containsEntry("option2", "value2");
     }
 
-    @Test
-    void testUpsertAndAppendModes() throws Exception {
-        // Test upsert mode
-        builder.useUpsert();
-        boolean isUpsert = getFieldValue(builder, "isUpsert");
-        assertThat(isUpsert).isTrue();
-
-        // Test append mode
-        builder.useAppend();
-        isUpsert = getFieldValue(builder, "isUpsert");
-        assertThat(isUpsert).isFalse();
-    }
-
     @Test
     void testIgnoreDelete() throws Exception {
         // Default should be false
@@ -189,10 +176,8 @@ void testDataLakeFormat() throws Exception {
         DataLakeFormat lakeFormat = getFieldValue(builder, "lakeFormat");
         assertThat(lakeFormat).isNull();
 
-        // Test setting format
-        builder.setDataLakeFormat(DataLakeFormat.PAIMON);
         lakeFormat = getFieldValue(builder, "lakeFormat");
-        assertThat(lakeFormat).isEqualTo(DataLakeFormat.PAIMON);
+        assertThat(lakeFormat).isEqualTo(null);
     }
 
     @Test
@@ -206,10 +191,9 @@ void testShuffleByBucketId() throws Exception {
         shuffleByBucketId = getFieldValue(builder, "shuffleByBucketId");
         assertThat(shuffleByBucketId).isFalse();
 
-        // Test setting back to true should not change value (implementation detail)
         builder.setShuffleByBucketId(true);
         shuffleByBucketId = getFieldValue(builder, "shuffleByBucketId");
-        assertThat(shuffleByBucketId).isFalse();
+        assertThat(shuffleByBucketId).isTrue();
     }
 
     @Test
@@ -249,22 +233,6 @@ void testBootstrapServersSetting() throws Exception {
         assertThat(bootstrapServers).isEqualTo(this.bootstrapServers);
     }
 
-    @Test
-    void testPartialUpdateColumnsNotAllowedInAppendMode() {
-        FlussSinkBuilder<Order> builder = new FlussSinkBuilder<>();
-        builder.setBootstrapServers("localhost:9123")
-                .setDatabase("testDb")
-                .setTable("testTable")
-                .setRowType(orderRowType)
-                .setSerializationSchema(new OrderSerializationSchema())
-                .setPartialUpdateColumns(new int[] {0, 1, 2})
-                .useAppend();
-
-        assertThatThrownBy(builder::build)
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("Partial updates are not supported in append mode.");
-    }
-
     @Test
     void testFluentChaining() {
         // Test that all methods can be chained
@@ -276,10 +244,8 @@ void testFluentChaining() {
                 .setRowType(orderRowType)
                 .setIgnoreDelete(true)
                 .setPartialUpdateColumns(new int[] {0, 1})
-                .useUpsert()
                 .setOption("key1", "value1")
                 .setOptions(new HashMap<>())
-                .setDataLakeFormat(DataLakeFormat.PAIMON)
                 .setShuffleByBucketId(false);
 
         // Verify the builder instance is returned
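
The assertions in these tests read private builder state through a getFieldValue helper that sits outside this diff; as an assumption about its shape, a minimal reflection-based sketch:

    // Hypothetical helper (not part of this commit): reads a private field off an
    // object so the tests can assert on internal builder state.
    @SuppressWarnings("unchecked")
    private static <T> T getFieldValue(Object target, String fieldName) throws Exception {
        java.lang.reflect.Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return (T) field.get(target);
    }

With the setShuffleByBucketId fix in the builder, getFieldValue(builder, "shuffleByBucketId") now round-trips both true and false, which is what the updated assertions verify.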

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/sink/FlussSinkITCase.java

Lines changed: 12 additions & 20 deletions
@@ -46,7 +46,6 @@
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -58,6 +57,7 @@
 
 import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkRowKind;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration tests for the Fluss sink connector in Flink. */
 public class FlussSinkITCase extends FlinkTestBase {
@@ -78,22 +78,16 @@ public class FlussSinkITCase extends FlinkTestBase {
 
     private static String pkTableName = "orders_test_pk";
 
-    private static TablePath ordersPKTablePath;
-
     @BeforeEach
     public void setup() throws Exception {
-        bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
-
         pkTableDescriptor =
                 TableDescriptor.builder().schema(pkSchema).distributedBy(1, "orderId").build();
 
         TablePath pkTablePath = TablePath.of(DEFAULT_DB, pkTableName);
 
         createTable(pkTablePath, pkTableDescriptor);
 
-        this.ordersPKTablePath = new TablePath(DEFAULT_DB, pkTableName);
-
-        this.bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
+        bootstrapServers = conn.getConfiguration().get(ConfigOptions.BOOTSTRAP_SERVERS).get(0);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
@@ -156,7 +150,6 @@ public void testRowDataTablePKSink() throws Exception {
                 .setBootstrapServers(bootstrapServers)
                 .setDatabase(DEFAULT_DB)
                 .setTable(pkTableName)
-                .useUpsert()
                 .setSerializationSchema(serializationSchema)
                 .setRowType(rowType)
                 .build();
@@ -197,8 +190,8 @@ public void testRowDataTablePKSink() throws Exception {
         inputRows.add(row9);
 
         // Assert result size and elements match
-        Assertions.assertEquals(rows.size(), inputRows.size());
-        Assertions.assertTrue(rows.containsAll(inputRows));
+        assertThat(rows.size()).isEqualTo(inputRows.size());
+        assertThat(rows.containsAll(inputRows)).isTrue();
     }
 
     @Test
@@ -230,7 +223,6 @@ public void testOrdersTablePKSink() throws Exception {
                 .setBootstrapServers(bootstrapServers)
                 .setDatabase(DEFAULT_DB)
                 .setTable(pkTableName)
-                .useUpsert()
                 .setSerializationSchema(new TestOrderSerializationSchema())
                 .setRowType(rowType)
                 .build();
@@ -268,16 +260,16 @@ public void testOrdersTablePKSink() throws Exception {
         orders.add(new TestOrder(800, 23, 602, "addr3", RowKind.UPDATE_BEFORE));
         orders.add(new TestOrder(900, 24, 603, "addr4", RowKind.UPDATE_BEFORE));
 
-        Assertions.assertEquals(rows.size(), orders.size());
-        Assertions.assertTrue(rows.containsAll(orders));
+        assertThat(rows.size()).isEqualTo(orders.size());
+        assertThat(rows.containsAll(orders)).isTrue();
     }
 
     private static class TestOrder implements Serializable {
-        private long orderId;
-        private long itemId;
-        private int amount;
-        private String address;
-        private RowKind rowKind;
+        private final long orderId;
+        private final long itemId;
+        private final int amount;
+        private final String address;
+        private final RowKind rowKind;
 
         public TestOrder(long orderId, long itemId, int amount, String address, RowKind rowKind) {
             this.orderId = orderId;
@@ -340,8 +332,8 @@ public RowWithOp serialize(TestOrder value) throws Exception {
            switch (rowKind) {
                case INSERT:
                case UPDATE_AFTER:
-               case UPDATE_BEFORE:
                    return new RowWithOp(row, OperationType.UPSERT);
+               case UPDATE_BEFORE:
                case DELETE:
                    return new RowWithOp(row, OperationType.DELETE);
                default:
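
The reordered switch in TestOrderSerializationSchema relies on Java case fall-through: UPDATE_BEFORE now groups with DELETE instead of UPDATE_AFTER, so the old image of an updated row is retracted rather than upserted again. The resulting RowKind-to-operation mapping in isolation, as a sketch (the standalone method name is illustrative, not from this commit):

    import org.apache.flink.types.RowKind;

    // Changelog semantics after the fix: the new row image (INSERT / UPDATE_AFTER)
    // is upserted; the old image (UPDATE_BEFORE) and deletions remove the row.
    static OperationType toOperationType(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return OperationType.UPSERT;
            case UPDATE_BEFORE:
            case DELETE:
                return OperationType.DELETE;
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + rowKind);
        }
    }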
