2020import org .apache .fluss .annotation .Internal ;
2121import org .apache .fluss .config .Configuration ;
2222import org .apache .fluss .flink .sink .serializer .FlussSerializationSchema ;
23+ import org .apache .fluss .flink .sink .shuffle .DataStatisticsOperatorFactory ;
24+ import org .apache .fluss .flink .sink .shuffle .DistributionMode ;
25+ import org .apache .fluss .flink .sink .shuffle .StatisticsOrRecord ;
26+ import org .apache .fluss .flink .sink .shuffle .StatisticsOrRecordChannelComputer ;
27+ import org .apache .fluss .flink .sink .shuffle .StatisticsOrRecordTypeInformation ;
2328import org .apache .fluss .flink .sink .writer .AppendSinkWriter ;
2429import org .apache .fluss .flink .sink .writer .FlinkSinkWriter ;
2530import org .apache .fluss .flink .sink .writer .UpsertSinkWriter ;
2631import org .apache .fluss .metadata .DataLakeFormat ;
2732import org .apache .fluss .metadata .TablePath ;
2833
34+ import org .apache .flink .api .common .functions .FlatMapFunction ;
2935import org .apache .flink .api .common .operators .MailboxExecutor ;
36+ import org .apache .flink .api .common .typeinfo .TypeInformation ;
3037import org .apache .flink .api .connector .sink2 .Sink ;
3138import org .apache .flink .api .connector .sink2 .SinkWriter ;
3239import org .apache .flink .api .connector .sink2 .WriterInitContext ;
3340import org .apache .flink .runtime .metrics .groups .InternalSinkWriterMetricGroup ;
3441import org .apache .flink .streaming .api .connector .sink2 .SupportsPreWriteTopology ;
3542import org .apache .flink .streaming .api .datastream .DataStream ;
43+ import org .apache .flink .streaming .api .datastream .SingleOutputStreamOperator ;
3644import org .apache .flink .table .types .logical .RowType ;
3745
3846import javax .annotation .Nullable ;
@@ -97,8 +105,9 @@ static class AppendSinkWriterBuilder<InputT>
97105 private final List <String > bucketKeys ;
98106 private final List <String > partitionKeys ;
99107 private final @ Nullable DataLakeFormat lakeFormat ;
100- private final boolean shuffleByBucketId ;
108+ private final DistributionMode shuffleMode ;
101109 private final FlussSerializationSchema <InputT > flussSerializationSchema ;
110+ private final @ Nullable TypeInformation <InputT > rowTypeInformation ;
102111
103112 public AppendSinkWriterBuilder (
104113 TablePath tablePath ,
@@ -108,17 +117,19 @@ public AppendSinkWriterBuilder(
108117 List <String > bucketKeys ,
109118 List <String > partitionKeys ,
110119 @ Nullable DataLakeFormat lakeFormat ,
111- boolean shuffleByBucketId ,
112- FlussSerializationSchema <InputT > flussSerializationSchema ) {
120+ DistributionMode shuffleMode ,
121+ FlussSerializationSchema <InputT > flussSerializationSchema ,
122+ @ Nullable TypeInformation <InputT > rowTypeInformation ) {
113123 this .tablePath = tablePath ;
114124 this .flussConfig = flussConfig ;
115125 this .tableRowType = tableRowType ;
116126 this .numBucket = numBucket ;
117127 this .bucketKeys = bucketKeys ;
118128 this .partitionKeys = partitionKeys ;
119129 this .lakeFormat = lakeFormat ;
120- this .shuffleByBucketId = shuffleByBucketId ;
130+ this .shuffleMode = shuffleMode ;
121131 this .flussSerializationSchema = flussSerializationSchema ;
132+ this .rowTypeInformation = rowTypeInformation ;
122133 }
123134
124135 @ Override
@@ -133,20 +144,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
133144
134145 @ Override
135146 public DataStream <InputT > addPreWriteTopology (DataStream <InputT > input ) {
136- // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
137- if (!bucketKeys .isEmpty () && shuffleByBucketId ) {
138- return partition (
139- input ,
140- new FlinkRowDataChannelComputer <>(
141- toFlussRowType (tableRowType ),
142- bucketKeys ,
143- partitionKeys ,
144- lakeFormat ,
145- numBucket ,
146- flussSerializationSchema ),
147- input .getParallelism ());
148- } else {
149- return input ;
147+ switch (shuffleMode ) {
148+ case BUCKET_SHUFFLE :
149+ if (!bucketKeys .isEmpty ()) {
150+ return partition (
151+ input ,
152+ new FlinkRowDataChannelComputer <>(
153+ toFlussRowType (tableRowType ),
154+ bucketKeys ,
155+ partitionKeys ,
156+ lakeFormat ,
157+ numBucket ,
158+ flussSerializationSchema ),
159+ input .getParallelism ());
160+ }
161+ return input ;
162+ case NONE :
163+ return input ;
164+ case DYNAMIC_SHUFFLE :
165+ if (partitionKeys .isEmpty ()) {
166+ throw new UnsupportedOperationException (
167+ "DYNAMIC_SHUFFLE is only supported for partition tables" );
168+ }
169+
170+ if (rowTypeInformation == null ) {
171+ throw new UnsupportedOperationException (
172+ "RowTypeInformation is required for DYNAMIC_SHUFFLE mode." );
173+ }
174+ TypeInformation <StatisticsOrRecord <InputT >> statisticsOrRecordTypeInformation =
175+ new StatisticsOrRecordTypeInformation <>(rowTypeInformation );
176+ SingleOutputStreamOperator <StatisticsOrRecord <InputT >> shuffleStream =
177+ input .transform (
178+ "Range shuffle Collector" ,
179+ statisticsOrRecordTypeInformation ,
180+ new DataStatisticsOperatorFactory <>(
181+ toFlussRowType (tableRowType ),
182+ partitionKeys ,
183+ flussSerializationSchema ))
184+ .uid ("Range shuffle Collector" + tablePath )
185+ // Set the parallelism same as input operator to encourage
186+ // chaining
187+ .setParallelism (input .getParallelism ());
188+
189+ return partition (
190+ shuffleStream ,
191+ new StatisticsOrRecordChannelComputer <>(
192+ toFlussRowType (tableRowType ),
193+ bucketKeys ,
194+ partitionKeys ,
195+ numBucket ,
196+ lakeFormat ,
197+ flussSerializationSchema ),
198+ input .getParallelism ())
199+ .flatMap (
200+ (FlatMapFunction <StatisticsOrRecord <InputT >, InputT >)
201+ (statisticsOrRecord , out ) -> {
202+ if (statisticsOrRecord .hasRecord ()) {
203+ out .collect (statisticsOrRecord .record ());
204+ }
205+ })
206+ .uid ("flat map" + tablePath )
207+ // To promote operator chaining with the downstream writer operator,
208+ // setting slot sharing group and the parallelism as default, {@link
209+ // SinkTransformationTranslator} will set the parallelism same as sink
210+ // transformation.
211+ .slotSharingGroup ("shuffle-partition-custom-group" )
212+ .returns (rowTypeInformation );
213+
214+ default :
215+ throw new UnsupportedOperationException (
216+ "Unsupported distribution mode: " + shuffleMode );
150217 }
151218 }
152219 }
@@ -165,7 +232,7 @@ static class UpsertSinkWriterBuilder<InputT>
165232 private final List <String > bucketKeys ;
166233 private final List <String > partitionKeys ;
167234 private final @ Nullable DataLakeFormat lakeFormat ;
168- private final boolean shuffleByBucketId ;
235+ private final DistributionMode shuffleMode ;
169236 private final FlussSerializationSchema <InputT > flussSerializationSchema ;
170237
171238 UpsertSinkWriterBuilder (
@@ -177,7 +244,7 @@ static class UpsertSinkWriterBuilder<InputT>
177244 List <String > bucketKeys ,
178245 List <String > partitionKeys ,
179246 @ Nullable DataLakeFormat lakeFormat ,
180- boolean shuffleByBucketId ,
247+ DistributionMode shuffleMode ,
181248 FlussSerializationSchema <InputT > flussSerializationSchema ) {
182249 this .tablePath = tablePath ;
183250 this .flussConfig = flussConfig ;
@@ -187,7 +254,7 @@ static class UpsertSinkWriterBuilder<InputT>
187254 this .bucketKeys = bucketKeys ;
188255 this .partitionKeys = partitionKeys ;
189256 this .lakeFormat = lakeFormat ;
190- this .shuffleByBucketId = shuffleByBucketId ;
257+ this .shuffleMode = shuffleMode ;
191258 this .flussSerializationSchema = flussSerializationSchema ;
192259 }
193260
@@ -204,8 +271,9 @@ public UpsertSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
204271
205272 @ Override
206273 public DataStream <InputT > addPreWriteTopology (DataStream <InputT > input ) {
207- return shuffleByBucketId
208- ? partition (
274+ switch (shuffleMode ) {
275+ case BUCKET_SHUFFLE :
276+ return partition (
209277 input ,
210278 new FlinkRowDataChannelComputer <>(
211279 toFlussRowType (tableRowType ),
@@ -214,8 +282,13 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
214282 lakeFormat ,
215283 numBucket ,
216284 flussSerializationSchema ),
217- input .getParallelism ())
218- : input ;
285+ input .getParallelism ());
286+ case NONE :
287+ return input ;
288+ default :
289+ throw new UnsupportedOperationException (
290+ "Unsupported distribution mode: " + shuffleMode );
291+ }
219292 }
220293 }
221294}
0 commit comments