Skip to content

Commit 925cafc

Browse files
author
Liebing
committed
[WIP] Rescale bucket
1 parent c1dc8e3 commit 925cafc

File tree

31 files changed

+1343
-17
lines changed

31 files changed

+1343
-17
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,25 @@ CompletableFuture<Void> createTable(
234234
*/
235235
CompletableFuture<List<String>> listTables(String databaseName);
236236

237+
/**
238+
* Alter a table.
239+
*
240+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
241+
*
242+
* <ul>
243+
* <li>{@link DatabaseNotExistException} when the database does not exist.
244+
* <li>{@link TableNotExistException} when the table does not exist, if ignoreIfNotExists is
245+
* false.
246+
* </ul>
247+
*
248+
* @param tablePath The table path of the table.
249+
* @param tableDescriptor The table descriptor.
250+
* @param ignoreIfNotExists if it is true, do nothing if table does not exist. If false, throw a
251+
* TableNotExistException.
252+
*/
253+
CompletableFuture<Void> alterTable(
254+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists);
255+
237256
/**
238257
* List all partitions in the given table in fluss cluster asynchronously.
239258
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.fluss.rpc.gateway.AdminGateway;
4242
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4343
import org.apache.fluss.rpc.gateway.TabletServerGateway;
44+
import org.apache.fluss.rpc.messages.AlterTableRequest;
4445
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4546
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4647
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -235,6 +236,19 @@ public CompletableFuture<Void> createTable(
235236
return gateway.createTable(request).thenApply(r -> null);
236237
}
237238

239+
@Override
240+
public CompletableFuture<Void> alterTable(
241+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists) {
242+
tablePath.validate();
243+
AlterTableRequest request = new AlterTableRequest();
244+
request.setTableJson(tableDescriptor.toJsonBytes())
245+
.setIgnoreIfNotExists(ignoreIfNotExists)
246+
.setTablePath()
247+
.setDatabaseName(tablePath.getDatabaseName())
248+
.setTableName(tablePath.getTableName());
249+
return gateway.alterTable(request).thenApply(r -> null);
250+
}
251+
238252
@Override
239253
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
240254
GetTableInfoRequest request = new GetTableInfoRequest();

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public abstract class ClientToServerITCaseBase {
8383
@BeforeEach
8484
protected void setup() throws Exception {
8585
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
86+
clientConf.set(
87+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
88+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
8689
conn = ConnectionFactory.createConnection(clientConf);
8790
admin = conn.getAdmin();
8891
}
@@ -235,6 +238,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
235238
}
236239
}
237240

241+
public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
242+
for (int i = 0; i < expectBucketCount; i++) {
243+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
244+
new TableBucket(tableId, partitionId, i));
245+
}
246+
}
247+
238248
protected static void verifyRows(
239249
RowType rowType,
240250
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.fluss.metadata.TablePath;
6161
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6262
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
63+
import org.apache.fluss.server.zk.data.TableAssignment;
6364
import org.apache.fluss.types.DataTypes;
6465

6566
import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,117 @@ void testGetTableInfoAndSchema() throws Exception {
197198
.isBetween(timestampBeforeCreate, timestampAfterCreate);
198199
}
199200

201+
@Test
202+
void testAlterTableBucket() throws Exception {
203+
// create table
204+
TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
205+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
206+
207+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
208+
209+
TableAssignment tableAssignment =
210+
FLUSS_CLUSTER_EXTENSION
211+
.getZooKeeperClient()
212+
.getTableAssignment(tableInfo.getTableId())
213+
.get();
214+
System.out.println(tableAssignment);
215+
216+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
217+
218+
TableDescriptor newTableDescriptor =
219+
TableDescriptor.builder()
220+
.schema(existingTableDescriptor.getSchema())
221+
.comment(existingTableDescriptor.getComment().orElse("test table"))
222+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
223+
.distributedBy(
224+
existingTableDescriptor
225+
.getTableDistribution()
226+
.get()
227+
.getBucketCount()
228+
.get()
229+
+ 1,
230+
existingTableDescriptor.getBucketKeys())
231+
.properties(existingTableDescriptor.getProperties())
232+
.customProperties(existingTableDescriptor.getCustomProperties())
233+
.build();
234+
// alter table
235+
admin.alterTable(tablePath, newTableDescriptor, false).get();
236+
237+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
238+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
239+
// assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
240+
241+
TableAssignment tableAssignment1 =
242+
FLUSS_CLUSTER_EXTENSION
243+
.getZooKeeperClient()
244+
.getTableAssignment(tableInfo.getTableId())
245+
.get();
246+
System.out.println(tableAssignment1);
247+
}
248+
249+
@Test
250+
void testAlterTable() throws Exception {
251+
// create table
252+
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
253+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
254+
255+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
256+
257+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
258+
Map<String, String> updateProperties =
259+
new HashMap<>(existingTableDescriptor.getProperties());
260+
Map<String, String> updateCustomProperties =
261+
new HashMap<>(existingTableDescriptor.getCustomProperties());
262+
updateProperties.put("table.datalake.enabled", "true");
263+
updateCustomProperties.put("table.datalake.enabled", "true");
264+
265+
TableDescriptor newTableDescriptor =
266+
TableDescriptor.builder()
267+
.schema(existingTableDescriptor.getSchema())
268+
.comment(existingTableDescriptor.getComment().orElse("test table"))
269+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
270+
.distributedBy(
271+
existingTableDescriptor
272+
.getTableDistribution()
273+
.get()
274+
.getBucketCount()
275+
.orElse(3),
276+
existingTableDescriptor.getBucketKeys())
277+
.properties(updateProperties)
278+
.customProperties(updateCustomProperties)
279+
.build();
280+
// alter table
281+
admin.alterTable(tablePath, newTableDescriptor, false).get();
282+
283+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
284+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
285+
assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
286+
287+
// throw exception if table not exist
288+
assertThatThrownBy(
289+
() ->
290+
admin.alterTable(
291+
TablePath.of("test_db", "alter_table_not_exist"),
292+
newTableDescriptor,
293+
false)
294+
.get())
295+
.cause()
296+
.isInstanceOf(TableNotExistException.class);
297+
298+
// throw exception if database not exist
299+
assertThatThrownBy(
300+
() ->
301+
admin.alterTable(
302+
TablePath.of(
303+
"test_db_not_exist",
304+
"alter_table_not_exist"),
305+
newTableDescriptor,
306+
false)
307+
.get())
308+
.cause()
309+
.isInstanceOf(DatabaseNotExistException.class);
310+
}
311+
200312
@Test
201313
void testCreateInvalidDatabaseAndTable() {
202314
assertThatThrownBy(

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,61 @@ void testPutAndPoll(String kvFormat) throws Exception {
645645
verifyAppendOrPut(false, "ARROW", kvFormat);
646646
}
647647

648+
@Test
649+
void testAlterTableBucket() throws Exception {
650+
TableDescriptor data1TableDescriptor =
651+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
652+
createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
653+
654+
long tableId;
655+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
656+
tableId = table.getTableInfo().getTableId();
657+
658+
AppendWriter appendWriter = table.newAppend().createWriter();
659+
660+
for (int i = 0; i < 10; i++) {
661+
appendWriter.append(row(i, "a")).get();
662+
}
663+
664+
try (LogScanner logScanner = createLogScanner(table)) {
665+
subscribeFromBeginning(logScanner, table);
666+
667+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
668+
for (ScanRecord record : scanRecords) {
669+
System.out.println(record);
670+
}
671+
}
672+
}
673+
674+
System.out.println("==");
675+
676+
TableDescriptor data2TableDescriptor =
677+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
678+
admin.alterTable(DATA1_TABLE_PATH, data2TableDescriptor, false);
679+
680+
waitAllReplicasReady(tableId, 2);
681+
682+
conn = ConnectionFactory.createConnection(clientConf);
683+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
684+
AppendWriter appendWriter = table.newAppend().createWriter();
685+
686+
for (int i = 0; i < 10; i++) {
687+
appendWriter.append(row(i, "a")).get();
688+
}
689+
690+
try (LogScanner logScanner = createLogScanner(table)) {
691+
subscribeFromBeginning(logScanner, table);
692+
693+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
694+
for (TableBucket tableBucket : scanRecords.buckets()) {
695+
for (ScanRecord record : scanRecords.records(tableBucket)) {
696+
System.out.println("Bucket: " + tableBucket + " record: " + record);
697+
}
698+
}
699+
}
700+
}
701+
}
702+
648703
void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
649704
throws Exception {
650705
Schema schema =

0 commit comments

Comments
 (0)