4848
4949import org .junit .jupiter .api .Test ;
5050
51- import java .util .ArrayList ;
5251import java .util .Arrays ;
5352import java .util .HashMap ;
5453import java .util .List ;
5554import java .util .Map ;
56- import java .util .stream .Collectors ;
5755
5856import static org .apache .fluss .testutils .DataTestUtils .row ;
59- import static org .apache .fluss .utils .PartitionUtils .convertValueOfType ;
6057import static org .assertj .core .api .Assertions .assertThat ;
6158
6259/** IT case for Fluss partitioned table supporting partition key of different types. */
63- class NewPartitionedTableITCase extends ClientToServerITCaseBase {
64- Schema .Builder schemaBuilder =
60+ class TypedPartitionedTableITCase extends ClientToServerITCaseBase {
61+ private static final Schema .Builder schemaBuilder =
6562 Schema .newBuilder ()
6663 .column ("a" , new StringType ())
6764 .column ("char" , new CharType ())
68- .column ("binary" , new BinaryType ())
65+ .column ("binary" , new BinaryType (6 ))
6966 .column ("boolean" , new BooleanType ())
7067 .column ("bytes" , new BytesType ())
7168 .column ("tinyInt" , new TinyIntType ())
@@ -75,12 +72,12 @@ class NewPartitionedTableITCase extends ClientToServerITCaseBase {
7572 .column ("date" , new DateType ())
7673 .column ("float" , new FloatType ())
7774 .column ("double" , new DoubleType ())
78- .column ("time" , new TimeType ())
75+ .column ("time" , new TimeType (3 ))
7976 .column ("timeStampNTZ" , new TimestampType ())
8077 .column ("timeStampLTZ" , new LocalZonedTimestampType ());
8178
82- Schema schema = schemaBuilder .build ();
83- DataTypeRoot [] allPartitionKeyTypes =
79+ private static final Schema schema = schemaBuilder .build ();
80+ private static final DataTypeRoot [] allPartitionKeyTypes =
8481 new DataTypeRoot [] {
8582 DataTypeRoot .STRING ,
8683 DataTypeRoot .CHAR ,
@@ -99,7 +96,7 @@ class NewPartitionedTableITCase extends ClientToServerITCaseBase {
9996 DataTypeRoot .TIMESTAMP_WITH_LOCAL_TIME_ZONE
10097 };
10198
102- Object [] allPartitionKeyValues =
99+ private static final Object [] allPartitionKeyValues =
103100 new Object [] {
104101 BinaryString .fromString ("a" ),
105102 BinaryString .fromString ("F" ),
@@ -118,7 +115,7 @@ class NewPartitionedTableITCase extends ClientToServerITCaseBase {
118115 TimestampLtz .fromEpochMillis (1748662955428L ) // TIMESTAMP_WITH_LOCAL_TIME_ZONE
119116 };
120117
121- Schema .Column [] extraColumn =
118+ private static final Schema .Column [] extraColumn =
122119 new Schema .Column [] {
123120 new Schema .Column ("a" , new StringType ()),
124121 new Schema .Column ("char" , new CharType ()),
@@ -137,7 +134,7 @@ class NewPartitionedTableITCase extends ClientToServerITCaseBase {
137134 new Schema .Column ("timeStampLTZ" , new LocalZonedTimestampType ())
138135 };
139136
140- List <String > result =
137+ private static final List <String > result =
141138 Arrays .asList (
142139 "a" ,
143140 "F" ,
@@ -183,52 +180,35 @@ public void testPartitionedTable() throws Exception {
183180
184181 @ Test
185182 public void testMultipleTypedPartitionedTable () throws Exception {
186-
187183 for (int i = 0 ; i < allPartitionKeyTypes .length ; i ++) {
188184 String partitionKey = extraColumn [i ].getName ();
189185 TablePath tablePath =
190- TablePath .of ("test_part_db_" + i , "test_static_partitioned_pk_table_" + i );
186+ TablePath .of ("fluss" , "test_static_partitioned_pk_table_" + partitionKey );
191187 createPartitionedTable (tablePath , partitionKey );
192- String partitionValue =
193- convertValueOfType (allPartitionKeyValues [i ], allPartitionKeyTypes [i ]);
194-
195- admin .createPartition (tablePath , newPartitionSpec (partitionKey , partitionValue ), true )
196- .get ();
197-
198- Map <String , Long > partitionIdByNames =
199- FLUSS_CLUSTER_EXTENSION .waitUntilPartitionAllReady (tablePath , 1 );
200-
201- List <PartitionInfo > partitionInfos = admin .listPartitionInfos (tablePath ).get ();
202- List <String > expectedPartitions = new ArrayList <>(partitionIdByNames .keySet ());
203- assertThat (
204- partitionInfos .stream ()
205- .map (PartitionInfo ::getPartitionName )
206- .collect (Collectors .toList ()))
207- .containsExactlyInAnyOrderElementsOf (expectedPartitions );
208188
189+ // append a record to the table will dynamically create the corresponding partition
209190 Table table = conn .getTable (tablePath );
210191 AppendWriter appendWriter = table .newAppend ().createWriter ();
211- Map <Long , List <InternalRow >> expectPartitionAppendRows = new HashMap <>();
212- for (String partition : partitionIdByNames .keySet ()) {
213- for (int j = 0 ; j < allPartitionKeyValues .length ; j ++) {
214- InternalRow row = row (allPartitionKeyValues );
215- appendWriter .append (row );
216- expectPartitionAppendRows
217- .computeIfAbsent (
218- partitionIdByNames .get (partition ), k -> new ArrayList <>())
219- .add (row );
220- }
221- }
192+ InternalRow row = row (allPartitionKeyValues );
193+ appendWriter .append (row );
222194 appendWriter .flush ();
223195
224- assertThat (admin .listPartitionInfos (tablePath ).get ().get (0 ).getPartitionName ())
225- .isEqualTo (result .get (i ));
196+ List <PartitionInfo > actualPartitions = admin .listPartitionInfos (tablePath ).get ();
197+ assertThat (actualPartitions ).hasSize (1 );
198+ PartitionInfo actualPartition = actualPartitions .get (0 );
199+ assertThat (actualPartition .getPartitionName ()).isEqualTo (result .get (i ));
200+
201+ Map <Long , List <InternalRow >> expectPartitionAppendRows = new HashMap <>();
202+ expectPartitionAppendRows .put (actualPartition .getPartitionId (), Arrays .asList (row ));
203+ // assert result
204+ verifyPartitionLogs (table , schema .getRowType (), expectPartitionAppendRows );
205+ admin .dropTable (tablePath , true ).get ();
226206 }
227207 }
228208
229209 private void createPartitionedTable (TablePath tablePath , String partitionKey ) throws Exception {
230210 TableDescriptor partitionTableDescriptor =
231211 TableDescriptor .builder ().schema (schema ).partitionedBy (partitionKey ).build ();
232- createTable (tablePath , partitionTableDescriptor , false );
212+ createTable (tablePath , partitionTableDescriptor , true );
233213 }
234214}
0 commit comments