@@ -73,6 +73,9 @@ public class FlussSinkBuilder<InputT> {
7373 private final Map <String , String > configOptions = new HashMap <>();
7474 private FlussSerializationSchema <InputT > serializationSchema ;
7575 private boolean shuffleByBucketId = true ;
76+ // Optional list of columns for partial update. When set, upsert will only update these columns.
77+ // The primary key columns must be fully specified in this list.
78+ private List <String > partialUpdateColumns ;
7679
7780 /** Set the bootstrap server for the sink. */
7881 public FlussSinkBuilder <InputT > setBootstrapServers (String bootstrapServers ) {
@@ -98,6 +101,28 @@ public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId)
98101 return this ;
99102 }
100103
104+ /**
105+ * Enable partial update by specifying the column names to update for upsert tables.
106+ * Primary key columns must be included in this list.
107+ */
108+ public FlussSinkBuilder <InputT > setPartialUpdateColumns (List <String > columns ) {
109+ this .partialUpdateColumns = columns ;
110+ return this ;
111+ }
112+
113+ /**
114+ * Enable partial update by specifying the column names to update for upsert tables.
115+ * Convenience varargs overload.
116+ */
117+ public FlussSinkBuilder <InputT > setPartialUpdateColumns (String ... columns ) {
118+ if (columns == null ) {
119+ this .partialUpdateColumns = null ;
120+ } else {
121+ this .partialUpdateColumns = java .util .Arrays .asList (columns );
122+ }
123+ return this ;
124+ }
125+
101126 /** Set a configuration option. */
102127 public FlussSinkBuilder <InputT > setOption (String key , String value ) {
103128 configOptions .put (key , value );
@@ -151,14 +176,24 @@ public FlussSink<InputT> build() {
151176
152177 boolean isUpsert = tableInfo .hasPrimaryKey ();
153178
179+ int [] targetColumnIndexes = null ;
180+ if (isUpsert ) {
181+ // Compute target column indexes for partial update if columns were provided
182+ targetColumnIndexes =
183+ computeTargetColumnIndexes (
184+ tableRowType .getFieldNames (),
185+ tableInfo .getPrimaryKeys (),
186+ partialUpdateColumns );
187+ }
188+
154189 if (isUpsert ) {
155190 LOG .info ("Initializing Fluss upsert sink writer ..." );
156191 writerBuilder =
157192 new FlinkSink .UpsertSinkWriterBuilder <>(
158193 tablePath ,
159194 flussConfig ,
160195 tableRowType ,
161- null , // not support partialUpdateColumns yet
196+ targetColumnIndexes ,
162197 numBucket ,
163198 bucketKeys ,
164199 partitionKeys ,
@@ -193,4 +228,47 @@ private void validateConfiguration() {
193228 checkNotNull (tableName , "Table name is required but not provided." );
194229 checkArgument (!tableName .isEmpty (), "Table name cannot be empty." );
195230 }
231+
232+ // -------------- Test-visible helper methods --------------
233+ /**
234+ * Computes target column indexes for partial updates.
235+ * If {@code specifiedColumns} is null or empty, returns null indicating full update.
236+ * Validates that all primary key columns are included in the specified columns.
237+ *
238+ * @param allFieldNames the list of all field names in table row type order
239+ * @param primaryKeyNames the list of primary key column names
240+ * @param specifiedColumns the optional list of columns specified for partial update
241+ * @return the indexes into {@code allFieldNames} corresponding to {@code specifiedColumns}, or null for full update
242+ * @throws IllegalArgumentException if a specified column does not exist or primary key coverage is incomplete
243+ */
244+ static int [] computeTargetColumnIndexes (
245+ List <String > allFieldNames , List <String > primaryKeyNames , List <String > specifiedColumns ) {
246+ if (specifiedColumns == null || specifiedColumns .isEmpty ()) {
247+ return null ; // full update
248+ }
249+
250+ // Map specified column names to indexes
251+ int [] indexes = new int [specifiedColumns .size ()];
252+ for (int i = 0 ; i < specifiedColumns .size (); i ++) {
253+ String col = specifiedColumns .get (i );
254+ int idx = allFieldNames .indexOf (col );
255+ checkArgument (
256+ idx >= 0 ,
257+ "Column '%s' not found in table schema: %s" ,
258+ col ,
259+ allFieldNames );
260+ indexes [i ] = idx ;
261+ }
262+
263+ // Validate that all primary key columns are covered
264+ for (String pk : primaryKeyNames ) {
265+ checkArgument (
266+ specifiedColumns .contains (pk ),
267+ "Partial updates must include all primary key columns. Missing primary key column: %s. Provided columns: %s" ,
268+ pk ,
269+ specifiedColumns );
270+ }
271+
272+ return indexes ;
273+ }
196274}
0 commit comments