1717
1818package org .apache .fluss .lake .paimon .utils ;
1919
20+ import org .apache .fluss .annotation .VisibleForTesting ;
21+ import org .apache .fluss .exception .InvalidConfigException ;
22+ import org .apache .fluss .exception .InvalidTableException ;
23+ import org .apache .fluss .lake .paimon .FlussDataTypeToPaimonDataType ;
2024import org .apache .fluss .lake .paimon .source .FlussRowAsPaimonRow ;
2125import org .apache .fluss .metadata .TableChange ;
26+ import org .apache .fluss .metadata .TableDescriptor ;
2227import org .apache .fluss .metadata .TablePath ;
2328import org .apache .fluss .record .ChangeType ;
2429import org .apache .fluss .row .GenericRow ;
2530import org .apache .fluss .row .InternalRow ;
2631
32+ import org .apache .paimon .CoreOptions ;
2733import org .apache .paimon .catalog .Identifier ;
34+ import org .apache .paimon .options .Options ;
35+ import org .apache .paimon .schema .Schema ;
2836import org .apache .paimon .schema .SchemaChange ;
2937import org .apache .paimon .types .DataType ;
3038import org .apache .paimon .types .RowKind ;
3139import org .apache .paimon .types .RowType ;
3240
3341import java .util .ArrayList ;
42+ import java .util .HashSet ;
3443import java .util .List ;
35- import java .util .function .Function ;
44+ import java .util .Map ;
45+ import java .util .Set ;
46+
47+ import static org .apache .fluss .lake .paimon .PaimonLakeCatalog .SYSTEM_COLUMNS ;
3648
3749/** Utils for conversion between Paimon and Fluss. */
3850public class PaimonConversions {
3951
52+ // for fluss config
53+ private static final String FLUSS_CONF_PREFIX = "fluss." ;
54+ // for paimon config
55+ private static final String PAIMON_CONF_PREFIX = "paimon." ;
56+
57+ /** Paimon config options set by Fluss should not be set by users. */
58+ @ VisibleForTesting public static final Set <String > PAIMON_UNSETTABLE_OPTIONS = new HashSet <>();
59+
60+ static {
61+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .BUCKET .key ());
62+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .BUCKET_KEY .key ());
63+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .PARTITION_GENERATE_LEGCY_NAME .key ());
64+ }
65+
4066 public static RowKind toRowKind (ChangeType changeType ) {
4167 switch (changeType ) {
4268 case APPEND_ONLY :
@@ -80,22 +106,21 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
80106 .getFieldOrNull (flussRowAsPaimonRow );
81107 }
82108
83- public static List <SchemaChange > toPaimonSchemaChanges (
84- List <TableChange > tableChanges , Function <String , String > optionKeyTransformer ) {
109+ public static List <SchemaChange > toPaimonSchemaChanges (List <TableChange > tableChanges ) {
85110 List <SchemaChange > schemaChanges = new ArrayList <>(tableChanges .size ());
86111
87112 for (TableChange tableChange : tableChanges ) {
88113 if (tableChange instanceof TableChange .SetOption ) {
89114 TableChange .SetOption setOption = (TableChange .SetOption ) tableChange ;
90115 schemaChanges .add (
91116 SchemaChange .setOption (
92- optionKeyTransformer . apply (setOption .getKey ()),
117+ convertFlussPropertyKeyToPaimon (setOption .getKey ()),
93118 setOption .getValue ()));
94119 } else if (tableChange instanceof TableChange .ResetOption ) {
95120 TableChange .ResetOption resetOption = (TableChange .ResetOption ) tableChange ;
96121 schemaChanges .add (
97122 SchemaChange .removeOption (
98- optionKeyTransformer . apply (resetOption .getKey ())));
123+ convertFlussPropertyKeyToPaimon (resetOption .getKey ())));
99124 } else {
100125 throw new UnsupportedOperationException (
101126 "Unsupported table change: " + tableChange .getClass ());
@@ -104,4 +129,112 @@ public static List<SchemaChange> toPaimonSchemaChanges(
104129
105130 return schemaChanges ;
106131 }
132+
133+ public static Schema toPaimonSchema (TableDescriptor tableDescriptor ) {
134+ // validate paimon options first
135+ validatePaimonOptions (tableDescriptor .getProperties ());
136+ validatePaimonOptions (tableDescriptor .getCustomProperties ());
137+
138+ Schema .Builder schemaBuilder = Schema .newBuilder ();
139+ Options options = new Options ();
140+
141+ // set default properties
142+ setPaimonDefaultProperties (options );
143+
144+ // When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
145+ List <String > bucketKeys = tableDescriptor .getBucketKeys ();
146+ if (!bucketKeys .isEmpty ()) {
147+ int numBuckets =
148+ tableDescriptor
149+ .getTableDistribution ()
150+ .flatMap (TableDescriptor .TableDistribution ::getBucketCount )
151+ .orElseThrow (
152+ () ->
153+ new IllegalArgumentException (
154+ "Bucket count should be set." ));
155+ options .set (CoreOptions .BUCKET , numBuckets );
156+ options .set (CoreOptions .BUCKET_KEY , String .join ("," , bucketKeys ));
157+ } else {
158+ options .set (CoreOptions .BUCKET , CoreOptions .BUCKET .defaultValue ());
159+ }
160+
161+ // set schema
162+ for (org .apache .fluss .metadata .Schema .Column column :
163+ tableDescriptor .getSchema ().getColumns ()) {
164+ String columnName = column .getName ();
165+ if (SYSTEM_COLUMNS .containsKey (columnName )) {
166+ throw new InvalidTableException (
167+ "Column "
168+ + columnName
169+ + " conflicts with a system column name of paimon table, please rename the column." );
170+ }
171+ schemaBuilder .column (
172+ columnName ,
173+ column .getDataType ().accept (FlussDataTypeToPaimonDataType .INSTANCE ),
174+ column .getComment ().orElse (null ));
175+ }
176+
177+ // add system metadata columns to schema
178+ for (Map .Entry <String , DataType > systemColumn : SYSTEM_COLUMNS .entrySet ()) {
179+ schemaBuilder .column (systemColumn .getKey (), systemColumn .getValue ());
180+ }
181+
182+ // set pk
183+ if (tableDescriptor .hasPrimaryKey ()) {
184+ schemaBuilder .primaryKey (
185+ tableDescriptor .getSchema ().getPrimaryKey ().get ().getColumnNames ());
186+ options .set (
187+ CoreOptions .CHANGELOG_PRODUCER .key (),
188+ CoreOptions .ChangelogProducer .INPUT .toString ());
189+ }
190+ // set partition keys
191+ schemaBuilder .partitionKeys (tableDescriptor .getPartitionKeys ());
192+
193+ // set properties to paimon schema
194+ tableDescriptor .getProperties ().forEach ((k , v ) -> setFlussPropertyToPaimon (k , v , options ));
195+ tableDescriptor
196+ .getCustomProperties ()
197+ .forEach ((k , v ) -> setFlussPropertyToPaimon (k , v , options ));
198+ schemaBuilder .options (options .toMap ());
199+ return schemaBuilder .build ();
200+ }
201+
202+ private static void validatePaimonOptions (Map <String , String > properties ) {
203+ properties .forEach (
204+ (k , v ) -> {
205+ String paimonKey = k ;
206+ if (k .startsWith (PAIMON_CONF_PREFIX )) {
207+ paimonKey = k .substring (PAIMON_CONF_PREFIX .length ());
208+ }
209+ if (PAIMON_UNSETTABLE_OPTIONS .contains (paimonKey )) {
210+ throw new InvalidConfigException (
211+ String .format (
212+ "The Paimon option %s will be set automatically by Fluss "
213+ + "and should not be set manually." ,
214+ k ));
215+ }
216+ });
217+ }
218+
219+ private static void setPaimonDefaultProperties (Options options ) {
220+ // set partition.legacy-name to false, otherwise paimon will use toString for all types,
221+ // which will cause inconsistent partition value for the same binary value
222+ options .set (CoreOptions .PARTITION_GENERATE_LEGCY_NAME , false );
223+ }
224+
225+ private static void setFlussPropertyToPaimon (String key , String value , Options options ) {
226+ if (key .startsWith (PAIMON_CONF_PREFIX )) {
227+ options .set (key .substring (PAIMON_CONF_PREFIX .length ()), value );
228+ } else {
229+ options .set (FLUSS_CONF_PREFIX + key , value );
230+ }
231+ }
232+
233+ private static String convertFlussPropertyKeyToPaimon (String key ) {
234+ if (key .startsWith (PAIMON_CONF_PREFIX )) {
235+ return key .substring (PAIMON_CONF_PREFIX .length ());
236+ } else {
237+ return FLUSS_CONF_PREFIX + key ;
238+ }
239+ }
107240}
0 commit comments