|
19 | 19 |
|
20 | 20 | import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow; |
21 | 21 | import org.apache.fluss.metadata.ResolvedPartitionSpec; |
| 22 | +import org.apache.fluss.metadata.TableChange; |
22 | 23 | import org.apache.fluss.metadata.TablePath; |
23 | 24 | import org.apache.fluss.record.ChangeType; |
24 | 25 | import org.apache.fluss.row.GenericRow; |
|
28 | 29 | import org.apache.paimon.data.BinaryRow; |
29 | 30 | import org.apache.paimon.data.BinaryRowWriter; |
30 | 31 | import org.apache.paimon.data.BinaryString; |
| 32 | +import org.apache.paimon.schema.SchemaChange; |
31 | 33 | import org.apache.paimon.types.DataType; |
32 | 34 | import org.apache.paimon.types.RowKind; |
33 | 35 | import org.apache.paimon.types.RowType; |
34 | 36 |
|
35 | 37 | import javax.annotation.Nullable; |
36 | 38 |
|
| 39 | +import java.util.ArrayList; |
37 | 40 | import java.util.List; |
| 41 | +import java.util.function.Function; |
38 | 42 |
|
39 | 43 | /** Utils for conversion between Paimon and Fluss. */ |
40 | 44 | public class PaimonConversions { |
@@ -106,4 +110,26 @@ public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) { |
106 | 110 | return org.apache.paimon.data.InternalRow.createFieldGetter(dataType, 0) |
107 | 111 | .getFieldOrNull(flussRowAsPaimonRow); |
108 | 112 | } |
| 113 | + |
| 114 | + public static List<SchemaChange> toPaimonSchemaChanges( |
| 115 | + List<TableChange> tableChanges, Function<String, String> optionKeyTransformer) { |
| 116 | + List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size()); |
| 117 | + |
| 118 | + for (TableChange tableChange : tableChanges) { |
| 119 | + if (tableChange instanceof TableChange.SetOption) { |
| 120 | + TableChange.SetOption setOption = (TableChange.SetOption) tableChange; |
| 121 | + schemaChanges.add( |
| 122 | + SchemaChange.setOption( |
| 123 | + optionKeyTransformer.apply(setOption.getKey()), setOption.getValue())); |
| 124 | + } else if (tableChange instanceof TableChange.ResetOption) { |
| 125 | + TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange; |
| 126 | + schemaChanges.add(SchemaChange.removeOption(optionKeyTransformer.apply(resetOption.getKey()))); |
| 127 | + } else { |
| 128 | + throw new UnsupportedOperationException( |
| 129 | + "Unsupported table change: " + tableChange.getClass()); |
| 130 | + } |
| 131 | + } |
| 132 | + |
| 133 | + return schemaChanges; |
| 134 | + } |
109 | 135 | } |
0 commit comments