Skip to content

Commit 3f60a4e

Browse files
committed
[WIP] Bucket rescale
1 parent 88ee6d8 commit 3f60a4e

File tree

8 files changed

+130
-70
lines changed

8 files changed

+130
-70
lines changed

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.alibaba.fluss.rpc.gateway;
1919

20+
import com.alibaba.fluss.rpc.messages.AlterTableBucketRequest;
21+
import com.alibaba.fluss.rpc.messages.AlterTableBucketResponse;
2022
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
2123
import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
2224
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -72,6 +74,9 @@ public interface AdminGateway extends AdminReadOnlyGateway {
7274
@RPC(api = ApiKeys.DROP_TABLE)
7375
CompletableFuture<DropTableResponse> dropTable(DropTableRequest request);
7476

77+
@RPC(api = ApiKeys.ALTER_TABLE_BUCKET)
78+
CompletableFuture<AlterTableBucketResponse> alterTableBucket(AlterTableBucketRequest request);
79+
7580
/**
7681
* create a new partition for a partitioned table.
7782
*

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public enum ApiKeys {
7070
CREATE_ACLS(1039, 0, 0, PUBLIC),
7171
LIST_ACLS(1040, 0, 0, PUBLIC),
7272
DROP_ACLS(1041, 0, 0, PUBLIC),
73-
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
73+
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
74+
ALTER_TABLE_BUCKET(1043, 0, 0, PUBLIC);
7475

7576
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
7677
Arrays.stream(ApiKeys.values())

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,16 @@ message CreateTableRequest {
108108
message CreateTableResponse {
109109
}
110110

111+
// alter table bucket request and response
112+
// Request to change the number of buckets of an existing table.
message AlterTableBucketRequest {
  // Path (database name and table name) of the table to alter.
  required PbTablePath table_path = 1;
  // Requested total number of buckets of the table, not the number of buckets to add.
  required int32 bucket_num = 2;
  // Presumably: when true, a missing table is not reported as an error.
  // TODO confirm against the coordinator implementation.
  required bool ignore_if_not_exists = 3;
}

// Empty acknowledgement of an AlterTableBucketRequest.
message AlterTableBucketResponse {
}
120+
111121
// get table request and response
112122
message GetTableInfoRequest {
113123
required PbTablePath table_path = 1;

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,7 @@
2121
import com.alibaba.fluss.cluster.TabletServerInfo;
2222
import com.alibaba.fluss.config.ConfigOptions;
2323
import com.alibaba.fluss.config.Configuration;
24-
import com.alibaba.fluss.exception.InvalidCoordinatorException;
25-
import com.alibaba.fluss.exception.InvalidDatabaseException;
26-
import com.alibaba.fluss.exception.InvalidTableException;
27-
import com.alibaba.fluss.exception.SecurityDisabledException;
28-
import com.alibaba.fluss.exception.TableAlreadyExistException;
29-
import com.alibaba.fluss.exception.TableNotPartitionedException;
24+
import com.alibaba.fluss.exception.*;
3025
import com.alibaba.fluss.fs.FileSystem;
3126
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
3227
import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -42,6 +37,8 @@
4237
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
4338
import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
4439
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
40+
import com.alibaba.fluss.rpc.messages.AlterTableBucketRequest;
41+
import com.alibaba.fluss.rpc.messages.AlterTableBucketResponse;
4542
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
4643
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
4744
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -348,6 +345,48 @@ public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request)
348345
return CompletableFuture.completedFuture(response);
349346
}
350347

348+
@Override
349+
public CompletableFuture<AlterTableBucketResponse> alterTableBucket(AlterTableBucketRequest request) {
350+
TablePath tablePath = toTablePath(request.getTablePath());
351+
if (authorizer != null) {
352+
authorizer.authorize(
353+
currentSession(),
354+
OperationType.ALTER,
355+
Resource.table(tablePath.getDatabaseName(), tablePath.getTableName()));
356+
}
357+
358+
AlterTableBucketResponse response = new AlterTableBucketResponse();
359+
TableRegistration table = metadataManager.getTableRegistration(tablePath);
360+
if (table.isPartitioned()) {
361+
return CompletableFuture.completedFuture(response);
362+
}
363+
364+
TableAssignment existingAssignment = metadataManager.getTableAssignment(table.tableId);
365+
int oldNumBuckets = existingAssignment.getBuckets().size();
366+
int newNumBuckets = request.getBucketNum();
367+
int numBucketsIncrement = newNumBuckets - oldNumBuckets;
368+
if (numBucketsIncrement < 0) {
369+
throw new InvalidBucketsException("Table currently has " + oldNumBuckets + " buckets, which is higher than the requested " + newNumBuckets);
370+
} else if (numBucketsIncrement == 0) {
371+
throw new InvalidBucketsException("Table already has " + oldNumBuckets + " buckets");
372+
}
373+
374+
int replicaFactor = table.getTableConfig().getReplicationFactor();
375+
TabletServerInfo[] servers = metadataCache.getLiveServers();
376+
BucketAssignment existingAssignmentBucket0 = existingAssignment.getBucketAssignment(0);
377+
int startIndex = Math.max(0, existingAssignmentBucket0.getReplicas().get(0));
378+
// TODO: We should prevent adding buckets while table reassignment is in progress.
379+
Map<Integer, BucketAssignment> newBucketsAssignment =
380+
generateAssignment(
381+
numBucketsIncrement,
382+
replicaFactor,
383+
servers,
384+
oldNumBuckets)
385+
.getBucketAssignments();
386+
387+
return CompletableFuture.completedFuture(response);
388+
}
389+
351390
@Override
352391
public CompletableFuture<CreatePartitionResponse> createPartition(
353392
CreatePartitionRequest request) {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,19 @@ public TableRegistration getTableRegistration(TablePath tablePath) {
329329
return optionalTable.get();
330330
}
331331

332+
public TableAssignment getTableAssignment(long tableId) {
333+
Optional<TableAssignment> optionalTableAssignment;
334+
try {
335+
optionalTableAssignment = zookeeperClient.getTableAssignment(tableId);
336+
} catch (Exception e) {
337+
throw new RuntimeException(e);
338+
}
339+
if (!optionalTableAssignment.isPresent()) {
340+
throw new TableNotExistException("Table '" + tableId + "' does not exist.");
341+
}
342+
return optionalTableAssignment.get();
343+
}
344+
332345
public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException {
333346
final int currentSchemaId;
334347
try {
@@ -368,6 +381,12 @@ public boolean tableExists(TablePath tablePath) {
368381
String.format("Fail to check the table %s exist or not.", tablePath));
369382
}
370383

384+
public void alterTableBucket(
385+
TableAssignment existingBucketsAssignment,
386+
TableAssignment newBucketsAssignment) {
387+
388+
}
389+
371390
public long initWriterId() {
372391
return uncheck(
373392
zookeeperClient::getWriterIdAndIncrement, "Fail to get writer id from zookeeper");

fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableAssignmentUtils.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ protected static TableAssignment generateAssignment(
4646
int replicationFactor,
4747
TabletServerInfo[] servers,
4848
int startIndex,
49-
int nextReplicaShift) {
49+
int nextReplicaShift,
50+
int startBucketId) {
5051
if (nBuckets < 0) {
5152
throw new InvalidBucketsException("Number of buckets must be larger than 0.");
5253
}
@@ -68,7 +69,8 @@ protected static TableAssignment generateAssignment(
6869
replicationFactor,
6970
Arrays.stream(servers).mapToInt(TabletServerInfo::getId).toArray(),
7071
startIndex,
71-
nextReplicaShift);
72+
nextReplicaShift,
73+
startBucketId);
7274
} else {
7375
if (Arrays.stream(servers).anyMatch(tsInfo -> tsInfo.getRack() == null)) {
7476
throw new InvalidServerRackInfoException(
@@ -169,17 +171,31 @@ public static TableAssignment generateAssignment(
169171
replicationFactor,
170172
servers,
171173
randomInt(servers.length),
172-
randomInt(servers.length));
174+
randomInt(servers.length),
175+
0);
176+
}
177+
178+
public static TableAssignment generateAssignment(
179+
int nBuckets, int replicationFactor, TabletServerInfo[] servers, int startBucketId)
180+
throws InvalidReplicationFactorException {
181+
return generateAssignment(
182+
nBuckets,
183+
replicationFactor,
184+
servers,
185+
randomInt(servers.length),
186+
randomInt(servers.length),
187+
startBucketId);
173188
}
174189

175190
private static TableAssignment generateRackUnawareAssigment(
176191
int nBuckets,
177192
int replicationFactor,
178193
int[] serverIds,
179194
int startIndex,
180-
int nextReplicaShift) {
195+
int nextReplicaShift,
196+
int startBucketId) {
181197
Map<Integer, BucketAssignment> assignments = new HashMap<>();
182-
int currentBucketId = 0;
198+
int currentBucketId = Math.max(0, startBucketId);
183199
for (int i = 0; i < nBuckets; i++) {
184200
if (currentBucketId > 0 && (currentBucketId % serverIds.length == 0)) {
185201
nextReplicaShift += 1;

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java

Lines changed: 6 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,62 +20,7 @@
2020
import com.alibaba.fluss.exception.FencedLeaderEpochException;
2121
import com.alibaba.fluss.metadata.TableBucket;
2222
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
23-
import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
24-
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
25-
import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
26-
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
27-
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
28-
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
29-
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
30-
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
31-
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest;
32-
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
33-
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
34-
import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
35-
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
36-
import com.alibaba.fluss.rpc.messages.CreateDatabaseResponse;
37-
import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
38-
import com.alibaba.fluss.rpc.messages.CreatePartitionResponse;
39-
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
40-
import com.alibaba.fluss.rpc.messages.CreateTableResponse;
41-
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
42-
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
43-
import com.alibaba.fluss.rpc.messages.DropAclsRequest;
44-
import com.alibaba.fluss.rpc.messages.DropAclsResponse;
45-
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
46-
import com.alibaba.fluss.rpc.messages.DropDatabaseResponse;
47-
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
48-
import com.alibaba.fluss.rpc.messages.DropPartitionResponse;
49-
import com.alibaba.fluss.rpc.messages.DropTableRequest;
50-
import com.alibaba.fluss.rpc.messages.DropTableResponse;
51-
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
52-
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
53-
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
54-
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
55-
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
56-
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
57-
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
58-
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
59-
import com.alibaba.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
60-
import com.alibaba.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
61-
import com.alibaba.fluss.rpc.messages.GetTableInfoRequest;
62-
import com.alibaba.fluss.rpc.messages.GetTableInfoResponse;
63-
import com.alibaba.fluss.rpc.messages.GetTableSchemaRequest;
64-
import com.alibaba.fluss.rpc.messages.GetTableSchemaResponse;
65-
import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatRequest;
66-
import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatResponse;
67-
import com.alibaba.fluss.rpc.messages.ListAclsRequest;
68-
import com.alibaba.fluss.rpc.messages.ListAclsResponse;
69-
import com.alibaba.fluss.rpc.messages.ListDatabasesRequest;
70-
import com.alibaba.fluss.rpc.messages.ListDatabasesResponse;
71-
import com.alibaba.fluss.rpc.messages.ListPartitionInfosRequest;
72-
import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse;
73-
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
74-
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
75-
import com.alibaba.fluss.rpc.messages.MetadataRequest;
76-
import com.alibaba.fluss.rpc.messages.MetadataResponse;
77-
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
78-
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
23+
import com.alibaba.fluss.rpc.messages.*;
7924
import com.alibaba.fluss.rpc.protocol.ApiError;
8025
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
8126
import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
@@ -137,6 +82,11 @@ public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request)
13782
throw new UnsupportedOperationException();
13883
}
13984

85+
@Override
86+
public CompletableFuture<AlterTableBucketResponse> alterTableBucket(AlterTableBucketRequest request) {
87+
throw new UnsupportedOperationException();
88+
}
89+
14090
@Override
14191
public CompletableFuture<CreatePartitionResponse> createPartition(
14292
CreatePartitionRequest request) {

fluss-server/src/test/java/com/alibaba/fluss/server/utils/TableAssignmentUtilsTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@
4545
/** Test for {@link TableAssignmentUtils} for rack unaware mode and rack aware mode. */
4646
class TableAssignmentUtilsTest {
4747

48+
@Test
49+
void test() {
50+
TableAssignment tableAssignment = generateAssignment(2, 3, new TabletServerInfo[]{
51+
new TabletServerInfo(0, null),
52+
new TabletServerInfo(1, null),
53+
new TabletServerInfo(2, null),
54+
new TabletServerInfo(3, null),
55+
new TabletServerInfo(4, null)
56+
},
57+
0, 3, 9);
58+
System.out.println(tableAssignment);
59+
}
60+
4861
@Test
4962
void testTableAssignmentWithRackUnAware() {
5063
// should throw exception since servers is empty
@@ -94,6 +107,7 @@ void testTableAssignmentWithRackUnAware() {
94107
1,
95108
toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3)),
96109
0,
110+
0,
97111
0);
98112
TableAssignment expectedAssignment =
99113
TableAssignment.builder()
@@ -110,6 +124,7 @@ void testTableAssignmentWithRackUnAware() {
110124
3,
111125
toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3)),
112126
1,
127+
0,
113128
0);
114129
expectedAssignment =
115130
TableAssignment.builder()
@@ -126,6 +141,7 @@ void testTableAssignmentWithRackUnAware() {
126141
3,
127142
toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3, 4)),
128143
0,
144+
0,
129145
0);
130146
expectedAssignment =
131147
TableAssignment.builder()
@@ -164,7 +180,7 @@ void testGetRackAlternatedTabletServerListAndAssignReplicasToServers() {
164180

165181
TableAssignment tableAssignment =
166182
generateAssignment(
167-
7, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 0);
183+
7, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 0, 0);
168184
TableAssignment expectedAssignment =
169185
TableAssignment.builder()
170186
.add(0, BucketAssignment.of(0, 3, 1))
@@ -196,6 +212,7 @@ void testTableAssignmentWithRackAware() {
196212
replicationFactor,
197213
toTabletServerInfo(rackMap, Collections.emptyList()),
198214
2,
215+
0,
199216
0);
200217
checkTableAssignment(tableAssignment, rackMap, 6, nBuckets, replicationFactor);
201218
}
@@ -238,6 +255,7 @@ void testAssignmentWithRackAwareWithUnevenReplicas() {
238255
replicationFactor,
239256
toTabletServerInfo(rackMap, Collections.emptyList()),
240257
0,
258+
0,
241259
0);
242260
checkTableAssignment(
243261
tableAssignment, rackMap, 6, nBuckets, replicationFactor, true, false, false);
@@ -302,7 +320,8 @@ void testRackAwareExpansion() {
302320
replicationFactor,
303321
toTabletServerInfo(rackMap, Collections.emptyList()),
304322
0,
305-
12);
323+
12,
324+
0);
306325
checkTableAssignment(tableAssignment, rackMap, 6, nBuckets, replicationFactor);
307326
}
308327

@@ -462,6 +481,7 @@ void testSkipTabletServerWithReplicaAlreadyAssigned() {
462481
replicationFactor,
463482
toTabletServerInfo(rackMap, Collections.emptyList()),
464483
2,
484+
0,
465485
0);
466486
checkTableAssignment(
467487
tableAssignment, rackMap, 12, nBuckets, replicationFactor, false, false, false);

0 commit comments

Comments
 (0)