Skip to content

Commit 45384a2

Browse files
committed
Add partial updates to the DataStream API
1 parent 3fcde27 commit 45384a2

File tree

3 files changed

+493
-37
lines changed

3 files changed

+493
-37
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public class FlussSinkBuilder<InputT> {
7373
private final Map<String, String> configOptions = new HashMap<>();
7474
private FlussSerializationSchema<InputT> serializationSchema;
7575
private boolean shuffleByBucketId = true;
76+
// Optional list of columns for partial update. When set, upsert will only update these columns.
77+
// The primary key columns must be fully specified in this list.
78+
private List<String> partialUpdateColumns;
7679

7780
/** Set the bootstrap server for the sink. */
7881
public FlussSinkBuilder<InputT> setBootstrapServers(String bootstrapServers) {
@@ -98,6 +101,28 @@ public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId)
98101
return this;
99102
}
100103

104+
/**
105+
* Enable partial update by specifying the column names to update for upsert tables.
106+
* Primary key columns must be included in this list.
107+
*/
108+
public FlussSinkBuilder<InputT> setPartialUpdateColumns(List<String> columns) {
109+
this.partialUpdateColumns = columns;
110+
return this;
111+
}
112+
113+
/**
114+
* Enable partial update by specifying the column names to update for upsert tables.
115+
* Convenience varargs overload.
116+
*/
117+
public FlussSinkBuilder<InputT> setPartialUpdateColumns(String... columns) {
118+
if (columns == null) {
119+
this.partialUpdateColumns = null;
120+
} else {
121+
this.partialUpdateColumns = java.util.Arrays.asList(columns);
122+
}
123+
return this;
124+
}
125+
101126
/** Set a configuration option. */
102127
public FlussSinkBuilder<InputT> setOption(String key, String value) {
103128
configOptions.put(key, value);
@@ -130,7 +155,7 @@ public FlussSink<InputT> build() {
130155

131156
TableInfo tableInfo;
132157
try (Connection connection = ConnectionFactory.createConnection(flussConfig);
133-
Admin admin = connection.getAdmin()) {
158+
Admin admin = connection.getAdmin()) {
134159
try {
135160
tableInfo = admin.getTableInfo(tablePath).get();
136161
} catch (InterruptedException e) {
@@ -153,12 +178,17 @@ public FlussSink<InputT> build() {
153178

154179
if (isUpsert) {
155180
LOG.info("Initializing Fluss upsert sink writer ...");
181+
int[] targetColumnIndexes =
182+
computeTargetColumnIndexes(
183+
tableRowType.getFieldNames(),
184+
tableInfo.getPrimaryKeys(),
185+
partialUpdateColumns);
156186
writerBuilder =
157187
new FlinkSink.UpsertSinkWriterBuilder<>(
158188
tablePath,
159189
flussConfig,
160190
tableRowType,
161-
null, // not support partialUpdateColumns yet
191+
targetColumnIndexes,
162192
numBucket,
163193
bucketKeys,
164194
partitionKeys,
@@ -193,4 +223,47 @@ private void validateConfiguration() {
193223
checkNotNull(tableName, "Table name is required but not provided.");
194224
checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
195225
}
196-
}
226+
227+
// -------------- Test-visible helper methods --------------
228+
/**
229+
* Computes target column indexes for partial updates.
230+
* If {@code specifiedColumns} is null or empty, returns null indicating full update.
231+
* Validates that all primary key columns are included in the specified columns.
232+
*
233+
* @param allFieldNames the list of all field names in table row type order
234+
* @param primaryKeyNames the list of primary key column names
235+
* @param specifiedColumns the optional list of columns specified for partial update
236+
* @return the indexes into {@code allFieldNames} corresponding to {@code specifiedColumns}, or null for full update
237+
* @throws IllegalArgumentException if a specified column does not exist or primary key coverage is incomplete
238+
*/
239+
static int[] computeTargetColumnIndexes(
240+
List<String> allFieldNames, List<String> primaryKeyNames, List<String> specifiedColumns) {
241+
if (specifiedColumns == null || specifiedColumns.isEmpty()) {
242+
return null; // full update
243+
}
244+
245+
// Map specified column names to indexes
246+
int[] indexes = new int[specifiedColumns.size()];
247+
for (int i = 0; i < specifiedColumns.size(); i++) {
248+
String col = specifiedColumns.get(i);
249+
int idx = allFieldNames.indexOf(col);
250+
checkArgument(
251+
idx >= 0,
252+
"Column '%s' not found in table schema: %s",
253+
col,
254+
allFieldNames);
255+
indexes[i] = idx;
256+
}
257+
258+
// Validate that all primary key columns are covered
259+
for (String pk : primaryKeyNames) {
260+
checkArgument(
261+
specifiedColumns.contains(pk),
262+
"Partial updates must include all primary key columns. Missing primary key column: %s. Provided columns: %s",
263+
pk,
264+
specifiedColumns);
265+
}
266+
267+
return indexes;
268+
}
269+
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java

Lines changed: 79 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525

2626
import java.lang.reflect.Field;
27+
import java.util.Arrays;
2728
import java.util.HashMap;
2829
import java.util.Map;
2930

@@ -47,57 +48,57 @@ void setUp() {
4748
void testConfigurationValidation() throws Exception {
4849
// Test missing bootstrap servers
4950
assertThatThrownBy(
50-
() ->
51-
new FlussSinkBuilder<Order>()
52-
.setDatabase("testDb")
53-
.setTable("testTable")
54-
.build())
51+
() ->
52+
new FlussSinkBuilder<Order>()
53+
.setDatabase("testDb")
54+
.setTable("testTable")
55+
.build())
5556
.isInstanceOf(NullPointerException.class)
5657
.hasMessageContaining("BootstrapServers is required but not provided.");
5758

5859
// Test missing database
5960
assertThatThrownBy(
60-
() ->
61-
new FlussSinkBuilder<Order>()
62-
.setBootstrapServers(bootstrapServers)
63-
.setTable(tableName)
64-
.setSerializationSchema(new OrderSerializationSchema())
65-
.build())
61+
() ->
62+
new FlussSinkBuilder<Order>()
63+
.setBootstrapServers(bootstrapServers)
64+
.setTable(tableName)
65+
.setSerializationSchema(new OrderSerializationSchema())
66+
.build())
6667
.isInstanceOf(RuntimeException.class)
6768
.hasMessageContaining("Database is required but not provided.");
6869

6970
// Test empty database
7071
assertThatThrownBy(
71-
() ->
72-
new FlussSinkBuilder<Order>()
73-
.setBootstrapServers(bootstrapServers)
74-
.setDatabase("")
75-
.setTable(tableName)
76-
.setSerializationSchema(new OrderSerializationSchema())
77-
.build())
72+
() ->
73+
new FlussSinkBuilder<Order>()
74+
.setBootstrapServers(bootstrapServers)
75+
.setDatabase("")
76+
.setTable(tableName)
77+
.setSerializationSchema(new OrderSerializationSchema())
78+
.build())
7879
.isInstanceOf(IllegalArgumentException.class)
7980
.hasMessageContaining("Database cannot be empty");
8081

8182
// Test missing table name
8283
assertThatThrownBy(
83-
() ->
84-
new FlussSinkBuilder<Order>()
85-
.setBootstrapServers(bootstrapServers)
86-
.setDatabase("testDb")
87-
.setSerializationSchema(new OrderSerializationSchema())
88-
.build())
84+
() ->
85+
new FlussSinkBuilder<Order>()
86+
.setBootstrapServers(bootstrapServers)
87+
.setDatabase("testDb")
88+
.setSerializationSchema(new OrderSerializationSchema())
89+
.build())
8990
.isInstanceOf(NullPointerException.class)
9091
.hasMessageContaining("Table name is required");
9192

9293
// Test empty table name
9394
assertThatThrownBy(
94-
() ->
95-
new FlussSinkBuilder<Order>()
96-
.setBootstrapServers(bootstrapServers)
97-
.setDatabase("testDb")
98-
.setTable("")
99-
.setSerializationSchema(new OrderSerializationSchema())
100-
.build())
95+
() ->
96+
new FlussSinkBuilder<Order>()
97+
.setBootstrapServers(bootstrapServers)
98+
.setDatabase("testDb")
99+
.setTable("")
100+
.setSerializationSchema(new OrderSerializationSchema())
101+
.build())
101102
.isInstanceOf(IllegalArgumentException.class)
102103
.hasMessageContaining("Table name cannot be empty");
103104
}
@@ -171,17 +172,62 @@ void testFluentChaining() {
171172
.setTable(tableName)
172173
.setOption("key1", "value1")
173174
.setOptions(new HashMap<>())
174-
.setShuffleByBucketId(false);
175+
.setShuffleByBucketId(false)
176+
.setPartialUpdateColumns("id", "price");
175177

176178
// Verify the builder instance is returned
177179
assertThat(chainedBuilder).isInstanceOf(FlussSinkBuilder.class);
178180
}
179181

182+
@Test
183+
void testComputeTargetColumnIndexesFullUpdate() {
184+
int[] result =
185+
FlussSinkBuilder.computeTargetColumnIndexes(
186+
Arrays.asList("id", "name", "price"),
187+
Arrays.asList("id"),
188+
null);
189+
assertThat(result).isNull();
190+
}
191+
192+
@Test
193+
void testComputeTargetColumnIndexesValidPartialIncludesPk() {
194+
int[] result =
195+
FlussSinkBuilder.computeTargetColumnIndexes(
196+
Arrays.asList("id", "name", "price", "ts"),
197+
Arrays.asList("id"),
198+
Arrays.asList("id", "price"));
199+
assertThat(result).containsExactly(0, 2);
200+
}
201+
202+
@Test
203+
void testComputeTargetColumnIndexesMissingPkThrows() {
204+
assertThatThrownBy(
205+
() ->
206+
FlussSinkBuilder.computeTargetColumnIndexes(
207+
Arrays.asList("id", "name", "price"),
208+
Arrays.asList("id"),
209+
Arrays.asList("name", "price")))
210+
.isInstanceOf(IllegalArgumentException.class)
211+
.hasMessageContaining("Partial updates must include all primary key columns");
212+
}
213+
214+
@Test
215+
void testComputeTargetColumnIndexesUnknownColumnThrows() {
216+
assertThatThrownBy(
217+
() ->
218+
FlussSinkBuilder.computeTargetColumnIndexes(
219+
Arrays.asList("id", "name"),
220+
Arrays.asList("id"),
221+
Arrays.asList("id", "unknown")))
222+
.isInstanceOf(IllegalArgumentException.class)
223+
.hasMessageContaining("not found in table schema");
224+
}
225+
180226
// Helper method to get private field values using reflection
181227
@SuppressWarnings("unchecked")
182228
private <T> T getFieldValue(Object object, String fieldName) throws Exception {
183229
Field field = FlussSinkBuilder.class.getDeclaredField(fieldName);
184230
field.setAccessible(true);
185231
return (T) field.get(object);
186232
}
187-
}
233+
}

0 commit comments

Comments
 (0)