Skip to content

Commit 25ef075

Browse files
author
Liebing
committed
[WIP] Bucket rescale
1 parent 310a77d commit 25ef075

File tree

11 files changed

+350
-16
lines changed

11 files changed

+350
-16
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.alibaba.fluss.metadata.TablePath;
6161
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
6262
import com.alibaba.fluss.server.kv.snapshot.KvSnapshotHandle;
63+
import com.alibaba.fluss.server.zk.data.TableAssignment;
6364
import com.alibaba.fluss.types.DataTypes;
6465

6566
import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,54 @@ void testGetTableInfoAndSchema() throws Exception {
197198
.isBetween(timestampBeforeCreate, timestampAfterCreate);
198199
}
199200

201+
@Test
202+
void testAlterTableBucket() throws Exception {
203+
// create table
204+
TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
205+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
206+
207+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
208+
209+
TableAssignment tableAssignment =
210+
FLUSS_CLUSTER_EXTENSION
211+
.getZooKeeperClient()
212+
.getTableAssignment(tableInfo.getTableId())
213+
.get();
214+
System.out.println(tableAssignment);
215+
216+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
217+
218+
TableDescriptor newTableDescriptor =
219+
TableDescriptor.builder()
220+
.schema(existingTableDescriptor.getSchema())
221+
.comment(existingTableDescriptor.getComment().orElse("test table"))
222+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
223+
.distributedBy(
224+
existingTableDescriptor
225+
.getTableDistribution()
226+
.get()
227+
.getBucketCount()
228+
.get()
229+
+ 1,
230+
existingTableDescriptor.getBucketKeys())
231+
.properties(existingTableDescriptor.getProperties())
232+
.customProperties(existingTableDescriptor.getCustomProperties())
233+
.build();
234+
// alter table
235+
admin.alterTable(tablePath, newTableDescriptor, false).get();
236+
237+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
238+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
239+
// assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
240+
241+
TableAssignment tableAssignment1 =
242+
FLUSS_CLUSTER_EXTENSION
243+
.getZooKeeperClient()
244+
.getTableAssignment(tableInfo.getTableId())
245+
.get();
246+
System.out.println(tableAssignment1);
247+
}
248+
200249
@Test
201250
void testAlterTable() throws Exception {
202251
// create table

fluss-client/src/test/resources/log4j2-test.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
# Set root logger level to OFF to not flood build logs
2020
# set manually to INFO for debugging purposes
21-
rootLogger.level = OFF
21+
rootLogger.level = DEBUG
2222
rootLogger.appenderRef.test.ref = TestLogger
2323

2424
appender.testlogger.name = TestLogger

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.alibaba.fluss.rpc.protocol.ApiError;
4343
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
4444
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
45+
import com.alibaba.fluss.server.coordinator.event.AlterTableBucketEvent;
4546
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4647
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
4748
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
@@ -57,6 +58,7 @@
5758
import com.alibaba.fluss.server.coordinator.event.FencedCoordinatorEvent;
5859
import com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
5960
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
61+
import com.alibaba.fluss.server.coordinator.event.watcher.TableBucketChangeWatcher;
6062
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6163
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6264
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
@@ -128,6 +130,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
128130
private final TableChangeWatcher tableChangeWatcher;
129131
private final CoordinatorChannelManager coordinatorChannelManager;
130132
private final TabletServerChangeWatcher tabletServerChangeWatcher;
133+
private final TableBucketChangeWatcher tableBucketChangeWatcher;
131134
private final CoordinatorMetadataCache serverMetadataCache;
132135
private final CoordinatorRequestBatch coordinatorRequestBatch;
133136
private final CoordinatorMetricGroup coordinatorMetricGroup;
@@ -178,6 +181,8 @@ public CoordinatorEventProcessor(
178181
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager);
179182
this.tabletServerChangeWatcher =
180183
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);
184+
this.tableBucketChangeWatcher =
185+
new TableBucketChangeWatcher(zooKeeperClient, coordinatorEventManager);
181186
this.coordinatorRequestBatch =
182187
new CoordinatorRequestBatch(
183188
coordinatorChannelManager, coordinatorEventManager, coordinatorContext);
@@ -201,6 +206,7 @@ public void startup() {
201206
// start watchers first so that we won't miss node in zk;
202207
tabletServerChangeWatcher.start();
203208
tableChangeWatcher.start();
209+
tableBucketChangeWatcher.start();
204210
LOG.info("Initializing coordinator context.");
205211
try {
206212
initCoordinatorContext();
@@ -422,6 +428,7 @@ private void onShutdown() {
422428
// then stop watchers
423429
tableChangeWatcher.stop();
424430
tabletServerChangeWatcher.stop();
431+
tableBucketChangeWatcher.stop();
425432
}
426433

427434
@Override
@@ -470,6 +477,8 @@ public void process(CoordinatorEvent event) {
470477
completeFromCallable(
471478
commitLakeTableSnapshotEvent.getRespCallback(),
472479
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
480+
} else if (event instanceof AlterTableBucketEvent) {
481+
processAlterTableBucket((AlterTableBucketEvent) event);
473482
} else if (event instanceof AccessContextEvent) {
474483
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
475484
processAccessContext(accessContextEvent);
@@ -478,6 +487,43 @@ public void process(CoordinatorEvent event) {
478487
}
479488
}
480489

490+
private void processAlterTableBucket(AlterTableBucketEvent alterTableBucketEvent) {
491+
Map<TableBucket, BucketAssignment> bucketsToBeAdded =
492+
alterTableBucketEvent.getTableAssignment().getBucketAssignments().entrySet()
493+
.stream()
494+
.filter(
495+
entry ->
496+
coordinatorContext
497+
.getTableAssignment(
498+
alterTableBucketEvent.getTableId())
499+
.containsKey(entry.getKey()))
500+
.collect(
501+
Collectors.toMap(
502+
entry ->
503+
new TableBucket(
504+
alterTableBucketEvent.getTableId(),
505+
entry.getKey()),
506+
Map.Entry::getValue));
507+
508+
if (coordinatorContext.isTableQueuedForDeletion(alterTableBucketEvent.getTableId())) {
509+
if (!bucketsToBeAdded.isEmpty()) {
510+
511+
} else {
512+
LOG.info(
513+
"Ignoring bucket change during table deletion as no new buckets are added");
514+
}
515+
} else if (!bucketsToBeAdded.isEmpty()) {
516+
LOG.info("New buckets to be added {}", bucketsToBeAdded);
517+
bucketsToBeAdded.forEach(
518+
(tableBucket, bucketAssignment) -> {
519+
coordinatorContext.updateBucketReplicaAssignment(
520+
tableBucket, bucketAssignment.getReplicas());
521+
});
522+
tableManager.onCreateNewTableBucket(
523+
alterTableBucketEvent.getTableId(), bucketsToBeAdded.keySet());
524+
}
525+
}
526+
481527
private void processCreateTable(CreateTableEvent createTableEvent) {
482528
long tableId = createTableEvent.getTableInfo().getTableId();
483529
// skip the table if it already exists

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,39 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
308308
throw new InvalidTableException(e.getMessage());
309309
}
310310
}
311+
312+
TableRegistration table = metadataManager.getTableRegistration(tablePath);
313+
// TODO: Alter partitioned table
314+
315+
// Alter table bucket
316+
TableAssignment existingAssignment = metadataManager.getTableAssignment(table.tableId);
317+
int oldNumBuckets = existingAssignment.getBuckets().size();
318+
int newNumBuckets = tableDescriptor.getTableDistribution().get().getBucketCount().get();
319+
int numBucketsIncrement = newNumBuckets - oldNumBuckets;
320+
// if (numBucketsIncrement < 0) {
321+
// throw new InvalidBucketsException("Table currently has " + oldNumBuckets + "
322+
// buckets, which is higher than the requested " + newNumBuckets);
323+
// } else if (numBucketsIncrement == 0) {
324+
// throw new InvalidBucketsException("Table already has " + oldNumBuckets + "
325+
// buckets");
326+
// }
327+
328+
if (numBucketsIncrement > 0) {
329+
int replicaFactor = table.getTableConfig().getReplicationFactor();
330+
TabletServerInfo[] servers = metadataCache.getLiveServers();
331+
BucketAssignment existingAssignmentBucket0 = existingAssignment.getBucketAssignment(0);
332+
int startIndex = Math.max(0, existingAssignmentBucket0.getReplicas().get(0));
333+
// TODO: We should prevent adding buckets while table reassignment is in progress.
334+
Map<Integer, BucketAssignment> newBucketsAssignment =
335+
generateAssignment(numBucketsIncrement, replicaFactor, servers, oldNumBuckets)
336+
.getBucketAssignments();
337+
metadataManager.alterTableBucket(
338+
table.tableId, existingAssignment, new TableAssignment(newBucketsAssignment));
339+
}
340+
341+
// Alter table registration
311342
metadataManager.alterTable(tablePath, tableDescriptor, request.isIgnoreIfNotExists());
343+
312344
return CompletableFuture.completedFuture(alterTableResponse);
313345
}
314346

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,10 @@ public void alterTable(
320320
}
321321
}
322322

323+
TableRegistration existTableReg = getTableRegistration(tablePath);
323324
try {
324325
TableRegistration updatedTableRegistration =
325-
getUpdatedTableRegistration(tablePath, tableDescriptor);
326+
getUpdatedTableRegistration(existTableReg, tableDescriptor);
326327
zookeeperClient.updateTable(tablePath, updatedTableRegistration);
327328
} catch (Exception e) {
328329
if (e instanceof KeeperException.NoNodeException) {
@@ -336,12 +337,20 @@ public void alterTable(
336337
}
337338
}
338339

340+
public void alterTableBucket(
341+
long tableId, TableAssignment existingAssignment, TableAssignment newAssignment) {
342+
try {
343+
existingAssignment.getBucketAssignments().putAll(newAssignment.getBucketAssignments());
344+
zookeeperClient.updateTableAssignment(tableId, existingAssignment);
345+
} catch (Exception e) {
346+
throw new FlussRuntimeException("Failed to update table assignment: " + tableId, e);
347+
}
348+
}
349+
339350
private TableRegistration getUpdatedTableRegistration(
340-
TablePath tablePath, TableDescriptor updateTableDescriptor) {
341-
TableRegistration existTableReg = getTableRegistration(tablePath);
351+
TableRegistration existTableReg, TableDescriptor updateTableDescriptor) {
342352
Map<String, String> updateProperties = updateTableDescriptor.getProperties();
343353
Map<String, String> updateCustomProperties = updateTableDescriptor.getCustomProperties();
344-
validateAlterTableProperties(updateTableDescriptor);
345354

346355
Map<String, String> newProperties = new HashMap<>(existTableReg.properties);
347356
for (Map.Entry<String, String> updateProperty : updateProperties.entrySet()) {
@@ -391,6 +400,19 @@ public TableRegistration getTableRegistration(TablePath tablePath) {
391400
return optionalTable.get();
392401
}
393402

403+
public TableAssignment getTableAssignment(long tableId) {
404+
Optional<TableAssignment> optionalTableAssignment;
405+
try {
406+
optionalTableAssignment = zookeeperClient.getTableAssignment(tableId);
407+
} catch (Exception e) {
408+
throw new RuntimeException(e);
409+
}
410+
if (optionalTableAssignment.isEmpty()) {
411+
throw new TableNotExistException("Table '" + tableId + "' does not exist.");
412+
}
413+
return optionalTableAssignment.get();
414+
}
415+
394416
public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException {
395417
final int currentSchemaId;
396418
try {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/TableManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void onCreateNewPartition(
142142
onCreateNewTableBucket(tableId, newTableBuckets);
143143
}
144144

145-
private void onCreateNewTableBucket(long tableId, Set<TableBucket> tableBuckets) {
145+
public void onCreateNewTableBucket(long tableId, Set<TableBucket> tableBuckets) {
146146
LOG.info(
147147
"New table buckets: {} for table {}.",
148148
tableBuckets,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.alibaba.fluss.server.coordinator.event;
2+
3+
import com.alibaba.fluss.server.zk.data.TableAssignment;
4+
5+
import java.util.Objects;
6+
7+
/** An event for alter table bucket. */
8+
public class AlterTableBucketEvent implements CoordinatorEvent {
9+
10+
private final long tableId;
11+
private final TableAssignment tableAssignment;
12+
13+
public AlterTableBucketEvent(long tableId, TableAssignment tableAssignment) {
14+
this.tableId = tableId;
15+
this.tableAssignment = tableAssignment;
16+
}
17+
18+
public long getTableId() {
19+
return tableId;
20+
}
21+
22+
public TableAssignment getTableAssignment() {
23+
return tableAssignment;
24+
}
25+
26+
@Override
27+
public boolean equals(Object o) {
28+
if (o == null || getClass() != o.getClass()) {
29+
return false;
30+
}
31+
AlterTableBucketEvent that = (AlterTableBucketEvent) o;
32+
return Objects.equals(tableAssignment, that.tableAssignment);
33+
}
34+
35+
@Override
36+
public int hashCode() {
37+
return Objects.hashCode(tableAssignment);
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return "AlterTableBucketEvent{" + "tableAssignment=" + tableAssignment + '}';
43+
}
44+
}

0 commit comments

Comments
 (0)