Skip to content

Commit 6a1c9ce

Browse files
committed
[WIP] Bucket Rescale
1 parent 310a77d commit 6a1c9ce

File tree

12 files changed

+443
-17
lines changed

12 files changed

+443
-17
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public abstract class ClientToServerITCaseBase {
8383
@BeforeEach
8484
protected void setup() throws Exception {
8585
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
86+
clientConf.set(
87+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
88+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
8689
conn = ConnectionFactory.createConnection(clientConf);
8790
admin = conn.getAdmin();
8891
}

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.alibaba.fluss.metadata.TablePath;
6161
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
6262
import com.alibaba.fluss.server.kv.snapshot.KvSnapshotHandle;
63+
import com.alibaba.fluss.server.zk.data.TableAssignment;
6364
import com.alibaba.fluss.types.DataTypes;
6465

6566
import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,54 @@ void testGetTableInfoAndSchema() throws Exception {
197198
.isBetween(timestampBeforeCreate, timestampAfterCreate);
198199
}
199200

201+
@Test
202+
void testAlterTableBucket() throws Exception {
203+
// create table
204+
TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
205+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
206+
207+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
208+
209+
TableAssignment tableAssignment =
210+
FLUSS_CLUSTER_EXTENSION
211+
.getZooKeeperClient()
212+
.getTableAssignment(tableInfo.getTableId())
213+
.get();
214+
System.out.println(tableAssignment);
215+
216+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
217+
218+
TableDescriptor newTableDescriptor =
219+
TableDescriptor.builder()
220+
.schema(existingTableDescriptor.getSchema())
221+
.comment(existingTableDescriptor.getComment().orElse("test table"))
222+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
223+
.distributedBy(
224+
existingTableDescriptor
225+
.getTableDistribution()
226+
.get()
227+
.getBucketCount()
228+
.get()
229+
+ 1,
230+
existingTableDescriptor.getBucketKeys())
231+
.properties(existingTableDescriptor.getProperties())
232+
.customProperties(existingTableDescriptor.getCustomProperties())
233+
.build();
234+
// alter table
235+
admin.alterTable(tablePath, newTableDescriptor, false).get();
236+
237+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
238+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
239+
// assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
240+
241+
TableAssignment tableAssignment1 =
242+
FLUSS_CLUSTER_EXTENSION
243+
.getZooKeeperClient()
244+
.getTableAssignment(tableInfo.getTableId())
245+
.get();
246+
System.out.println(tableAssignment1);
247+
}
248+
200249
@Test
201250
void testAlterTable() throws Exception {
202251
// create table

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,59 @@ void testPutAndPoll(String kvFormat) throws Exception {
645645
verifyAppendOrPut(false, "ARROW", kvFormat);
646646
}
647647

648+
@Test
649+
void test() throws Exception {
650+
TableDescriptor DATA1_TABLE_DESCRIPTOR =
651+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
652+
createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, false);
653+
654+
long tableId;
655+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
656+
tableId = table.getTableInfo().getTableId();
657+
658+
AppendWriter appendWriter = table.newAppend().createWriter();
659+
660+
for (int i = 0; i < 10; i++) {
661+
appendWriter.append(row(i, "a")).get();
662+
}
663+
664+
try (LogScanner logScanner = createLogScanner(table)) {
665+
subscribeFromBeginning(logScanner, table);
666+
667+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
668+
for (ScanRecord record : scanRecords) {
669+
System.out.println(record);
670+
}
671+
}
672+
}
673+
674+
System.out.println("==");
675+
676+
TableDescriptor DATA2_TABLE_DESCRIPTOR =
677+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
678+
admin.alterTable(DATA1_TABLE_PATH, DATA2_TABLE_DESCRIPTOR, false);
679+
680+
waitAllReplicasReady(tableId, 2);
681+
682+
conn = ConnectionFactory.createConnection(clientConf);
683+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
684+
AppendWriter appendWriter = table.newAppend().createWriter();
685+
686+
for (int i = 0; i < 10; i++) {
687+
appendWriter.append(row(i, "a")).get();
688+
}
689+
690+
try (LogScanner logScanner = createLogScanner(table)) {
691+
subscribeFromBeginning(logScanner, table);
692+
693+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
694+
for (ScanRecord record : scanRecords) {
695+
System.out.println(record);
696+
}
697+
}
698+
}
699+
}
700+
648701
void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
649702
throws Exception {
650703
Schema schema =

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.alibaba.fluss.rpc.protocol.ApiError;
4343
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
4444
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
45+
import com.alibaba.fluss.server.coordinator.event.AlterTableBucketEvent;
4546
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4647
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
4748
import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
@@ -57,6 +58,7 @@
5758
import com.alibaba.fluss.server.coordinator.event.FencedCoordinatorEvent;
5859
import com.alibaba.fluss.server.coordinator.event.NewTabletServerEvent;
5960
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
61+
import com.alibaba.fluss.server.coordinator.event.watcher.TableBucketChangeWatcher;
6062
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6163
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
6264
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
@@ -128,6 +130,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
128130
private final TableChangeWatcher tableChangeWatcher;
129131
private final CoordinatorChannelManager coordinatorChannelManager;
130132
private final TabletServerChangeWatcher tabletServerChangeWatcher;
133+
private final TableBucketChangeWatcher tableBucketChangeWatcher;
131134
private final CoordinatorMetadataCache serverMetadataCache;
132135
private final CoordinatorRequestBatch coordinatorRequestBatch;
133136
private final CoordinatorMetricGroup coordinatorMetricGroup;
@@ -178,6 +181,8 @@ public CoordinatorEventProcessor(
178181
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager);
179182
this.tabletServerChangeWatcher =
180183
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);
184+
this.tableBucketChangeWatcher =
185+
new TableBucketChangeWatcher(zooKeeperClient, coordinatorEventManager);
181186
this.coordinatorRequestBatch =
182187
new CoordinatorRequestBatch(
183188
coordinatorChannelManager, coordinatorEventManager, coordinatorContext);
@@ -201,6 +206,7 @@ public void startup() {
201206
// start watchers first so that we won't miss node in zk;
202207
tabletServerChangeWatcher.start();
203208
tableChangeWatcher.start();
209+
tableBucketChangeWatcher.start();
204210
LOG.info("Initializing coordinator context.");
205211
try {
206212
initCoordinatorContext();
@@ -422,6 +428,7 @@ private void onShutdown() {
422428
// then stop watchers
423429
tableChangeWatcher.stop();
424430
tabletServerChangeWatcher.stop();
431+
tableBucketChangeWatcher.stop();
425432
}
426433

427434
@Override
@@ -470,6 +477,8 @@ public void process(CoordinatorEvent event) {
470477
completeFromCallable(
471478
commitLakeTableSnapshotEvent.getRespCallback(),
472479
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
480+
} else if (event instanceof AlterTableBucketEvent) {
481+
processAlterTableBucket((AlterTableBucketEvent) event);
473482
} else if (event instanceof AccessContextEvent) {
474483
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
475484
processAccessContext(accessContextEvent);
@@ -478,6 +487,59 @@ public void process(CoordinatorEvent event) {
478487
}
479488
}
480489

490+
private void processAlterTableBucket(AlterTableBucketEvent alterTableBucketEvent) {
491+
Map<TableBucket, BucketAssignment> bucketsToBeAdded =
492+
alterTableBucketEvent.getTableAssignment().getBucketAssignments().entrySet()
493+
.stream()
494+
.filter(
495+
entry ->
496+
!coordinatorContext
497+
.getTableAssignment(
498+
alterTableBucketEvent.getTableId())
499+
.containsKey(entry.getKey()))
500+
.collect(
501+
Collectors.toMap(
502+
entry ->
503+
new TableBucket(
504+
alterTableBucketEvent.getTableId(),
505+
entry.getKey()),
506+
Map.Entry::getValue));
507+
508+
if (coordinatorContext.isTableQueuedForDeletion(alterTableBucketEvent.getTableId())) {
509+
if (!bucketsToBeAdded.isEmpty()) {
510+
511+
} else {
512+
LOG.info(
513+
"Ignoring bucket change during table deletion as no new buckets are added");
514+
}
515+
} else if (!bucketsToBeAdded.isEmpty()) {
516+
LOG.info("New buckets to be added {}", bucketsToBeAdded);
517+
bucketsToBeAdded.forEach(
518+
(tableBucket, bucketAssignment) -> {
519+
coordinatorContext.updateBucketReplicaAssignment(
520+
tableBucket, bucketAssignment.getReplicas());
521+
});
522+
tableManager.onCreateNewTableBucket(
523+
alterTableBucketEvent.getTableId(), bucketsToBeAdded.keySet());
524+
525+
Set<TableBucket> tableBuckets = new HashSet<>();
526+
alterTableBucketEvent
527+
.getTableAssignment()
528+
.getBucketAssignments()
529+
.keySet()
530+
.forEach(
531+
bucketId ->
532+
tableBuckets.add(
533+
new TableBucket(
534+
alterTableBucketEvent.getTableId(), bucketId)));
535+
updateTabletServerMetadataCache(
536+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
537+
null,
538+
null,
539+
tableBuckets);
540+
}
541+
}
542+
481543
private void processCreateTable(CreateTableEvent createTableEvent) {
482544
long tableId = createTableEvent.getTableInfo().getTableId();
483545
// skip the table if it already exists

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,6 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
296296
AlterTableResponse alterTableResponse = new AlterTableResponse();
297297

298298
TableDescriptor tableDescriptor;
299-
300299
try {
301300
tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson());
302301
} catch (Exception e) {
@@ -308,7 +307,37 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
308307
throw new InvalidTableException(e.getMessage());
309308
}
310309
}
310+
311+
TableRegistration table = metadataManager.getTableRegistration(tablePath);
312+
// TODO: Alter bucket of partitioned table
313+
if (!table.isPartitioned()) {
314+
// Alter table bucket
315+
TableAssignment existingAssignment = metadataManager.getTableAssignment(table.tableId);
316+
int oldNumBuckets = table.bucketCount;
317+
int newNumBuckets = tableDescriptor.getTableDistribution().get().getBucketCount().get();
318+
int numBucketsIncrement = newNumBuckets - oldNumBuckets;
319+
320+
if (numBucketsIncrement > 0) {
321+
int replicaFactor = table.getTableConfig().getReplicationFactor();
322+
TabletServerInfo[] servers = metadataCache.getLiveServers();
323+
BucketAssignment existingAssignmentBucket0 =
324+
existingAssignment.getBucketAssignment(0);
325+
int startIndex = Math.max(0, existingAssignmentBucket0.getReplicas().get(0));
326+
// TODO: We should prevent adding buckets while table reassignment is in progress.
327+
Map<Integer, BucketAssignment> newBucketsAssignment =
328+
generateAssignment(
329+
numBucketsIncrement, replicaFactor, servers, oldNumBuckets)
330+
.getBucketAssignments();
331+
metadataManager.alterTableBucket(
332+
table.tableId,
333+
existingAssignment,
334+
new TableAssignment(newBucketsAssignment));
335+
}
336+
}
337+
338+
// Alter table registration
311339
metadataManager.alterTable(tablePath, tableDescriptor, request.isIgnoreIfNotExists());
340+
312341
return CompletableFuture.completedFuture(alterTableResponse);
313342
}
314343

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,10 @@ public void alterTable(
320320
}
321321
}
322322

323+
TableRegistration existTableReg = getTableRegistration(tablePath);
323324
try {
324325
TableRegistration updatedTableRegistration =
325-
getUpdatedTableRegistration(tablePath, tableDescriptor);
326+
getUpdatedTableRegistration(existTableReg, tableDescriptor);
326327
zookeeperClient.updateTable(tablePath, updatedTableRegistration);
327328
} catch (Exception e) {
328329
if (e instanceof KeeperException.NoNodeException) {
@@ -336,12 +337,20 @@ public void alterTable(
336337
}
337338
}
338339

340+
public void alterTableBucket(
341+
long tableId, TableAssignment existingAssignment, TableAssignment newAssignment) {
342+
try {
343+
existingAssignment.getBucketAssignments().putAll(newAssignment.getBucketAssignments());
344+
zookeeperClient.updateTableAssignment(tableId, existingAssignment);
345+
} catch (Exception e) {
346+
throw new FlussRuntimeException("Failed to update table assignment: " + tableId, e);
347+
}
348+
}
349+
339350
private TableRegistration getUpdatedTableRegistration(
340-
TablePath tablePath, TableDescriptor updateTableDescriptor) {
341-
TableRegistration existTableReg = getTableRegistration(tablePath);
351+
TableRegistration existTableReg, TableDescriptor updateTableDescriptor) {
342352
Map<String, String> updateProperties = updateTableDescriptor.getProperties();
343353
Map<String, String> updateCustomProperties = updateTableDescriptor.getCustomProperties();
344-
validateAlterTableProperties(updateTableDescriptor);
345354

346355
Map<String, String> newProperties = new HashMap<>(existTableReg.properties);
347356
for (Map.Entry<String, String> updateProperty : updateProperties.entrySet()) {
@@ -359,7 +368,15 @@ private TableRegistration getUpdatedTableRegistration(
359368
}
360369
}
361370

362-
return existTableReg.newProperties(newProperties, newCustomProperties);
371+
return new TableRegistration(
372+
existTableReg.tableId,
373+
existTableReg.comment,
374+
existTableReg.partitionKeys,
375+
updateTableDescriptor.getTableDistribution().get(),
376+
newProperties,
377+
newCustomProperties,
378+
existTableReg.createdTime,
379+
System.currentTimeMillis());
363380
}
364381

365382
public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
@@ -391,6 +408,19 @@ public TableRegistration getTableRegistration(TablePath tablePath) {
391408
return optionalTable.get();
392409
}
393410

411+
public TableAssignment getTableAssignment(long tableId) {
412+
Optional<TableAssignment> optionalTableAssignment;
413+
try {
414+
optionalTableAssignment = zookeeperClient.getTableAssignment(tableId);
415+
} catch (Exception e) {
416+
throw new RuntimeException(e);
417+
}
418+
if (!optionalTableAssignment.isPresent()) {
419+
throw new TableNotExistException("Table '" + tableId + "' does not exist.");
420+
}
421+
return optionalTableAssignment.get();
422+
}
423+
394424
public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException {
395425
final int currentSchemaId;
396426
try {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/TableManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void onCreateNewPartition(
142142
onCreateNewTableBucket(tableId, newTableBuckets);
143143
}
144144

145-
private void onCreateNewTableBucket(long tableId, Set<TableBucket> tableBuckets) {
145+
public void onCreateNewTableBucket(long tableId, Set<TableBucket> tableBuckets) {
146146
LOG.info(
147147
"New table buckets: {} for table {}.",
148148
tableBuckets,

0 commit comments

Comments
 (0)