1717
1818package org .apache .fluss .lake .paimon .tiering ;
1919
20+ import org .apache .fluss .config .ConfigOptions ;
2021import org .apache .fluss .config .Configuration ;
2122import org .apache .fluss .lake .committer .CommittedLakeSnapshot ;
2223import org .apache .fluss .lake .committer .LakeCommitter ;
2324import org .apache .fluss .lake .serializer .SimpleVersionedSerializer ;
2425import org .apache .fluss .lake .writer .LakeWriter ;
2526import org .apache .fluss .lake .writer .WriterInitContext ;
2627import org .apache .fluss .metadata .TableBucket ;
28+ import org .apache .fluss .metadata .TableDescriptor ;
29+ import org .apache .fluss .metadata .TableInfo ;
2730import org .apache .fluss .metadata .TablePath ;
2831import org .apache .fluss .record .ChangeType ;
2932import org .apache .fluss .record .GenericRecord ;
@@ -118,6 +121,17 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
118121 isPartitioned ? "partitioned" : "non_partitioned" ));
119122 createTable (
120123 tablePath , isPrimaryKeyTable , isPartitioned , isPrimaryKeyTable ? bucketNum : null );
124+ TableDescriptor descriptor =
125+ TableDescriptor .builder ()
126+ .schema (
127+ org .apache .fluss .metadata .Schema .newBuilder ()
128+ .column ("c1" , org .apache .fluss .types .DataTypes .INT ())
129+ .column ("c2" , org .apache .fluss .types .DataTypes .STRING ())
130+ .column ("c3" , org .apache .fluss .types .DataTypes .STRING ())
131+ .build ())
132+ .property (ConfigOptions .TABLE_DATALAKE_ENABLED , true )
133+ .build ();
134+ TableInfo tableInfo = TableInfo .of (tablePath , 0 , 1 , descriptor , 1L , 1L );
121135
122136 List <PaimonWriteResult > paimonWriteResults = new ArrayList <>();
123137 SimpleVersionedSerializer <PaimonWriteResult > writeResultSerializer =
@@ -148,7 +162,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
148162 for (Map .Entry <Long , String > entry : partitionIdAndName .entrySet ()) {
149163 String partition = entry .getValue ();
150164 try (LakeWriter <PaimonWriteResult > lakeWriter =
151- createLakeWriter (tablePath , bucket , partition , entry .getKey ())) {
165+ createLakeWriter (tablePath , bucket , partition , entry .getKey (), tableInfo )) {
152166 Tuple2 <String , Integer > partitionBucket = Tuple2 .of (partition , bucket );
153167 Tuple2 <List <LogRecord >, List <LogRecord >> writeAndExpectRecords =
154168 isPrimaryKeyTable
@@ -233,6 +247,19 @@ void testMultiPartitionTiering() throws Exception {
233247 // Test multiple partitions: region + year
234248 TablePath tablePath = TablePath .of ("paimon" , "test_multi_partition" );
235249 createMultiPartitionTable (tablePath );
250+ TableDescriptor descriptor =
251+ TableDescriptor .builder ()
252+ .schema (
253+ org .apache .fluss .metadata .Schema .newBuilder ()
254+ .column ("c1" , org .apache .fluss .types .DataTypes .INT ())
255+ .column ("c2" , org .apache .fluss .types .DataTypes .STRING ())
256+ .column ("region" , org .apache .fluss .types .DataTypes .STRING ())
257+ .column ("year" , org .apache .fluss .types .DataTypes .STRING ())
258+ .build ())
259+ .partitionedBy ("region" , "year" )
260+ .property (ConfigOptions .TABLE_DATALAKE_ENABLED , true )
261+ .build ();
262+ TableInfo tableInfo = TableInfo .of (tablePath , 0 , 1 , descriptor , 1L , 1L );
236263
237264 Map <String , List <LogRecord >> recordsByPartition = new HashMap <>();
238265 List <PaimonWriteResult > paimonWriteResults = new ArrayList <>();
@@ -253,7 +280,7 @@ void testMultiPartitionTiering() throws Exception {
253280 for (Map .Entry <Long , String > entry : partitionIdAndName .entrySet ()) {
254281 String partition = entry .getValue ();
255282 try (LakeWriter <PaimonWriteResult > lakeWriter =
256- createLakeWriter (tablePath , bucket , partition , entry .getKey ())) {
283+ createLakeWriter (tablePath , bucket , partition , entry .getKey (), tableInfo )) {
257284 List <LogRecord > logRecords =
258285 genLogTableRecordsForMultiPartition (partition , bucket , 3 );
259286 recordsByPartition .put (partition , logRecords );
@@ -295,7 +322,20 @@ void testThreePartitionTiering() throws Exception {
295322 // Test three partitions: region + year + month
296323 TablePath tablePath = TablePath .of ("paimon" , "test_three_partition" );
297324 createThreePartitionTable (tablePath );
298-
325+ TableDescriptor descriptor =
326+ TableDescriptor .builder ()
327+ .schema (
328+ org .apache .fluss .metadata .Schema .newBuilder ()
329+ .column ("c1" , org .apache .fluss .types .DataTypes .INT ())
330+ .column ("c2" , org .apache .fluss .types .DataTypes .STRING ())
331+ .column ("region" , org .apache .fluss .types .DataTypes .STRING ())
332+ .column ("year" , org .apache .fluss .types .DataTypes .STRING ())
333+ .column ("month" , org .apache .fluss .types .DataTypes .STRING ())
334+ .build ())
335+ .partitionedBy ("region" , "year" , "month" )
336+ .property (ConfigOptions .TABLE_DATALAKE_ENABLED , true )
337+ .build ();
338+ TableInfo tableInfo = TableInfo .of (tablePath , 0 , 1 , descriptor , 1L , 1L );
299339 Map <String , List <LogRecord >> recordsByPartition = new HashMap <>();
300340 List <PaimonWriteResult > paimonWriteResults = new ArrayList <>();
301341 Map <TableBucket , Long > tableBucketOffsets = new HashMap <>();
@@ -313,7 +353,7 @@ void testThreePartitionTiering() throws Exception {
313353 for (Map .Entry <Long , String > entry : partitionIdAndName .entrySet ()) {
314354 String partition = entry .getValue ();
315355 try (LakeWriter <PaimonWriteResult > lakeWriter =
316- createLakeWriter (tablePath , bucket , partition , entry .getKey ())) {
356+ createLakeWriter (tablePath , bucket , partition , entry .getKey (), tableInfo )) {
317357 List <LogRecord > logRecords =
318358 genLogTableRecordsForMultiPartition (
319359 partition , bucket , 2 ); // Use same method
@@ -639,7 +679,11 @@ private CloseableIterator<InternalRow> getPaimonRowsThreePartition(
639679 }
640680
641681 private LakeWriter <PaimonWriteResult > createLakeWriter (
642- TablePath tablePath , int bucket , @ Nullable String partition , @ Nullable Long partitionId )
682+ TablePath tablePath ,
683+ int bucket ,
684+ @ Nullable String partition ,
685+ @ Nullable Long partitionId ,
686+ TableInfo tableInfo )
643687 throws IOException {
644688 return paimonLakeTieringFactory .createLakeWriter (
645689 new WriterInitContext () {
@@ -661,15 +705,8 @@ public String partition() {
661705 }
662706
663707 @ Override
664- public Map <String , String > customProperties () {
665- // don't care about table custom properties for Paimon lake writer
666- return new HashMap <>();
667- }
668-
669- @ Override
670- public org .apache .fluss .metadata .Schema schema () {
671- throw new UnsupportedOperationException (
672- "The lake writer in Paimon currently uses paimonCatalog to determine the schema." );
708+ public TableInfo tableInfo () {
709+ return tableInfo ;
673710 }
674711 });
675712 }
0 commit comments