Skip to content

Commit 3943a46

Browse files
committed
[server] Support Rescale Bucket for Log Table Without Bucket Keys
1 parent 8635169 commit 3943a46

File tree

34 files changed

+1825
-67
lines changed

34 files changed

+1825
-67
lines changed

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,13 @@ public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) {
356356

357357
public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {
358358
PbAlterConfig info = new PbAlterConfig();
359-
if (tableChange instanceof TableChange.SetOption) {
359+
if (tableChange instanceof TableChange.BucketNumOption) {
360+
TableChange.BucketNumOption bucketNumOption = (TableChange.BucketNumOption) tableChange;
361+
// use empty string for BUCKET_NUM
362+
info.setConfigKey("");
363+
info.setConfigValue(String.valueOf(bucketNumOption.getBucketNum()));
364+
info.setOpType(AlterConfigOpType.BUCKET_NUM.value());
365+
} else if (tableChange instanceof TableChange.SetOption) {
360366
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
361367
info.setConfigKey(setOption.getKey());
362368
info.setConfigValue(setOption.getValue());

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
240240
}
241241
}
242242

243+
public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
244+
for (int i = 0; i < expectBucketCount; i++) {
245+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
246+
new TableBucket(tableId, partitionId, i));
247+
}
248+
}
249+
243250
protected static void verifyRows(
244251
RowType rowType,
245252
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.config.ConfigOptions;
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.config.MemorySize;
35+
import org.apache.fluss.exception.InvalidTableException;
3536
import org.apache.fluss.fs.FsPath;
3637
import org.apache.fluss.fs.TestFileSystem;
3738
import org.apache.fluss.metadata.DataLakeFormat;
@@ -40,6 +41,7 @@
4041
import org.apache.fluss.metadata.MergeEngineType;
4142
import org.apache.fluss.metadata.Schema;
4243
import org.apache.fluss.metadata.TableBucket;
44+
import org.apache.fluss.metadata.TableChange;
4345
import org.apache.fluss.metadata.TableDescriptor;
4446
import org.apache.fluss.metadata.TableInfo;
4547
import org.apache.fluss.metadata.TablePath;
@@ -65,8 +67,10 @@
6567
import java.time.Duration;
6668
import java.util.ArrayList;
6769
import java.util.Collections;
70+
import java.util.HashSet;
6871
import java.util.Iterator;
6972
import java.util.List;
73+
import java.util.Set;
7074
import java.util.concurrent.CompletableFuture;
7175

7276
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -724,6 +728,103 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
724728
}
725729
}
726730

731+
@Test
732+
void testAppendWithAlterTableBucket() throws Exception {
733+
TableDescriptor data1TableDescriptor =
734+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
735+
createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
736+
TableInfo tableInfo = admin.getTableInfo(DATA1_TABLE_PATH).get();
737+
738+
int lastCount = verifyAppendForAlterTableBucket(1, 0);
739+
740+
// alter table bucket from 1 to 2
741+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
742+
admin.alterTable(DATA1_TABLE_PATH, tableChanges, false);
743+
744+
// wait until new bucket replicas are ready
745+
waitAllReplicasReady(tableInfo.getTableId(), 2);
746+
747+
verifyAppendForAlterTableBucket(2, lastCount);
748+
}
749+
750+
int verifyAppendForAlterTableBucket(int bucketNum, int lastCount) throws Exception {
751+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
752+
// use round-robin bucket assigner, so that we can append data to all buckets
753+
clientConf.set(
754+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
755+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
756+
Connection conn = ConnectionFactory.createConnection(clientConf);
757+
int rowCount = 10;
758+
int expectedRowCount = lastCount + rowCount;
759+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
760+
AppendWriter appendWriter = table.newAppend().createWriter();
761+
762+
for (int i = 0; i < rowCount; i++) {
763+
GenericRow row = row(i, "a");
764+
appendWriter.append(row).get();
765+
}
766+
appendWriter.flush();
767+
768+
try (LogScanner logScanner = createLogScanner(table)) {
769+
subscribeFromBeginning(logScanner, table);
770+
771+
int count = 0;
772+
Set<TableBucket> allBuckets = new HashSet<>();
773+
while (count < expectedRowCount) {
774+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
775+
allBuckets.addAll(scanRecords.buckets());
776+
count += scanRecords.count();
777+
}
778+
assertThat(allBuckets.size()).isEqualTo(bucketNum);
779+
assertThat(count).isEqualTo(expectedRowCount);
780+
}
781+
}
782+
conn.close();
783+
return expectedRowCount;
784+
}
785+
786+
@Test
787+
void testInvalidAlterTableBucket() throws Exception {
788+
Schema logTableSchema =
789+
Schema.newBuilder()
790+
.column("a", DataTypes.INT())
791+
.column("b", DataTypes.STRING())
792+
.build();
793+
TableDescriptor td1 =
794+
TableDescriptor.builder().schema(logTableSchema).distributedBy(1, "a").build();
795+
TablePath t1 = TablePath.of("test_db_1", "test_invalid_alter_table_bucket_1");
796+
createTable(t1, td1, false);
797+
798+
// alter table bucket from 1 to 2
799+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
800+
// alter table bucket is not supported for log table with bucket keys now
801+
assertThatThrownBy(() -> admin.alterTable(t1, tableChanges, false).get())
802+
.cause()
803+
.isInstanceOf(InvalidTableException.class)
804+
.hasMessage(
805+
"Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now.");
806+
807+
Schema pkTableSchema =
808+
Schema.newBuilder()
809+
.column("a", DataTypes.INT())
810+
.column("b", DataTypes.STRING())
811+
.primaryKey("a")
812+
.build();
813+
TableDescriptor td2 =
814+
TableDescriptor.builder().schema(pkTableSchema).distributedBy(1).build();
815+
TablePath t2 = TablePath.of("test_db_2", "test_invalid_alter_table_bucket_2");
816+
createTable(t2, td2, false);
817+
818+
// alter table bucket from 1 to 2
819+
List<TableChange> tableChanges2 = Collections.singletonList(TableChange.bucketNum(2));
820+
// alter table bucket is not supported for pk table now
821+
assertThatThrownBy(() -> admin.alterTable(t2, tableChanges2, false).get())
822+
.cause()
823+
.isInstanceOf(InvalidTableException.class)
824+
.hasMessage(
825+
"Alter table bucket is not supported for Log table with bucket key or PrimaryKey Table now.");
826+
}
827+
727828
@ParameterizedTest
728829
@ValueSource(strings = {"INDEXED", "ARROW"})
729830
void testAppendAndProject(String format) throws Exception {

fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717

1818
package org.apache.fluss.client.table;
1919

20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
2022
import org.apache.fluss.client.admin.ClientToServerITCaseBase;
2123
import org.apache.fluss.client.lookup.Lookuper;
24+
import org.apache.fluss.client.table.scanner.log.LogScanner;
25+
import org.apache.fluss.client.table.scanner.log.ScanRecords;
2226
import org.apache.fluss.client.table.writer.AppendWriter;
2327
import org.apache.fluss.client.table.writer.UpsertWriter;
2428
import org.apache.fluss.config.ConfigOptions;
29+
import org.apache.fluss.config.Configuration;
2530
import org.apache.fluss.exception.PartitionNotExistException;
2631
import org.apache.fluss.exception.TooManyPartitionsException;
2732
import org.apache.fluss.metadata.PartitionInfo;
2833
import org.apache.fluss.metadata.PhysicalTablePath;
2934
import org.apache.fluss.metadata.Schema;
35+
import org.apache.fluss.metadata.TableBucket;
36+
import org.apache.fluss.metadata.TableChange;
3037
import org.apache.fluss.metadata.TableDescriptor;
38+
import org.apache.fluss.metadata.TableInfo;
3139
import org.apache.fluss.metadata.TablePath;
3240
import org.apache.fluss.row.GenericRow;
3341
import org.apache.fluss.row.InternalRow;
@@ -37,10 +45,13 @@
3745

3846
import java.time.Duration;
3947
import java.util.ArrayList;
48+
import java.util.Collections;
4049
import java.util.HashMap;
50+
import java.util.HashSet;
4151
import java.util.List;
4252
import java.util.Map;
4353
import java.util.Optional;
54+
import java.util.Set;
4455

4556
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
4657
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -146,6 +157,95 @@ void testPartitionedLogTable() throws Exception {
146157
verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows);
147158
}
148159

160+
@Test
161+
void testAppendForAlterPartitionTableBucket() throws Exception {
162+
TablePath tablePath = TablePath.of("test_db_1", "test_alter_partition_table_bucket");
163+
createPartitionedTable(tablePath, false);
164+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
165+
166+
List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
167+
assertThat(partitionInfos.isEmpty()).isTrue();
168+
169+
// add 1 partition
170+
admin.createPartition(tablePath, newPartitionSpec("c", "c0"), false).get();
171+
partitionInfos = admin.listPartitionInfos(tablePath).get();
172+
assertThat(partitionInfos.size()).isEqualTo(1);
173+
174+
// verify append for 1 partition with 1 bucket
175+
int lastCount = verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 1, 0);
176+
177+
// alter table bucket from 1 to 2
178+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
179+
admin.alterTable(tablePath, tableChanges, false);
180+
181+
// wait until new bucket replicas are ready
182+
for (PartitionInfo partitionInfo : partitionInfos) {
183+
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
184+
}
185+
186+
// verify append for 1 partition with 2 bucket
187+
lastCount =
188+
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);
189+
190+
// add another partition, which may have 2 buckets
191+
admin.createPartition(tablePath, newPartitionSpec("c", "c1"), false).get();
192+
partitionInfos = admin.listPartitionInfos(tablePath).get();
193+
assertThat(partitionInfos.size()).isEqualTo(2);
194+
195+
// wait until new bucket replicas are ready
196+
for (PartitionInfo partitionInfo : partitionInfos) {
197+
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
198+
}
199+
200+
// newly created partition should also have 2 buckets
201+
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);
202+
}
203+
204+
private int verifyAppendForAlterPartitionTableBucket(
205+
TablePath tablePath, List<PartitionInfo> partitionInfos, int bucketNum, int lastCount)
206+
throws Exception {
207+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
208+
// use round-robin bucket assigner, so that we can append data to all buckets
209+
clientConf.set(
210+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
211+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
212+
Connection conn = ConnectionFactory.createConnection(clientConf);
213+
Table table = conn.getTable(tablePath);
214+
AppendWriter appendWriter = table.newAppend().createWriter();
215+
216+
int recordsPerPartition = 5;
217+
for (PartitionInfo partitionInfo : partitionInfos) {
218+
String partitionName = partitionInfo.getPartitionName();
219+
for (int j = 0; j < recordsPerPartition; j++) {
220+
InternalRow row = row(j, "a" + j, partitionName);
221+
appendWriter.append(row);
222+
}
223+
}
224+
appendWriter.flush();
225+
226+
int expectedCount = partitionInfos.size() * recordsPerPartition + lastCount;
227+
try (LogScanner logScanner = table.newScan().createLogScanner()) {
228+
for (PartitionInfo partitionInfo : partitionInfos) {
229+
for (int i = 0; i < bucketNum; i++) {
230+
logScanner.subscribeFromBeginning(partitionInfo.getPartitionId(), i);
231+
}
232+
}
233+
234+
int count = 0;
235+
Set<TableBucket> allBuckets = new HashSet<>();
236+
while (count < expectedCount) {
237+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
238+
allBuckets.addAll(scanRecords.buckets());
239+
240+
count += scanRecords.count();
241+
}
242+
assertThat(allBuckets.size()).isEqualTo(partitionInfos.size() * bucketNum);
243+
assertThat(count).isEqualTo(expectedCount);
244+
}
245+
conn.close();
246+
return expectedCount;
247+
}
248+
149249
@Test
150250
void testWriteToNonExistsPartitionWhenDisabledDynamicPartition() throws Exception {
151251
clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED, false);

fluss-client/src/test/resources/log4j2-test.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
# Set root logger level to OFF to not flood build logs
2020
# set manually to INFO for debugging purposes
21-
rootLogger.level = OFF
21+
rootLogger.level = INFO
2222
rootLogger.appenderRef.test.ref = TestLogger
2323

2424
appender.testlogger.name = TestLogger

fluss-common/src/main/java/org/apache/fluss/config/cluster/AlterConfigOpType.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ public enum AlterConfigOpType {
2222
SET(0),
2323
DELETE(1),
2424
APPEND(2),
25-
SUBTRACT(3);
25+
SUBTRACT(3),
26+
BUCKET_NUM(4);
2627

2728
public final int value;
2829

@@ -40,6 +41,8 @@ public static AlterConfigOpType from(int opType) {
4041
return APPEND;
4142
case 3:
4243
return SUBTRACT;
44+
case 4:
45+
return BUCKET_NUM;
4346
default:
4447
throw new IllegalArgumentException("Unsupported AlterConfigOpType: " + opType);
4548
}

fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
/** {@link TableChange} represents the modification of the Fluss Table. */
2323
public interface TableChange {
2424

25+
static BucketNumOption bucketNum(int bucketNum) {
26+
return new BucketNumOption(bucketNum);
27+
}
28+
2529
static SetOption set(String key, String value) {
2630
return new SetOption(key, value);
2731
}
@@ -30,6 +34,46 @@ static ResetOption reset(String key) {
3034
return new ResetOption(key);
3135
}
3236

37+
/**
38+
* A table change to change the bucket num.
39+
*
40+
* <p>It is equal to the following statement:
41+
*
42+
* <pre>
43+
* ALTER TABLE &lt;table_name&gt; SET 'bucket.num' = '&lt;bucketNum&gt;';
44+
* </pre>
45+
*/
46+
class BucketNumOption implements TableChange {
47+
private final int bucketNum;
48+
49+
public BucketNumOption(int bucketNum) {
50+
this.bucketNum = bucketNum;
51+
}
52+
53+
public int getBucketNum() {
54+
return bucketNum;
55+
}
56+
57+
@Override
58+
public boolean equals(Object o) {
59+
if (o == null || getClass() != o.getClass()) {
60+
return false;
61+
}
62+
BucketNumOption that = (BucketNumOption) o;
63+
return bucketNum == that.bucketNum;
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hashCode(bucketNum);
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return "BucketNumOption{" + "bucketNum=" + bucketNum + '}';
74+
}
75+
}
76+
3377
/**
3478
* A table change to set the table option.
3579
*

0 commit comments

Comments
 (0)