
Commit 8412090

[WIP] Bucket Rescale
1 parent 310a77d commit 8412090

12 files changed, +504 −20 lines

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 3 additions & 0 deletions
@@ -83,6 +83,9 @@ public abstract class ClientToServerITCaseBase {
     @BeforeEach
     protected void setup() throws Exception {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        clientConf.set(
+                ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
+                ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
     }
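
Note: the rescale tests append rows without a bucket key, so the no-key assigner decides which bucket each record lands in. With ROUND_ROBIN the writer cycles through all buckets, which means newly added buckets receive traffic immediately after a rescale. A minimal sketch of the write pattern this enables (reusing conn, row, and DATA1_TABLE_PATH from the test classes in this commit):

// Sketch: under NoKeyAssigner.ROUND_ROBIN, key-less appends rotate across
// buckets 0..N-1, so a table rescaled to N buckets exercises every bucket.
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
    AppendWriter appendWriter = table.newAppend().createWriter();
    for (int i = 0; i < 10; i++) {
        // each append is routed to the next bucket in round-robin order
        appendWriter.append(row(i, "a")).get();
    }
}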

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 49 additions & 0 deletions
@@ -60,6 +60,7 @@
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
 import com.alibaba.fluss.server.kv.snapshot.KvSnapshotHandle;
+import com.alibaba.fluss.server.zk.data.TableAssignment;
 import com.alibaba.fluss.types.DataTypes;

 import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,54 @@ void testGetTableInfoAndSchema() throws Exception {
                 .isBetween(timestampBeforeCreate, timestampAfterCreate);
     }

+    @Test
+    void testAlterTableBucket() throws Exception {
+        // create table
+        TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
+        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+        TableAssignment tableAssignment =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId())
+                        .get();
+        System.out.println(tableAssignment);
+
+        TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
+
+        TableDescriptor newTableDescriptor =
+                TableDescriptor.builder()
+                        .schema(existingTableDescriptor.getSchema())
+                        .comment(existingTableDescriptor.getComment().orElse("test table"))
+                        .partitionedBy(existingTableDescriptor.getPartitionKeys())
+                        .distributedBy(
+                                existingTableDescriptor
+                                                .getTableDistribution()
+                                                .get()
+                                                .getBucketCount()
+                                                .get()
+                                        + 1,
+                                existingTableDescriptor.getBucketKeys())
+                        .properties(existingTableDescriptor.getProperties())
+                        .customProperties(existingTableDescriptor.getCustomProperties())
+                        .build();
+        // alter table
+        admin.alterTable(tablePath, newTableDescriptor, false).get();
+
+        TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
+        TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
+        // assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
+
+        TableAssignment tableAssignment1 =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId())
+                        .get();
+        System.out.println(tableAssignment1);
+    }
+
     @Test
     void testAlterTable() throws Exception {
         // create table
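
Once descriptor round-tripping is settled, the commented-out equality assertion above could be narrowed to the bucket count alone. A sketch using only accessors that already appear in this diff (AssertJ's assertThat is already used by this test class):

// Sketch: assert only the rescaled bucket count instead of full
// descriptor equality, which this WIP leaves commented out.
int oldBucketCount =
        existingTableDescriptor.getTableDistribution().get().getBucketCount().get();
int newBucketCount =
        alteredTableDescriptor.getTableDistribution().get().getBucketCount().get();
assertThat(newBucketCount).isEqualTo(oldBucketCount + 1);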

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

Lines changed: 53 additions & 0 deletions
@@ -645,6 +645,59 @@ void testPutAndPoll(String kvFormat) throws Exception {
         verifyAppendOrPut(false, "ARROW", kvFormat);
     }

+    @Test
+    void test() throws Exception {
+        TableDescriptor DATA1_TABLE_DESCRIPTOR =
+                TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
+        createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
+
+        long tableId;
+        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+            tableId = table.getTableInfo().getTableId();
+
+            AppendWriter appendWriter = table.newAppend().createWriter();
+
+            for (int i = 0; i < 10; i++) {
+                appendWriter.append(row(i, "a")).get();
+            }
+
+            try (LogScanner logScanner = createLogScanner(table)) {
+                subscribeFromBeginning(logScanner, table);
+
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord record : scanRecords) {
+                    System.out.println(record);
+                }
+            }
+        }
+
+        System.out.println("==");
+
+        TableDescriptor DATA2_TABLE_DESCRIPTOR =
+                TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
+        admin.alterTable(DATA1_TABLE_PATH, DATA2_TABLE_DESCRIPTOR, false);
+
+        waitAllReplicasReady(tableId, 2);
+
+        conn = ConnectionFactory.createConnection(clientConf);
+        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+            AppendWriter appendWriter = table.newAppend().createWriter();
+
+            for (int i = 0; i < 10; i++) {
+                appendWriter.append(row(i, "a")).get();
+            }
+
+            try (LogScanner logScanner = createLogScanner(table)) {
+                subscribeFromBeginning(logScanner, table);
+
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord record : scanRecords) {
+                    System.out.println(record);
+                }
+            }
+        }
+    }
+
     void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
             throws Exception {
         Schema schema =
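
The test() method above prints scan results instead of asserting on them, and a single one-second poll is not guaranteed to return everything that was written. A sketch of how the post-rescale scan could verify all ten rows (same logScanner, ScanRecords, and ScanRecord types as used above; the 30-second deadline is an arbitrary assumption):

// Sketch: poll in a loop until the expected record count is reached
// or a deadline expires, then assert on the total.
int received = 0;
long deadline = System.currentTimeMillis() + 30_000;
while (received < 10 && System.currentTimeMillis() < deadline) {
    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
    for (ScanRecord record : scanRecords) {
        received++;
    }
}
assertThat(received).isEqualTo(10);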

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.alibaba.fluss.rpc.protocol.ApiError;
4343
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
4444
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
45+
import com.alibaba.fluss.server.coordinator.event.AlterTableBucketEvent;
4546
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4647
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
4748
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
@@ -57,6 +58,7 @@
5758
import com.alibaba.fluss.server.coordinator.event.FencedCoordinatorEvent;
5859
import com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
5960
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
61+
import com.alibaba.fluss.server.coordinator.event.watcher.TableBucketChangeWatcher;
6062
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6163
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6264
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
@@ -128,6 +130,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
128130
private final TableChangeWatcher tableChangeWatcher;
129131
private final CoordinatorChannelManager coordinatorChannelManager;
130132
private final TabletServerChangeWatcher tabletServerChangeWatcher;
133+
private final TableBucketChangeWatcher tableBucketChangeWatcher;
131134
private final CoordinatorMetadataCache serverMetadataCache;
132135
private final CoordinatorRequestBatch coordinatorRequestBatch;
133136
private final CoordinatorMetricGroup coordinatorMetricGroup;
@@ -178,6 +181,8 @@ public CoordinatorEventProcessor(
178181
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager);
179182
this.tabletServerChangeWatcher =
180183
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);
184+
this.tableBucketChangeWatcher =
185+
new TableBucketChangeWatcher(zooKeeperClient, coordinatorEventManager);
181186
this.coordinatorRequestBatch =
182187
new CoordinatorRequestBatch(
183188
coordinatorChannelManager, coordinatorEventManager, coordinatorContext);
@@ -201,6 +206,7 @@ public void startup() {
201206
// start watchers first so that we won't miss node in zk;
202207
tabletServerChangeWatcher.start();
203208
tableChangeWatcher.start();
209+
tableBucketChangeWatcher.start();
204210
LOG.info("Initializing coordinator context.");
205211
try {
206212
initCoordinatorContext();
@@ -422,6 +428,7 @@ private void onShutdown() {
422428
// then stop watchers
423429
tableChangeWatcher.stop();
424430
tabletServerChangeWatcher.stop();
431+
tableBucketChangeWatcher.stop();
425432
}
426433

427434
@Override
@@ -434,6 +441,8 @@ public void process(CoordinatorEvent event) {
434441
processDropTable((DropTableEvent) event);
435442
} else if (event instanceof DropPartitionEvent) {
436443
processDropPartition((DropPartitionEvent) event);
444+
} else if (event instanceof AlterTableBucketEvent) {
445+
processAlterTableBucket((AlterTableBucketEvent) event);
437446
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
438447
processNotifyLeaderAndIsrResponseReceivedEvent(
439448
(NotifyLeaderAndIsrResponseReceivedEvent) event);
@@ -602,6 +611,65 @@ private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
602611
Collections.emptySet());
603612
}
604613

614+
private void processAlterTableBucket(AlterTableBucketEvent alterTableBucketEvent) {
615+
Map<TableBucket, BucketAssignment> bucketsToBeAdded =
616+
alterTableBucketEvent.getTableAssignment().getBucketAssignments().entrySet()
617+
.stream()
618+
.filter(
619+
entry ->
620+
!coordinatorContext
621+
.getTableAssignment(
622+
alterTableBucketEvent.getTableId())
623+
.containsKey(entry.getKey()))
624+
.collect(
625+
Collectors.toMap(
626+
entry ->
627+
new TableBucket(
628+
alterTableBucketEvent.getTableId(),
629+
entry.getKey()),
630+
Map.Entry::getValue));
631+
632+
// TODO: update tableInfo here?
633+
634+
if (coordinatorContext.isTableQueuedForDeletion(alterTableBucketEvent.getTableId())) {
635+
if (!bucketsToBeAdded.isEmpty()) {
636+
LOG.warn(
637+
"Skipping adding buckets {} for table {} since it is currently being deleted.",
638+
bucketsToBeAdded,
639+
alterTableBucketEvent.getTableId());
640+
// TODO: restore table assignment
641+
} else {
642+
LOG.info(
643+
"Ignoring bucket change during table deletion as no new buckets are added");
644+
}
645+
} else if (!bucketsToBeAdded.isEmpty()) {
646+
LOG.info("New buckets to be added {}", bucketsToBeAdded);
647+
bucketsToBeAdded.forEach(
648+
(tableBucket, bucketAssignment) -> {
649+
coordinatorContext.updateBucketReplicaAssignment(
650+
tableBucket, bucketAssignment.getReplicas());
651+
});
652+
tableManager.onCreateNewTableBucket(
653+
alterTableBucketEvent.getTableId(), bucketsToBeAdded.keySet());
654+
655+
Set<TableBucket> tableBuckets = new HashSet<>();
656+
alterTableBucketEvent
657+
.getTableAssignment()
658+
.getBucketAssignments()
659+
.keySet()
660+
.forEach(
661+
bucketId ->
662+
tableBuckets.add(
663+
new TableBucket(
664+
alterTableBucketEvent.getTableId(), bucketId)));
665+
updateTabletServerMetadataCache(
666+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
667+
null,
668+
null,
669+
tableBuckets);
670+
}
671+
}
672+
605673
private void processDeleteReplicaResponseReceived(
606674
DeleteReplicaResponseReceivedEvent deleteReplicaResponseReceivedEvent) {
607675
List<DeleteReplicaResultForBucket> deleteReplicaResultForBuckets =
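
The core of processAlterTableBucket is a set difference: buckets present in the assignment read from ZooKeeper but absent from the coordinator's cached assignment are the ones to create. A self-contained illustration of that delta with plain maps (the Integer bucket-id to replica-list shape is an assumption for illustration, not the actual Fluss types):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketDeltaExample {
    // Buckets present in the updated assignment but not in the existing one
    // are exactly the buckets the coordinator must create.
    static Map<Integer, List<Integer>> bucketsToBeAdded(
            Map<Integer, List<Integer>> existing, Map<Integer, List<Integer>> updated) {
        Map<Integer, List<Integer>> delta = new HashMap<>(updated);
        delta.keySet().removeAll(existing.keySet());
        return delta;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> existing = Map.of(0, List.of(1, 2, 3));
        Map<Integer, List<Integer>> updated =
                Map.of(0, List.of(1, 2, 3), 1, List.of(2, 3, 1));
        // prints {1=[2, 3, 1]}: bucket 1 is new and must be created
        System.out.println(bucketsToBeAdded(existing, updated));
    }
}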

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 59 additions & 4 deletions
@@ -21,6 +21,7 @@
 import com.alibaba.fluss.cluster.TabletServerInfo;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.InvalidBucketsException;
 import com.alibaba.fluss.exception.InvalidCoordinatorException;
 import com.alibaba.fluss.exception.InvalidDatabaseException;
 import com.alibaba.fluss.exception.InvalidTableException;
@@ -293,10 +294,7 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
             authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(tablePath));
         }

-        AlterTableResponse alterTableResponse = new AlterTableResponse();
-
         TableDescriptor tableDescriptor;
-
         try {
             tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson());
         } catch (Exception e) {
@@ -308,8 +306,65 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
                 throw new InvalidTableException(e.getMessage());
             }
         }
+
+        TableRegistration table = metadataManager.getTableRegistration(tablePath);
+
+        // Alter table bucket
+        alterTableBucket(table, tableDescriptor);
+
+        // Alter table registration
         metadataManager.alterTable(tablePath, tableDescriptor, request.isIgnoreIfNotExists());
-        return CompletableFuture.completedFuture(alterTableResponse);
+
+        return CompletableFuture.completedFuture(new AlterTableResponse());
+    }
+
+    private void alterTableBucket(TableRegistration table, TableDescriptor tableDescriptor) {
+        int newNumBuckets =
+                tableDescriptor
+                        .getTableDistribution()
+                        .flatMap(TableDescriptor.TableDistribution::getBucketCount)
+                        .orElse(0);
+        if (newNumBuckets == 0) {
+            return;
+        }
+
+        // Only alter the bucket count when the table is not partitioned.
+        // For partitioned tables, changing the bucket count of existing partitions must be done
+        // via alterPartition, while newly created partitions will use the new configuration for
+        // their bucket count.
+        if (!table.isPartitioned()) {
+            // Alter table bucket
+            TableAssignment existingAssignment = metadataManager.getTableAssignment(table.tableId);
+            // TODO: We should prevent adding buckets while table reassignment is in progress.
+            int oldNumBuckets = table.bucketCount;
+            int numBucketsIncrement = newNumBuckets - oldNumBuckets;
+
+            // Only support bucket expansion
+            if (numBucketsIncrement > 0) {
+                int replicaFactor = table.getTableConfig().getReplicationFactor();
+                TabletServerInfo[] servers = metadataCache.getLiveServers();
+                // TODO: revisit this
+                BucketAssignment existingAssignmentBucket0 =
+                        existingAssignment.getBucketAssignment(0);
+                int startIndex = Math.max(0, existingAssignmentBucket0.getReplicas().get(0));
+                Map<Integer, BucketAssignment> newBucketsAssignment =
+                        generateAssignment(
+                                        numBucketsIncrement, replicaFactor, servers, oldNumBuckets)
+                                .getBucketAssignments();
+                metadataManager.alterTableBucket(
+                        table.tableId,
+                        existingAssignment,
+                        new TableAssignment(newBucketsAssignment));
+            } else if (numBucketsIncrement < 0) {
+                throw new InvalidBucketsException(
+                        String.format(
                                "Table currently has %d buckets, which is higher than the requested %d.",
+                                oldNumBuckets, newNumBuckets));
+            }
+            // Do nothing if the bucket count does not change.
+        }
+
+        // TODO: should also alter the lake table's buckets if lake storage is enabled
     }

     private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) {
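
From the client side, this whole path is triggered by re-submitting the table descriptor with a larger bucket count, exactly as the new ITCases do. A minimal sketch (reusing admin, DATA1_SCHEMA, and DATA1_TABLE_PATH as they appear in the tests in this commit):

// Sketch: rescale a non-partitioned table to 2 buckets.
TableDescriptor rescaled =
        TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
admin.alterTable(DATA1_TABLE_PATH, rescaled, false).get();
// Requesting fewer buckets than the table currently has fails with
// InvalidBucketsException; an unchanged count is a no-op.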
