Skip to content

Commit 928933f

Browse files
committed
[server] Support addPartition and dropPartition
1 parent 465730f commit 928933f

File tree

40 files changed

+2085
-557
lines changed

40 files changed

+2085
-557
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import com.alibaba.fluss.exception.InvalidTableException;
3030
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
3131
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
32+
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
3233
import com.alibaba.fluss.exception.PartitionNotExistException;
34+
import com.alibaba.fluss.exception.PartitionSpecInvalidException;
3335
import com.alibaba.fluss.exception.SchemaNotExistException;
3436
import com.alibaba.fluss.exception.TableAlreadyExistException;
3537
import com.alibaba.fluss.exception.TableNotExistException;
@@ -38,6 +40,7 @@
3840
import com.alibaba.fluss.metadata.DatabaseDescriptor;
3941
import com.alibaba.fluss.metadata.DatabaseInfo;
4042
import com.alibaba.fluss.metadata.PartitionInfo;
43+
import com.alibaba.fluss.metadata.PartitionSpec;
4144
import com.alibaba.fluss.metadata.PhysicalTablePath;
4245
import com.alibaba.fluss.metadata.SchemaInfo;
4346
import com.alibaba.fluss.metadata.TableBucket;
@@ -328,4 +331,48 @@ ListOffsetsResult listOffsets(
328331

329332
/** Describe the lake used for lakehouse storage. */
330333
CompletableFuture<LakeStorageInfo> describeLakeStorage();
334+
335+
/**
336+
* Add a partition to a partitioned table.
337+
*
338+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
339+
*
340+
* <ul>
341+
* <li>{@link TableNotExistException} if the table does not exist.
342+
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
343+
* <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code
344+
* ignoreIfExists} is false.
345+
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
346+
* </ul>
347+
*
348+
* @param tablePath The table path of the table.
349+
* @param partitionSpec The partition spec to add.
350+
* @param ignoreIfExists Flag to specify behavior when a partition with the given name already
351+
* exists: if set to false, throw a PartitionAlreadyExistsException, if set to true, do
352+
* nothing.
353+
*/
354+
CompletableFuture<Void> addPartition(
355+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists);
356+
357+
/**
358+
* Drop a partition from a partitioned table.
359+
*
360+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
361+
*
362+
* <ul>
363+
* <li>{@link TableNotExistException} if the table does not exist.
364+
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
365+
* <li>{@link PartitionNotExistException} if the partition not exists and {@code
366+
* ignoreIfExists} is false.
367+
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
368+
* </ul>
369+
*
370+
* @param tablePath The table path of the table.
371+
* @param partitionSpec The partition spec to drop.
372+
* @param ignoreIfNotExists Flag to specify behavior when a partition with the given name does
373+
* not exist: if set to false, throw a PartitionNotExistException, if set to true, do
374+
* nothing.
375+
*/
376+
CompletableFuture<Void> dropPartition(
377+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists);
331378
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2828
import com.alibaba.fluss.metadata.DatabaseInfo;
2929
import com.alibaba.fluss.metadata.PartitionInfo;
30+
import com.alibaba.fluss.metadata.PartitionSpec;
3031
import com.alibaba.fluss.metadata.PhysicalTablePath;
3132
import com.alibaba.fluss.metadata.Schema;
3233
import com.alibaba.fluss.metadata.SchemaInfo;
@@ -73,6 +74,8 @@
7374
import java.util.Map;
7475
import java.util.concurrent.CompletableFuture;
7576

77+
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAddPartitionRequest;
78+
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
7679
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
7780
import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
7881
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
@@ -347,6 +350,22 @@ public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
347350
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
348351
}
349352

353+
@Override
354+
public CompletableFuture<Void> addPartition(
355+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
356+
return gateway.addPartition(
357+
makeAddPartitionRequest(tablePath, partitionSpec, ignoreIfExists))
358+
.thenApply(r -> null);
359+
}
360+
361+
@Override
362+
public CompletableFuture<Void> dropPartition(
363+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
364+
return gateway.dropPartition(
365+
makeDropPartitionRequest(tablePath, partitionSpec, ignoreIfNotExists))
366+
.thenApply(r -> null);
367+
}
368+
350369
@Override
351370
public void close() {
352371
// nothing to do yet

fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,20 @@
1818

1919
import com.alibaba.fluss.row.InternalRow;
2020
import com.alibaba.fluss.types.DataType;
21-
import com.alibaba.fluss.types.DataTypeRoot;
2221
import com.alibaba.fluss.types.RowType;
2322

23+
import java.util.Collections;
2424
import java.util.List;
2525

2626
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
2727
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
28+
import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName;
2829

2930
/** A getter to get partition name from a row. */
3031
public class PartitionGetter {
3132

33+
// TODO currently, only support one partition key.
34+
private final String partitionKeyName;
3235
private final InternalRow.FieldGetter partitionFieldGetter;
3336

3437
public PartitionGetter(RowType rowType, List<String> partitionKeys) {
@@ -38,30 +41,29 @@ public PartitionGetter(RowType rowType, List<String> partitionKeys) {
3841
"Currently, partitioned table only supports one partition key, but got partition keys %s.",
3942
partitionKeys));
4043
}
44+
4145
// check the partition column
4246
List<String> fieldNames = rowType.getFieldNames();
43-
String partitionColumnName = partitionKeys.get(0);
44-
int partitionColumnIndex = fieldNames.indexOf(partitionColumnName);
47+
this.partitionKeyName = partitionKeys.get(0);
48+
int partitionColumnIndex = fieldNames.indexOf(partitionKeyName);
4549
checkArgument(
4650
partitionColumnIndex >= 0,
4751
"The partition column %s is not in the row %s.",
48-
partitionColumnName,
52+
partitionKeyName,
4953
rowType);
5054

5155
// check the data type of the partition column
5256
DataType partitionColumnDataType = rowType.getTypeAt(partitionColumnIndex);
53-
checkArgument(
54-
partitionColumnDataType.getTypeRoot() == DataTypeRoot.STRING,
55-
"Currently, partitioned table only supports STRING type partition key, but got partition key '%s' with data type %s.",
56-
partitionColumnName,
57-
partitionColumnDataType);
5857
this.partitionFieldGetter =
5958
InternalRow.createFieldGetter(partitionColumnDataType, partitionColumnIndex);
6059
}
6160

6261
public String getPartition(InternalRow row) {
6362
Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
6463
checkNotNull(partitionValue, "Partition value shouldn't be null.");
65-
return partitionValue.toString();
64+
return getPartitionName(
65+
Collections.singletonList(partitionKeyName),
66+
Collections.singletonList(partitionValue.toString()),
67+
false);
6668
}
6769
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
3030
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
3131
import com.alibaba.fluss.metadata.PartitionInfo;
32+
import com.alibaba.fluss.metadata.PartitionSpec;
3233
import com.alibaba.fluss.metadata.PhysicalTablePath;
3334
import com.alibaba.fluss.metadata.TableBucket;
3435
import com.alibaba.fluss.metadata.TablePath;
@@ -37,7 +38,9 @@
3738
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
3839
import com.alibaba.fluss.remote.RemoteLogSegment;
3940
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
41+
import com.alibaba.fluss.rpc.messages.AddPartitionRequest;
4042
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
43+
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
4144
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
4245
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
4346
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
@@ -377,6 +380,44 @@ public static ListOffsetsRequest makeListOffsetsRequest(
377380
return listOffsetsRequest;
378381
}
379382

383+
public static AddPartitionRequest makeAddPartitionRequest(
384+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
385+
AddPartitionRequest addPartitionRequest =
386+
new AddPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
387+
addPartitionRequest
388+
.setTablePath()
389+
.setDatabaseName(tablePath.getDatabaseName())
390+
.setTableName(tablePath.getTableName());
391+
partitionSpec
392+
.getPartitionSpec()
393+
.forEach(
394+
(partitionKey, value) ->
395+
addPartitionRequest
396+
.addPartitionSpec()
397+
.setPartitionKey(partitionKey)
398+
.setValue(value));
399+
return addPartitionRequest;
400+
}
401+
402+
public static DropPartitionRequest makeDropPartitionRequest(
403+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
404+
DropPartitionRequest dropPartitionRequest =
405+
new DropPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
406+
dropPartitionRequest
407+
.setTablePath()
408+
.setDatabaseName(tablePath.getDatabaseName())
409+
.setTableName(tablePath.getTableName());
410+
partitionSpec
411+
.getPartitionSpec()
412+
.forEach(
413+
(partitionKey, value) ->
414+
dropPartitionRequest
415+
.addPartitionSpec()
416+
.setPartitionKey(partitionKey)
417+
.setValue(value));
418+
return dropPartitionRequest;
419+
}
420+
380421
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
381422
return response.getPartitionsInfosList().stream()
382423
.map(

0 commit comments

Comments
 (0)