1717
1818package org .apache .fluss .lake .paimon .utils ;
1919
20+ import org .apache .fluss .annotation .VisibleForTesting ;
21+ import org .apache .fluss .exception .InvalidConfigException ;
22+ import org .apache .fluss .exception .InvalidTableException ;
23+ import org .apache .fluss .lake .paimon .FlussDataTypeToPaimonDataType ;
2024import org .apache .fluss .lake .paimon .source .FlussRowAsPaimonRow ;
2125import org .apache .fluss .metadata .TableChange ;
26+ import org .apache .fluss .metadata .TableDescriptor ;
2227import org .apache .fluss .metadata .TablePath ;
2328import org .apache .fluss .record .ChangeType ;
2429import org .apache .fluss .row .GenericRow ;
2530import org .apache .fluss .row .InternalRow ;
2631
32+ import org .apache .paimon .CoreOptions ;
2733import org .apache .paimon .catalog .Identifier ;
34+ import org .apache .paimon .options .Options ;
35+ import org .apache .paimon .schema .Schema ;
2836import org .apache .paimon .schema .SchemaChange ;
2937import org .apache .paimon .types .DataType ;
3038import org .apache .paimon .types .RowKind ;
3139import org .apache .paimon .types .RowType ;
3240
3341import java .util .ArrayList ;
42+ import java .util .HashSet ;
3443import java .util .List ;
35- import java .util .function .Function ;
44+ import java .util .Map ;
45+ import java .util .Set ;
46+
47+ import static org .apache .fluss .lake .paimon .PaimonLakeCatalog .SYSTEM_COLUMNS ;
3648
3749/** Utils for conversion between Paimon and Fluss. */
3850public class PaimonConversions {
3951
52+ // for fluss config
53+ private static final String FLUSS_CONF_PREFIX = "fluss." ;
54+ // for paimon config
55+ private static final String PAIMON_CONF_PREFIX = "paimon." ;
56+
57+ /** Paimon config options set by Fluss should not be set by users. */
58+ @ VisibleForTesting public static final Set <String > PAIMON_UNSETTABLE_OPTIONS = new HashSet <>();
59+
60+ @ VisibleForTesting public static final Options PAIMON_DEFAULT_OPTIONS = new Options ();
61+
62+ static {
63+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .BUCKET .key ());
64+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .BUCKET_KEY .key ());
65+ PAIMON_UNSETTABLE_OPTIONS .add (CoreOptions .CHANGELOG_PRODUCER .key ());
66+
67+ // set partition.legacy-name to false, otherwise paimon will use toString for all types,
68+ // which will cause inconsistent partition value for a same binary value
69+ PAIMON_DEFAULT_OPTIONS .set (CoreOptions .PARTITION_GENERATE_LEGCY_NAME , false );
70+ }
71+
4072 public static RowKind toRowKind (ChangeType changeType ) {
4173 switch (changeType ) {
4274 case APPEND_ONLY :
@@ -80,22 +112,21 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
80112 .getFieldOrNull (flussRowAsPaimonRow );
81113 }
82114
83- public static List <SchemaChange > toPaimonSchemaChanges (
84- List <TableChange > tableChanges , Function <String , String > optionKeyTransformer ) {
115+ public static List <SchemaChange > toPaimonSchemaChanges (List <TableChange > tableChanges ) {
85116 List <SchemaChange > schemaChanges = new ArrayList <>(tableChanges .size ());
86117
87118 for (TableChange tableChange : tableChanges ) {
88119 if (tableChange instanceof TableChange .SetOption ) {
89120 TableChange .SetOption setOption = (TableChange .SetOption ) tableChange ;
90121 schemaChanges .add (
91122 SchemaChange .setOption (
92- optionKeyTransformer . apply (setOption .getKey ()),
123+ getFlussPropertyKeyToPaimon (setOption .getKey ()),
93124 setOption .getValue ()));
94125 } else if (tableChange instanceof TableChange .ResetOption ) {
95126 TableChange .ResetOption resetOption = (TableChange .ResetOption ) tableChange ;
96127 schemaChanges .add (
97128 SchemaChange .removeOption (
98- optionKeyTransformer . apply (resetOption .getKey ())));
129+ getFlussPropertyKeyToPaimon (resetOption .getKey ())));
99130 } else {
100131 throw new UnsupportedOperationException (
101132 "Unsupported table change: " + tableChange .getClass ());
@@ -104,4 +135,111 @@ public static List<SchemaChange> toPaimonSchemaChanges(
104135
105136 return schemaChanges ;
106137 }
138+
139+ public static Schema toPaimonSchema (TableDescriptor tableDescriptor ) {
140+ // validate paimon options first
141+ validatePaimonOptions (tableDescriptor .getProperties ());
142+ validatePaimonOptions (tableDescriptor .getCustomProperties ());
143+
144+ Schema .Builder schemaBuilder = Schema .newBuilder ();
145+ Options options = new Options ();
146+
147+ // set default properties
148+ setPaimonDefaultProperties (options );
149+
150+ // When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
151+ List <String > bucketKeys = tableDescriptor .getBucketKeys ();
152+ if (!bucketKeys .isEmpty ()) {
153+ int numBuckets =
154+ tableDescriptor
155+ .getTableDistribution ()
156+ .flatMap (TableDescriptor .TableDistribution ::getBucketCount )
157+ .orElseThrow (
158+ () ->
159+ new IllegalArgumentException (
160+ "Bucket count should be set." ));
161+ options .set (CoreOptions .BUCKET , numBuckets );
162+ options .set (CoreOptions .BUCKET_KEY , String .join ("," , bucketKeys ));
163+ } else {
164+ options .set (CoreOptions .BUCKET , CoreOptions .BUCKET .defaultValue ());
165+ }
166+
167+ // set schema
168+ for (org .apache .fluss .metadata .Schema .Column column :
169+ tableDescriptor .getSchema ().getColumns ()) {
170+ String columnName = column .getName ();
171+ if (SYSTEM_COLUMNS .containsKey (columnName )) {
172+ throw new InvalidTableException (
173+ "Column "
174+ + columnName
175+ + " conflicts with a system column name of paimon table, please rename the column." );
176+ }
177+ schemaBuilder .column (
178+ columnName ,
179+ column .getDataType ().accept (FlussDataTypeToPaimonDataType .INSTANCE ),
180+ column .getComment ().orElse (null ));
181+ }
182+
183+ // add system metadata columns to schema
184+ for (Map .Entry <String , DataType > systemColumn : SYSTEM_COLUMNS .entrySet ()) {
185+ schemaBuilder .column (systemColumn .getKey (), systemColumn .getValue ());
186+ }
187+
188+ // set pk
189+ if (tableDescriptor .hasPrimaryKey ()) {
190+ schemaBuilder .primaryKey (
191+ tableDescriptor .getSchema ().getPrimaryKey ().get ().getColumnNames ());
192+ options .set (
193+ CoreOptions .CHANGELOG_PRODUCER .key (),
194+ CoreOptions .ChangelogProducer .INPUT .toString ());
195+ }
196+ // set partition keys
197+ schemaBuilder .partitionKeys (tableDescriptor .getPartitionKeys ());
198+
199+ // set properties to paimon schema
200+ tableDescriptor .getProperties ().forEach ((k , v ) -> setFlussPropertyToPaimon (k , v , options ));
201+ tableDescriptor
202+ .getCustomProperties ()
203+ .forEach ((k , v ) -> setFlussPropertyToPaimon (k , v , options ));
204+ schemaBuilder .options (options .toMap ());
205+ return schemaBuilder .build ();
206+ }
207+
208+ private static void validatePaimonOptions (Map <String , String > properties ) {
209+ properties .forEach (
210+ (k , v ) -> {
211+ String paimonKey = k ;
212+ if (k .startsWith (PAIMON_CONF_PREFIX )) {
213+ paimonKey = k .substring (PAIMON_CONF_PREFIX .length ());
214+ }
215+ if (PAIMON_UNSETTABLE_OPTIONS .contains (paimonKey )
216+ || PAIMON_DEFAULT_OPTIONS .toMap ().containsKey (paimonKey )) {
217+ throw new InvalidConfigException (
218+ String .format (
219+ "The Paimon option %s will be set automatically by Fluss "
220+ + "and should not set manually." ,
221+ k ));
222+ }
223+ });
224+ }
225+
226+ private static void setPaimonDefaultProperties (Options options ) {
227+ PAIMON_DEFAULT_OPTIONS .toMap ().forEach (options ::set );
228+ }
229+
230+ private static void setFlussPropertyToPaimon (String key , String value , Options options ) {
231+ if (key .startsWith (PAIMON_CONF_PREFIX )) {
232+ options .set (key .substring (PAIMON_CONF_PREFIX .length ()), value );
233+ } else {
234+ options .set (FLUSS_CONF_PREFIX + key , value );
235+ }
236+ }
237+
238+ private static String getFlussPropertyKeyToPaimon (String key ) {
239+ if (key .startsWith (PAIMON_CONF_PREFIX )) {
240+ return key .substring (PAIMON_CONF_PREFIX .length ());
241+ } else {
242+ return FLUSS_CONF_PREFIX + key ;
243+ }
244+ }
107245}
0 commit comments