
Commit 7cd67a6

[WIP] Rescale bucket
1 parent 434a4f4 commit 7cd67a6


32 files changed: +1485 additions, -16 deletions


fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 19 additions & 0 deletions
@@ -235,6 +235,25 @@ CompletableFuture<Void> createTable(
      */
     CompletableFuture<List<String>> listTables(String databaseName);
 
+    /**
+     * Alter a table.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the returned
+     * future.
+     *
+     * <ul>
+     *   <li>{@link DatabaseNotExistException} when the database does not exist.
+     *   <li>{@link TableNotExistException} when the table does not exist, if ignoreIfNotExists is
+     *       false.
+     * </ul>
+     *
+     * @param tablePath The table path of the table.
+     * @param tableDescriptor The table descriptor.
+     * @param ignoreIfNotExists if true, do nothing if the table does not exist; if false, throw a
+     *     TableNotExistException.
+     */
+    CompletableFuture<Void> alterTable(
+            TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists);
+
     /**
      * List all partitions in the given table in fluss cluster asynchronously.
      *
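For orientation, a minimal client-side sketch of the new API, mirroring how the tests in this commit use it: read the current descriptor, rebuild it with a larger bucket count, and submit it via alterTable. The bootstrap address, database and table names, and import paths outside this diff are best-effort assumptions, not part of the commit.

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;

public class AlterTableBucketSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "localhost:9123"); // illustrative address
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            Admin admin = conn.getAdmin();
            TablePath tablePath = TablePath.of("my_db", "my_log_table"); // hypothetical table

            // Rebuild the existing descriptor with one more bucket.
            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
            TableDescriptor current = tableInfo.toTableDescriptor();
            TableDescriptor rescaled =
                    TableDescriptor.builder()
                            .schema(current.getSchema())
                            .distributedBy(tableInfo.getNumBuckets() + 1, current.getBucketKeys())
                            .properties(current.getProperties())
                            .customProperties(current.getCustomProperties())
                            .build();

            // ignoreIfNotExists = false: a missing table surfaces as TableNotExistException.
            admin.alterTable(tablePath, rescaled, false).get();
        }
    }
}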

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 14 additions & 0 deletions
@@ -41,6 +41,7 @@
 import org.apache.fluss.rpc.gateway.AdminGateway;
 import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
 import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -235,6 +236,19 @@ public CompletableFuture<Void> createTable(
         return gateway.createTable(request).thenApply(r -> null);
     }
 
+    @Override
+    public CompletableFuture<Void> alterTable(
+            TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists) {
+        tablePath.validate();
+        AlterTableRequest request = new AlterTableRequest();
+        request.setTableJson(tableDescriptor.toJsonBytes())
+                .setIgnoreIfNotExists(ignoreIfNotExists)
+                .setTablePath()
+                .setDatabaseName(tablePath.getDatabaseName())
+                .setTableName(tablePath.getTableName());
+        return gateway.alterTable(request).thenApply(r -> null);
+    }
+
     @Override
     public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
         GetTableInfoRequest request = new GetTableInfoRequest();
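Per the Javadoc contract added in Admin.java above, server-side failures surface when {@code get()} is called on the returned future, wrapped in an ExecutionException (the ITCase below asserts this via .cause()). A hedged caller-side sketch follows; the exception's import path is assumed from the exception names used in the Javadoc and is not confirmed by this diff.

import java.util.concurrent.ExecutionException;

import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.exception.TableNotExistException; // package assumed
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;

public class AlterTableErrorHandlingSketch {
    /** Alters the table, treating a concurrently dropped table as a no-op. */
    static void alterIfPresent(Admin admin, TablePath tablePath, TableDescriptor descriptor)
            throws Exception {
        try {
            // ignoreIfNotExists = false, so a missing table completes the future exceptionally.
            admin.alterTable(tablePath, descriptor, false).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TableNotExistException) {
                return; // table vanished between lookup and alter; nothing to do
            }
            throw e;
        }
    }
}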

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 7 additions & 0 deletions
@@ -235,6 +235,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
         }
     }
 
+    public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
+        for (int i = 0; i < expectBucketCount; i++) {
+            FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
+                    new TableBucket(tableId, partitionId, i));
+        }
+    }
+
     protected static void verifyRows(
             RowType rowType,
             Map<Long, List<InternalRow>> actualRows,
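A hedged sketch of how the new partition-aware overload could be used from a test: after rescaling a partitioned table, wait for every bucket replica of each partition before writing. The table path, the new bucket count, the inherited admin field, and the PartitionInfo accessor are assumptions inferred from the other tests in this diff, not code from this commit.

import org.apache.fluss.metadata.TablePath;

import org.junit.jupiter.api.Test;

// Assumed to live in the same package as ClientToServerITCaseBase.
class RescaledPartitionWaitSketch extends ClientToServerITCaseBase {

    @Test
    void waitForRescaledPartitionBuckets() throws Exception {
        TablePath tablePath = TablePath.of("test_db", "a_partitioned_table"); // hypothetical
        int newBucketCount = 2; // bucket count after alterTable
        long tableId = admin.getTableInfo(tablePath).get().getTableId();

        // Wait until all bucket replicas of every partition of the rescaled table are ready.
        admin.listPartitionInfos(tablePath).get()
                .forEach(p -> waitAllReplicasReady(tableId, p.getPartitionId(), newBucketCount));
    }
}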

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 240 additions & 0 deletions
@@ -60,6 +60,7 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
+import org.apache.fluss.server.zk.data.TableAssignment;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,245 @@ void testGetTableInfoAndSchema() throws Exception {
                 .isBetween(timestampBeforeCreate, timestampAfterCreate);
     }
 
+    @Test
+    void testAlterTable() throws Exception {
+        // create table
+        TablePath tablePath = TablePath.of("test_db", "alter_table_1");
+        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+        TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
+        Map<String, String> updateProperties =
+                new HashMap<>(existingTableDescriptor.getProperties());
+        Map<String, String> updateCustomProperties =
+                new HashMap<>(existingTableDescriptor.getCustomProperties());
+        updateProperties.put("table.datalake.enabled", "true");
+        updateCustomProperties.put("table.datalake.enabled", "true");
+
+        TableDescriptor newTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingTableDescriptor.getSchema())
+                        .comment(existingTableDescriptor.getComment().orElse("test table"))
+                        .partitionedBy(existingTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingTableDescriptor
+                                        .getTableDistribution()
+                                        .get()
+                                        .getBucketCount()
+                                        .orElse(3),
+                                existingTableDescriptor.getBucketKeys())
+                        .properties(updateProperties)
+                        .customProperties(updateCustomProperties)
+                        .build();
+        // alter table
+        admin.alterTable(tablePath, newTableDescriptor, false).get();
+
+        TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
+        TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
+        assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
+
+        // throws an exception if the table does not exist
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                TablePath.of("test_db", "alter_table_not_exist"),
+                                                newTableDescriptor,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(TableNotExistException.class);
+
+        // throws an exception if the database does not exist
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                TablePath.of(
+                                                        "test_db_not_exist",
+                                                        "alter_table_not_exist"),
+                                                newTableDescriptor,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(DatabaseNotExistException.class);
+    }
+
+    @Test
+    void testAlterTableBucket() throws Exception {
+        Schema logTableSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .withComment("person id")
+                        .column("name", DataTypes.STRING())
+                        .withComment("person name")
+                        .column("age", DataTypes.INT())
+                        .withComment("person age")
+                        .build();
+
+        // create non-partitioned table
+        TableDescriptor nonPartitionTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(logTableSchema)
+                        .comment("test table")
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
+                        .customProperty("connector", "fluss")
+                        .build();
+        TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
+        admin.createTable(tablePath, nonPartitionTableDescriptor, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(tableInfo.getNumBuckets()).isEqualTo(1);
+
+        TableAssignment tableAssignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId())
+                        .get();
+        assertThat(tableAssignment.getBuckets().size()).isEqualTo(tableInfo.getNumBuckets());
+
+        TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
+        TableDescriptor newTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingTableDescriptor.getSchema())
+                        .comment(existingTableDescriptor.getComment().orElse("test table"))
+                        .partitionedBy(existingTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingTableDescriptor
+                                                .getTableDistribution()
+                                                .get()
+                                                .getBucketCount()
+                                                .get()
+                                        + 1,
+                                existingTableDescriptor.getBucketKeys())
+                        .properties(existingTableDescriptor.getProperties())
+                        .customProperties(existingTableDescriptor.getCustomProperties())
+                        .build();
+        // alter table
+        admin.alterTable(tablePath, newTableDescriptor, false).get();
+
+        TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
+        assertThat(alteredTableInfo.getNumBuckets())
+                .isEqualTo(newTableDescriptor.getTableDistribution().get().getBucketCount().get());
+
+        TableAssignment alteredTableAssignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId())
+                        .get();
+        assertThat(alteredTableAssignment.getBuckets().size())
+                .isEqualTo(alteredTableInfo.getNumBuckets());
+
+        // create partitioned table
+        TableDescriptor partitionTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(logTableSchema)
+                        .comment("test table")
+                        .distributedBy(1, "id")
+                        .partitionedBy("name")
+                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
+                        .customProperty("connector", "fluss")
+                        .build();
+        TablePath partitionedTablePath =
+                TablePath.of("test_db", "alter_table_bucket_partitioned_table");
+        admin.createTable(partitionedTablePath, partitionTableDescriptor, false).get();
+
+        TableInfo partitionedTableInfo = admin.getTableInfo(partitionedTablePath).get();
+        assertThat(partitionedTableInfo.getNumBuckets()).isEqualTo(1);
+
+        // add 1 partition
+        admin.createPartition(partitionedTablePath, newPartitionSpec("name", "name1"), false).get();
+        List<PartitionInfo> partitionInfos = admin.listPartitionInfos(partitionedTablePath).get();
+        assertThat(partitionInfos.size()).isEqualTo(1);
+
+        TableAssignment partition1Assignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getPartitionAssignment(partitionInfos.get(0).getPartitionId())
+                        .get();
+        assertThat(partition1Assignment.getBuckets().size()).isEqualTo(1);
+
+        TableDescriptor existingPartitionTableDescriptor = partitionedTableInfo.toTableDescriptor();
+        TableDescriptor newPartitionedTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingPartitionTableDescriptor.getSchema())
+                        .comment(existingPartitionTableDescriptor.getComment().orElse("test table"))
+                        .partitionedBy(existingPartitionTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingPartitionTableDescriptor
+                                                .getTableDistribution()
+                                                .get()
+                                                .getBucketCount()
+                                                .get()
+                                        + 1,
+                                existingPartitionTableDescriptor.getBucketKeys())
+                        .properties(existingPartitionTableDescriptor.getProperties())
+                        .customProperties(existingPartitionTableDescriptor.getCustomProperties())
+                        .build();
+        // alter table
+        admin.alterTable(partitionedTablePath, newPartitionedTableDescriptor, false).get();
+
+        TableInfo alteredPartitionedTableInfo = admin.getTableInfo(partitionedTablePath).get();
+        assertThat(alteredPartitionedTableInfo.getNumBuckets())
+                .isEqualTo(
+                        newPartitionedTableDescriptor
+                                .getTableDistribution()
+                                .get()
+                                .getBucketCount()
+                                .get());
+
+        TableAssignment alteredPartition1Assignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId())
+                        .get();
+        assertThat(alteredPartition1Assignment.getBuckets().size())
+                .isEqualTo(alteredPartitionedTableInfo.getNumBuckets());
+
+        // add another partition
+        admin.createPartition(partitionedTablePath, newPartitionSpec("name", "name2"), false).get();
+        partitionInfos = admin.listPartitionInfos(partitionedTablePath).get();
+        assertThat(partitionInfos.size()).isEqualTo(2);
+
+        TableAssignment partition2Assignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getPartitionAssignment(partitionInfos.get(1).getPartitionId())
+                        .get();
+        assertThat(partition2Assignment.getBuckets().size()).isEqualTo(2);
+    }
+
+    @Test
+    void testAlterTableBucketForPrimaryKeyTable() throws Exception {
+        // create table
+        TablePath tablePath = TablePath.of("test_db", "alter_table_bucket_for_pk_table");
+        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
+
+        TableDescriptor newTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingTableDescriptor.getSchema())
+                        .comment(existingTableDescriptor.getComment().orElse("test table"))
+                        .partitionedBy(existingTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingTableDescriptor
+                                                .getTableDistribution()
+                                                .get()
+                                                .getBucketCount()
+                                                .get()
+                                        + 1,
+                                existingTableDescriptor.getBucketKeys())
+                        .properties(existingTableDescriptor.getProperties())
+                        .customProperties(existingTableDescriptor.getCustomProperties())
+                        .build();
+        // altering the bucket count of a primary key table is rejected
+        assertThatThrownBy(() -> admin.alterTable(tablePath, newTableDescriptor, false).get())
+                .rootCause()
+                .isInstanceOf(InvalidTableException.class);
+    }
+
     @Test
     void testCreateInvalidDatabaseAndTable() {
         assertThatThrownBy(

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 58 additions & 0 deletions
@@ -65,8 +65,10 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -724,6 +726,62 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
         }
     }
 
+    @Test
+    void testAppendWithAlterTableBucket() throws Exception {
+        TableDescriptor data1TableDescriptor =
+                TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
+        createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
+        TableInfo tableInfo = admin.getTableInfo(DATA1_TABLE_PATH).get();
+
+        int lastCount = verifyAppendForAlterTableBucket(1, 0);
+
+        // alter the table bucket count from 1 to 2
+        TableDescriptor data2TableDescriptor =
+                TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
+        admin.alterTable(DATA1_TABLE_PATH, data2TableDescriptor, false);
+
+        // wait until the new bucket replicas are ready
+        waitAllReplicasReady(tableInfo.getTableId(), 2);
+
+        verifyAppendForAlterTableBucket(2, lastCount);
+    }
+
+    int verifyAppendForAlterTableBucket(int bucketNum, int lastCount) throws Exception {
+        Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        // use the round-robin bucket assigner so that data is appended to all buckets
+        clientConf.set(
+                ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
+                ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
+        Connection conn = ConnectionFactory.createConnection(clientConf);
+        int rowCount = 10;
+        int expectedRowCount = lastCount + rowCount;
+        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+            AppendWriter appendWriter = table.newAppend().createWriter();
+
+            for (int i = 0; i < rowCount; i++) {
+                GenericRow row = row(i, "a");
+                appendWriter.append(row).get();
+            }
+            appendWriter.flush();
+
+            try (LogScanner logScanner = createLogScanner(table)) {
+                subscribeFromBeginning(logScanner, table);
+
+                int count = 0;
+                Set<TableBucket> allBuckets = new HashSet<>();
+                while (count < expectedRowCount) {
+                    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                    allBuckets.addAll(scanRecords.buckets());
+                    count += scanRecords.count();
+                }
+                assertThat(allBuckets.size()).isEqualTo(bucketNum);
+                assertThat(count).isEqualTo(expectedRowCount);
+            }
+        }
+        conn.close();
+        return expectedRowCount;
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"INDEXED", "ARROW"})
     void testAppendAndProject(String format) throws Exception {
