File tree Expand file tree Collapse file tree 2 files changed +20
-4
lines changed
fluss-flink/fluss-flink-common/src
main/java/com/alibaba/fluss/flink/sink
test/java/com/alibaba/fluss/flink/sink Expand file tree Collapse file tree 2 files changed +20
-4
lines changed Original file line number Diff line number Diff line change @@ -185,10 +185,6 @@ public FlinkSink<InputT> build() {
185185 List <String > bucketKeys = tableInfo .getBucketKeys ();
186186 List <String > partitionKeys = tableInfo .getPartitionKeys ();
187187
188- if (!isUpsert && partialUpdateColumns != null ) {
189- LOG .error ("Partial updates are not supported in append mode." );
190- }
191-
192188 if (isUpsert ) {
193189 LOG .info ("Initializing Fluss upsert sink writer ..." );
194190 writerBuilder =
@@ -231,5 +227,9 @@ private void validateConfiguration() {
231227
232228 checkNotNull (tableName , "Table name is required but not provided." );
233229 checkArgument (!tableName .isEmpty (), "Table name cannot be empty." );
230+
231+ checkArgument (
232+ isUpsert && partialUpdateColumns == null ,
233+ "Partial updates are not supported in append mode." );
234234 }
235235}
Original file line number Diff line number Diff line change @@ -249,6 +249,22 @@ void testBootstrapServersSetting() throws Exception {
249249 assertThat (bootstrapServers ).isEqualTo (this .bootstrapServers );
250250 }
251251
252+ @ Test
253+ void testPartialUpdateColumnsNotAllowedInAppendMode () {
254+ FlussSinkBuilder <Order > builder = new FlussSinkBuilder <>();
255+ builder .setBootstrapServers ("localhost:9123" )
256+ .setDatabase ("testDb" )
257+ .setTable ("testTable" )
258+ .setRowType (orderRowType )
259+ .setSerializationSchema (new OrderSerializationSchema ())
260+ .setPartialUpdateColumns (new int [] {0 , 1 , 2 })
261+ .useAppend ();
262+
263+ assertThatThrownBy (builder ::build )
264+ .isInstanceOf (IllegalArgumentException .class )
265+ .hasMessageContaining ("Partial updates are not supported in append mode." );
266+ }
267+
252268 @ Test
253269 void testFluentChaining () {
254270 // Test that all methods can be chained
You can’t perform that action at this time.
0 commit comments