Skip to content

Commit c009e11

Browse files
committed
[server] Support Rescale Bucket for Log Table Without Bucket Keys
1 parent f1a75ca commit c009e11

File tree

33 files changed

+1880
-66
lines changed

33 files changed

+1880
-66
lines changed

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.Set;
8181
import java.util.stream.Collectors;
8282

83+
import static org.apache.fluss.config.FlussConfigUtils.BUCKET_NUM;
8384
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
8485
import static org.apache.fluss.utils.Preconditions.checkState;
8586

@@ -359,7 +360,8 @@ public static AlterTableRequest makeAlterTableRequest(
359360
} else if (tableChange instanceof TableChange.ModifyColumn) {
360361
modifyColumns.add(toPbModifyColumn((TableChange.ModifyColumn) tableChange));
361362
} else if (tableChange instanceof TableChange.SetOption
362-
|| tableChange instanceof TableChange.ResetOption) {
363+
|| tableChange instanceof TableChange.ResetOption
364+
|| tableChange instanceof TableChange.BucketNumOption) {
363365
alterConfigs.add(toPbAlterConfigs(tableChange));
364366
} else {
365367
throw new IllegalArgumentException(
@@ -402,7 +404,12 @@ public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) {
402404

403405
public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {
404406
PbAlterConfig info = new PbAlterConfig();
405-
if (tableChange instanceof TableChange.SetOption) {
407+
if (tableChange instanceof TableChange.BucketNumOption) {
408+
TableChange.BucketNumOption bucketNumOption = (TableChange.BucketNumOption) tableChange;
409+
info.setConfigKey(BUCKET_NUM);
410+
info.setConfigValue(String.valueOf(bucketNumOption.getBucketNum()));
411+
info.setOpType(AlterConfigOpType.SET.value());
412+
} else if (tableChange instanceof TableChange.SetOption) {
406413
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
407414
info.setConfigKey(setOption.getKey());
408415
info.setConfigValue(setOption.getValue());

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,13 @@ public static void waitAllSchemaSync(TablePath tablePath, int schemaId) {
244244
FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, schemaId);
245245
}
246246

247+
public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
248+
for (int i = 0; i < expectBucketCount; i++) {
249+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
250+
new TableBucket(tableId, partitionId, i));
251+
}
252+
}
253+
247254
protected static void verifyRows(
248255
RowType rowType,
249256
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.config.ConfigOptions;
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.config.MemorySize;
35+
import org.apache.fluss.exception.InvalidTableException;
3536
import org.apache.fluss.fs.FsPath;
3637
import org.apache.fluss.fs.TestFileSystem;
3738
import org.apache.fluss.metadata.DataLakeFormat;
@@ -66,8 +67,10 @@
6667
import java.time.Duration;
6768
import java.util.ArrayList;
6869
import java.util.Collections;
70+
import java.util.HashSet;
6971
import java.util.Iterator;
7072
import java.util.List;
73+
import java.util.Set;
7174
import java.util.concurrent.CompletableFuture;
7275

7376
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -809,6 +812,103 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
809812
}
810813
}
811814

815+
@Test
816+
void testAppendWithAlterTableBucket() throws Exception {
817+
TableDescriptor data1TableDescriptor =
818+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
819+
createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
820+
TableInfo tableInfo = admin.getTableInfo(DATA1_TABLE_PATH).get();
821+
822+
int lastCount = verifyAppendForAlterTableBucket(1, 0);
823+
824+
// alter table bucket from 1 to 2
825+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
826+
admin.alterTable(DATA1_TABLE_PATH, tableChanges, false);
827+
828+
// wait until new bucket replicas are ready
829+
waitAllReplicasReady(tableInfo.getTableId(), 2);
830+
831+
verifyAppendForAlterTableBucket(2, lastCount);
832+
}
833+
834+
int verifyAppendForAlterTableBucket(int bucketNum, int lastCount) throws Exception {
835+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
836+
// use round-robin bucket assigner, so that we can append data to all buckets
837+
clientConf.set(
838+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
839+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
840+
Connection conn = ConnectionFactory.createConnection(clientConf);
841+
int rowCount = 10;
842+
int expectedRowCount = lastCount + rowCount;
843+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
844+
AppendWriter appendWriter = table.newAppend().createWriter();
845+
846+
for (int i = 0; i < rowCount; i++) {
847+
GenericRow row = row(i, "a");
848+
appendWriter.append(row).get();
849+
}
850+
appendWriter.flush();
851+
852+
try (LogScanner logScanner = createLogScanner(table)) {
853+
subscribeFromBeginning(logScanner, table);
854+
855+
int count = 0;
856+
Set<TableBucket> allBuckets = new HashSet<>();
857+
while (count < expectedRowCount) {
858+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
859+
allBuckets.addAll(scanRecords.buckets());
860+
count += scanRecords.count();
861+
}
862+
assertThat(allBuckets.size()).isEqualTo(bucketNum);
863+
assertThat(count).isEqualTo(expectedRowCount);
864+
}
865+
}
866+
conn.close();
867+
return expectedRowCount;
868+
}
869+
870+
@Test
871+
void testInvalidAlterTableBucket() throws Exception {
872+
Schema logTableSchema =
873+
Schema.newBuilder()
874+
.column("a", DataTypes.INT())
875+
.column("b", DataTypes.STRING())
876+
.build();
877+
TableDescriptor td1 =
878+
TableDescriptor.builder().schema(logTableSchema).distributedBy(1, "a").build();
879+
TablePath t1 = TablePath.of("test_db_1", "test_invalid_alter_table_bucket_1");
880+
createTable(t1, td1, false);
881+
882+
// alter table bucket from 1 to 2
883+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
884+
// alter table bucket is not supported for log table with bucket keys now
885+
assertThatThrownBy(() -> admin.alterTable(t1, tableChanges, false).get())
886+
.cause()
887+
.isInstanceOf(InvalidTableException.class)
888+
.hasMessage(
889+
"Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now.");
890+
891+
Schema pkTableSchema =
892+
Schema.newBuilder()
893+
.column("a", DataTypes.INT())
894+
.column("b", DataTypes.STRING())
895+
.primaryKey("a")
896+
.build();
897+
TableDescriptor td2 =
898+
TableDescriptor.builder().schema(pkTableSchema).distributedBy(1).build();
899+
TablePath t2 = TablePath.of("test_db_2", "test_invalid_alter_table_bucket_2");
900+
createTable(t2, td2, false);
901+
902+
// alter table bucket from 1 to 2
903+
List<TableChange> tableChanges2 = Collections.singletonList(TableChange.bucketNum(2));
904+
// alter table bucket is not supported for pk table now
905+
assertThatThrownBy(() -> admin.alterTable(t2, tableChanges2, false).get())
906+
.cause()
907+
.isInstanceOf(InvalidTableException.class)
908+
.hasMessage(
909+
"Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now.");
910+
}
911+
812912
@ParameterizedTest
813913
@ValueSource(strings = {"INDEXED", "ARROW"})
814914
void testAppendAndProject(String format) throws Exception {

fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717

1818
package org.apache.fluss.client.table;
1919

20+
import org.apache.fluss.client.Connection;
21+
import org.apache.fluss.client.ConnectionFactory;
2022
import org.apache.fluss.client.admin.ClientToServerITCaseBase;
2123
import org.apache.fluss.client.lookup.Lookuper;
24+
import org.apache.fluss.client.table.scanner.log.LogScanner;
25+
import org.apache.fluss.client.table.scanner.log.ScanRecords;
2226
import org.apache.fluss.client.table.writer.AppendWriter;
2327
import org.apache.fluss.client.table.writer.UpsertWriter;
2428
import org.apache.fluss.config.ConfigOptions;
29+
import org.apache.fluss.config.Configuration;
2530
import org.apache.fluss.exception.PartitionNotExistException;
2631
import org.apache.fluss.exception.TooManyPartitionsException;
2732
import org.apache.fluss.metadata.PartitionInfo;
2833
import org.apache.fluss.metadata.PhysicalTablePath;
2934
import org.apache.fluss.metadata.Schema;
35+
import org.apache.fluss.metadata.TableBucket;
36+
import org.apache.fluss.metadata.TableChange;
3037
import org.apache.fluss.metadata.TableDescriptor;
38+
import org.apache.fluss.metadata.TableInfo;
3139
import org.apache.fluss.metadata.TablePath;
3240
import org.apache.fluss.row.GenericRow;
3341
import org.apache.fluss.row.InternalRow;
@@ -37,10 +45,13 @@
3745

3846
import java.time.Duration;
3947
import java.util.ArrayList;
48+
import java.util.Collections;
4049
import java.util.HashMap;
50+
import java.util.HashSet;
4151
import java.util.List;
4252
import java.util.Map;
4353
import java.util.Optional;
54+
import java.util.Set;
4455

4556
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
4657
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -146,6 +157,95 @@ void testPartitionedLogTable() throws Exception {
146157
verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows);
147158
}
148159

160+
@Test
161+
void testAppendForAlterPartitionTableBucket() throws Exception {
162+
TablePath tablePath = TablePath.of("test_db_1", "test_alter_partition_table_bucket");
163+
createPartitionedTable(tablePath, false);
164+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
165+
166+
List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
167+
assertThat(partitionInfos.isEmpty()).isTrue();
168+
169+
// add 1 partition
170+
admin.createPartition(tablePath, newPartitionSpec("c", "c0"), false).get();
171+
partitionInfos = admin.listPartitionInfos(tablePath).get();
172+
assertThat(partitionInfos.size()).isEqualTo(1);
173+
174+
// verify append for 1 partition with 1 bucket
175+
int lastCount = verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 1, 0);
176+
177+
// alter table bucket from 1 to 2
178+
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
179+
admin.alterTable(tablePath, tableChanges, false);
180+
181+
// wait until new bucket replicas are ready
182+
for (PartitionInfo partitionInfo : partitionInfos) {
183+
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
184+
}
185+
186+
// verify append for 1 partition with 2 bucket
187+
lastCount =
188+
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);
189+
190+
// add another partition, which may have 2 buckets
191+
admin.createPartition(tablePath, newPartitionSpec("c", "c1"), false).get();
192+
partitionInfos = admin.listPartitionInfos(tablePath).get();
193+
assertThat(partitionInfos.size()).isEqualTo(2);
194+
195+
// wait until new bucket replicas are ready
196+
for (PartitionInfo partitionInfo : partitionInfos) {
197+
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
198+
}
199+
200+
// newly created partition should also have 2 buckets
201+
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);
202+
}
203+
204+
private int verifyAppendForAlterPartitionTableBucket(
205+
TablePath tablePath, List<PartitionInfo> partitionInfos, int bucketNum, int lastCount)
206+
throws Exception {
207+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
208+
// use round-robin bucket assigner, so that we can append data to all buckets
209+
clientConf.set(
210+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
211+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
212+
Connection conn = ConnectionFactory.createConnection(clientConf);
213+
Table table = conn.getTable(tablePath);
214+
AppendWriter appendWriter = table.newAppend().createWriter();
215+
216+
int recordsPerPartition = 5;
217+
for (PartitionInfo partitionInfo : partitionInfos) {
218+
String partitionName = partitionInfo.getPartitionName();
219+
for (int j = 0; j < recordsPerPartition; j++) {
220+
InternalRow row = row(j, "a" + j, partitionName);
221+
appendWriter.append(row);
222+
}
223+
}
224+
appendWriter.flush();
225+
226+
int expectedCount = partitionInfos.size() * recordsPerPartition + lastCount;
227+
try (LogScanner logScanner = table.newScan().createLogScanner()) {
228+
for (PartitionInfo partitionInfo : partitionInfos) {
229+
for (int i = 0; i < bucketNum; i++) {
230+
logScanner.subscribeFromBeginning(partitionInfo.getPartitionId(), i);
231+
}
232+
}
233+
234+
int count = 0;
235+
Set<TableBucket> allBuckets = new HashSet<>();
236+
while (count < expectedCount) {
237+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
238+
allBuckets.addAll(scanRecords.buckets());
239+
240+
count += scanRecords.count();
241+
}
242+
assertThat(allBuckets.size()).isEqualTo(partitionInfos.size() * bucketNum);
243+
assertThat(count).isEqualTo(expectedCount);
244+
}
245+
conn.close();
246+
return expectedCount;
247+
}
248+
149249
@Test
150250
void testWriteToNonExistsPartitionWhenDisabledDynamicPartition() throws Exception {
151251
clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED, false);

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
@Internal
3131
public class FlussConfigUtils {
3232

33+
/** Option key ({@code "bucket.num"}) naming a table's bucket number. */
public static final String BUCKET_NUM = "bucket.num";
34+
3335
public static final Map<String, ConfigOption<?>> TABLE_OPTIONS;
3436
public static final Map<String, ConfigOption<?>> CLIENT_OPTIONS;
3537
public static final String TABLE_PREFIX = "table.";

fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
/** {@link TableChange} represents the modification of the Fluss Table. */
2727
public interface TableChange {
2828

29+
static BucketNumOption bucketNum(int bucketNum) {
30+
return new BucketNumOption(bucketNum);
31+
}
32+
2933
/**
3034
* A table change to add the column with specified position.
3135
*
@@ -117,6 +121,46 @@ static ResetOption reset(String key) {
117121
return new ResetOption(key);
118122
}
119123

124+
/**
125+
* A table change to change the bucket num.
126+
*
127+
* <p>It is equal to the following statement:
128+
*
129+
* <pre>
130+
* ALTER TABLE &lt;table_name&gt; SET 'bucket.num' = '&lt;bucketNum&gt;';
131+
* </pre>
132+
*/
133+
class BucketNumOption implements TableChange {
134+
private final int bucketNum;
135+
136+
public BucketNumOption(int bucketNum) {
137+
this.bucketNum = bucketNum;
138+
}
139+
140+
public int getBucketNum() {
141+
return bucketNum;
142+
}
143+
144+
@Override
145+
public boolean equals(Object o) {
146+
if (o == null || getClass() != o.getClass()) {
147+
return false;
148+
}
149+
BucketNumOption that = (BucketNumOption) o;
150+
return bucketNum == that.bucketNum;
151+
}
152+
153+
@Override
154+
public int hashCode() {
155+
return Objects.hashCode(bucketNum);
156+
}
157+
158+
@Override
159+
public String toString() {
160+
return "BucketNumOption{" + "bucketNum=" + bucketNum + '}';
161+
}
162+
}
163+
120164
/**
121165
* A table change to set the table option.
122166
*

0 commit comments

Comments
 (0)