1717
1818package com .alibaba .fluss .client .table ;
1919
20+ import com .alibaba .fluss .client .ConnectionFactory ;
2021import com .alibaba .fluss .client .admin .ClientToServerITCaseBase ;
2122import com .alibaba .fluss .client .lookup .Lookuper ;
23+ import com .alibaba .fluss .client .table .scanner .ScanRecord ;
24+ import com .alibaba .fluss .client .table .scanner .log .LogScanner ;
25+ import com .alibaba .fluss .client .table .scanner .log .ScanRecords ;
2226import com .alibaba .fluss .client .table .writer .AppendWriter ;
2327import com .alibaba .fluss .client .table .writer .UpsertWriter ;
2428import com .alibaba .fluss .config .ConfigOptions ;
2731import com .alibaba .fluss .metadata .PartitionInfo ;
2832import com .alibaba .fluss .metadata .PhysicalTablePath ;
2933import com .alibaba .fluss .metadata .Schema ;
34+ import com .alibaba .fluss .metadata .TableBucket ;
3035import com .alibaba .fluss .metadata .TableDescriptor ;
36+ import com .alibaba .fluss .metadata .TableInfo ;
3137import com .alibaba .fluss .metadata .TablePath ;
3238import com .alibaba .fluss .row .GenericRow ;
3339import com .alibaba .fluss .row .InternalRow ;
3440import com .alibaba .fluss .types .DataTypes ;
3541
3642import org .junit .jupiter .api .Test ;
3743
44+ import javax .annotation .Nullable ;
45+
3846import java .time .Duration ;
3947import java .util .ArrayList ;
4048import java .util .HashMap ;
@@ -142,6 +150,86 @@ void testPartitionedLogTable() throws Exception {
142150 verifyPartitionLogs (table , schema .getRowType (), expectPartitionAppendRows );
143151 }
144152
153+ @ Test
154+ void testAlterPartitionTableBucket () throws Exception {
155+ TablePath tablePath = TablePath .of ("test_db_1" , "test_static_partitioned_log_table_1" );
156+ Schema schema = createPartitionedTable (tablePath , false , 1 );
157+ TableInfo tableInfo = admin .getTableInfo (tablePath ).get ();
158+
159+ List <PartitionInfo > partitionInfos = admin .listPartitionInfos (tablePath ).get ();
160+ assertThat (partitionInfos .isEmpty ()).isTrue ();
161+
162+ // add three partitions.
163+ for (int i = 0 ; i < 1 ; i ++) {
164+ admin .createPartition (tablePath , newPartitionSpec ("c" , "c" + i ), false ).get ();
165+ }
166+ partitionInfos = admin .listPartitionInfos (tablePath ).get ();
167+ assertThat (partitionInfos .size ()).isEqualTo (1 );
168+
169+ Table table = conn .getTable (tablePath );
170+ AppendWriter appendWriter = table .newAppend ().createWriter ();
171+ int recordsPerPartition = 5 ;
172+ Map <Long , List <InternalRow >> expectPartitionAppendRows = new HashMap <>();
173+ for (PartitionInfo partitionInfo : partitionInfos ) {
174+ String partitionName = partitionInfo .getPartitionName ();
175+ long partitionId = partitionInfo .getPartitionId ();
176+ for (int j = 0 ; j < recordsPerPartition ; j ++) {
177+ InternalRow row = row (j , "a" + j , partitionName );
178+ appendWriter .append (row );
179+ expectPartitionAppendRows
180+ .computeIfAbsent (partitionId , k -> new ArrayList <>())
181+ .add (row );
182+ }
183+ }
184+ appendWriter .flush ();
185+
186+ // then, let's verify the logs
187+ verifyPartitionLogs (table , schema .getRowType (), expectPartitionAppendRows );
188+
189+ TableDescriptor partitionTableDescriptor =
190+ TableDescriptor .builder ()
191+ .schema (schema )
192+ .distributedBy (2 )
193+ .partitionedBy ("c" )
194+ .build ();
195+ admin .alterTable (tablePath , partitionTableDescriptor , false );
196+
197+ for (PartitionInfo partitionInfo : partitionInfos ) {
198+ waitAllReplicasReady (tableInfo .getTableId (), partitionInfo .getPartitionId (), 2 );
199+ }
200+
201+ conn = ConnectionFactory .createConnection (clientConf );
202+ table = conn .getTable (tablePath );
203+ appendWriter = table .newAppend ().createWriter ();
204+ for (PartitionInfo partitionInfo : partitionInfos ) {
205+ String partitionName = partitionInfo .getPartitionName ();
206+ long partitionId = partitionInfo .getPartitionId ();
207+ for (int j = 0 ; j < recordsPerPartition ; j ++) {
208+ InternalRow row = row (j , "a" + j , partitionName );
209+ appendWriter .append (row );
210+ expectPartitionAppendRows
211+ .computeIfAbsent (partitionId , k -> new ArrayList <>())
212+ .add (row );
213+ }
214+ }
215+ appendWriter .flush ();
216+
217+ try (LogScanner logScanner = table .newScan ().createLogScanner ()) {
218+ for (PartitionInfo partitionInfo : partitionInfos ) {
219+ logScanner .subscribeFromBeginning (partitionInfo .getPartitionId (), 0 );
220+ logScanner .subscribeFromBeginning (partitionInfo .getPartitionId (), 1 );
221+ }
222+
223+ ScanRecords scanRecords = logScanner .poll (Duration .ofSeconds (1 ));
224+ for (TableBucket scanBucket : scanRecords .buckets ()) {
225+ List <ScanRecord > records = scanRecords .records (scanBucket );
226+ for (ScanRecord scanRecord : records ) {
227+ System .out .println ("Bucket: " + scanBucket + ", Record: " + scanRecord );
228+ }
229+ }
230+ }
231+ }
232+
145233 @ Test
146234 void testWriteToNonExistsPartitionWhenDisabledDynamicPartition () throws Exception {
147235 clientConf .set (ConfigOptions .CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED , false );
@@ -234,6 +322,12 @@ void testCreatePartitionExceedMaxPartitionNumber() throws Exception {
234322
235323 private Schema createPartitionedTable (TablePath tablePath , boolean isPrimaryTable )
236324 throws Exception {
325+ return createPartitionedTable (tablePath , isPrimaryTable , null );
326+ }
327+
328+ private Schema createPartitionedTable (
329+ TablePath tablePath , boolean isPrimaryTable , @ Nullable Integer bucketCount )
330+ throws Exception {
237331 Schema .Builder schemaBuilder =
238332 Schema .newBuilder ()
239333 .column ("a" , DataTypes .INT ())
@@ -250,7 +344,11 @@ private Schema createPartitionedTable(TablePath tablePath, boolean isPrimaryTabl
250344 Schema schema = schemaBuilder .build ();
251345
252346 TableDescriptor partitionTableDescriptor =
253- TableDescriptor .builder ().schema (schema ).partitionedBy ("c" ).build ();
347+ TableDescriptor .builder ()
348+ .schema (schema )
349+ .distributedBy (bucketCount )
350+ .partitionedBy ("c" )
351+ .build ();
254352 createTable (tablePath , partitionTableDescriptor , false );
255353 return schema ;
256354 }
0 commit comments