Skip to content

Commit c543376

Browse files
committed
address comments
1 parent 47ba09f commit c543376

File tree

44 files changed

+1008
-1190
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1008
-1190
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2626
import com.alibaba.fluss.exception.DatabaseNotExistException;
2727
import com.alibaba.fluss.exception.InvalidDatabaseException;
28+
import com.alibaba.fluss.exception.InvalidPartitionException;
2829
import com.alibaba.fluss.exception.InvalidReplicationFactorException;
2930
import com.alibaba.fluss.exception.InvalidTableException;
3031
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
3132
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
3233
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
3334
import com.alibaba.fluss.exception.PartitionNotExistException;
34-
import com.alibaba.fluss.exception.PartitionSpecInvalidException;
3535
import com.alibaba.fluss.exception.SchemaNotExistException;
3636
import com.alibaba.fluss.exception.TableAlreadyExistException;
3737
import com.alibaba.fluss.exception.TableNotExistException;
@@ -243,6 +243,50 @@ CompletableFuture<Void> createTable(
243243
*/
244244
CompletableFuture<List<PartitionInfo>> listPartitionInfos(TablePath tablePath);
245245

246+
/**
247+
* Create a new partition for a partitioned table.
248+
*
249+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
250+
*
251+
* <ul>
252+
* <li>{@link TableNotExistException} if the table does not exist.
253+
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
254+
* <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code
255+
* ignoreIfExists} is false.
256+
* <li>{@link InvalidPartitionException} if the input partition spec is invalid.
257+
* </ul>
258+
*
259+
* @param tablePath The table path of the table.
260+
* @param partitionSpec The partition spec to add.
261+
* @param ignoreIfExists Flag to specify behavior when a partition with the given name already
262+
* exists: if set to false, throw a PartitionAlreadyExistsException, if set to true, do
263+
* nothing.
264+
*/
265+
CompletableFuture<Void> createPartition(
266+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists);
267+
268+
/**
269+
* Drop a partition from a partitioned table.
270+
*
271+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
272+
*
273+
* <ul>
274+
* <li>{@link TableNotExistException} if the table does not exist.
275+
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
276+
* <li>{@link PartitionNotExistException} if the partition not exists and {@code
277+
* ignoreIfExists} is false.
278+
* <li>{@link InvalidPartitionException} if the input partition spec is invalid.
279+
* </ul>
280+
*
281+
* @param tablePath The table path of the table.
282+
* @param partitionSpec The partition spec to drop.
283+
* @param ignoreIfNotExists Flag to specify behavior when a partition with the given name does
284+
* not exist: if set to false, throw a PartitionNotExistException, if set to true, do
285+
* nothing.
286+
*/
287+
CompletableFuture<Void> dropPartition(
288+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists);
289+
246290
/**
247291
* Get the latest kv snapshots of the given table asynchronously. A kv snapshot is a snapshot of
248292
* a bucket of a primary key table at a certain point in time. Therefore, there are at-most
@@ -331,48 +375,4 @@ ListOffsetsResult listOffsets(
331375

332376
/** Describe the lake used for lakehouse storage. */
333377
CompletableFuture<LakeStorageInfo> describeLakeStorage();
334-
335-
/**
336-
* Add a partition to a partitioned table.
337-
*
338-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
339-
*
340-
* <ul>
341-
* <li>{@link TableNotExistException} if the table does not exist.
342-
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
343-
* <li>{@link PartitionAlreadyExistsException} if the partition already exists and {@code
344-
* ignoreIfExists} is false.
345-
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
346-
* </ul>
347-
*
348-
* @param tablePath The table path of the table.
349-
* @param partitionSpec The partition spec to add.
350-
* @param ignoreIfExists Flag to specify behavior when a partition with the given name already
351-
* exists: if set to false, throw a PartitionAlreadyExistsException, if set to true, do
352-
* nothing.
353-
*/
354-
CompletableFuture<Void> addPartition(
355-
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists);
356-
357-
/**
358-
* Drop a partition from a partitioned table.
359-
*
360-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
361-
*
362-
* <ul>
363-
* <li>{@link TableNotExistException} if the table does not exist.
364-
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
365-
* <li>{@link PartitionNotExistException} if the partition not exists and {@code
366-
* ignoreIfExists} is false.
367-
* <li>{@link PartitionSpecInvalidException} if the input partition spec is invalid.
368-
* </ul>
369-
*
370-
* @param tablePath The table path of the table.
371-
* @param partitionSpec The partition spec to drop.
372-
* @param ignoreIfNotExists Flag to specify behavior when a partition with the given name does
373-
* not exist: if set to false, throw a PartitionNotExistException, if set to true, do
374-
* nothing.
375-
*/
376-
CompletableFuture<Void> dropPartition(
377-
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists);
378378
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
import java.util.Map;
7575
import java.util.concurrent.CompletableFuture;
7676

77-
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAddPartitionRequest;
77+
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
7878
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
7979
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
8080
import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
@@ -270,6 +270,22 @@ public CompletableFuture<List<PartitionInfo>> listPartitionInfos(TablePath table
270270
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
271271
}
272272

273+
@Override
274+
public CompletableFuture<Void> createPartition(
275+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
276+
return gateway.createPartition(
277+
makeCreatePartitionRequest(tablePath, partitionSpec, ignoreIfExists))
278+
.thenApply(r -> null);
279+
}
280+
281+
@Override
282+
public CompletableFuture<Void> dropPartition(
283+
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
284+
return gateway.dropPartition(
285+
makeDropPartitionRequest(tablePath, partitionSpec, ignoreIfNotExists))
286+
.thenApply(r -> null);
287+
}
288+
273289
@Override
274290
public CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath tablePath) {
275291
GetLatestKvSnapshotsRequest request = new GetLatestKvSnapshotsRequest();
@@ -350,22 +366,6 @@ public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
350366
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
351367
}
352368

353-
@Override
354-
public CompletableFuture<Void> addPartition(
355-
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
356-
return gateway.addPartition(
357-
makeAddPartitionRequest(tablePath, partitionSpec, ignoreIfExists))
358-
.thenApply(r -> null);
359-
}
360-
361-
@Override
362-
public CompletableFuture<Void> dropPartition(
363-
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
364-
return gateway.dropPartition(
365-
makeDropPartitionRequest(tablePath, partitionSpec, ignoreIfNotExists))
366-
.thenApply(r -> null);
367-
}
368-
369369
@Override
370370
public void close() {
371371
// nothing to do yet

fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,13 @@
2323
import java.util.Collections;
2424
import java.util.List;
2525

26+
import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName;
2627
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
2728
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
28-
import static com.alibaba.fluss.utils.PartitionUtils.getPartitionName;
2929

3030
/** A getter to get partition name from a row. */
3131
public class PartitionGetter {
3232

33-
// TODO currently, only support one partition key.
34-
private final String partitionKeyName;
3533
private final InternalRow.FieldGetter partitionFieldGetter;
3634

3735
public PartitionGetter(RowType rowType, List<String> partitionKeys) {
@@ -44,12 +42,13 @@ public PartitionGetter(RowType rowType, List<String> partitionKeys) {
4442

4543
// check the partition column
4644
List<String> fieldNames = rowType.getFieldNames();
47-
this.partitionKeyName = partitionKeys.get(0);
48-
int partitionColumnIndex = fieldNames.indexOf(partitionKeyName);
45+
// TODO currently, only support one partition key.
46+
String partitionKey = partitionKeys.get(0);
47+
int partitionColumnIndex = fieldNames.indexOf(partitionKey);
4948
checkArgument(
5049
partitionColumnIndex >= 0,
5150
"The partition column %s is not in the row %s.",
52-
partitionKeyName,
51+
partitionKey,
5352
rowType);
5453

5554
// check the data type of the partition column
@@ -61,9 +60,6 @@ public PartitionGetter(RowType rowType, List<String> partitionKeys) {
6160
public String getPartition(InternalRow row) {
6261
Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
6362
checkNotNull(partitionValue, "Partition value shouldn't be null.");
64-
return getPartitionName(
65-
Collections.singletonList(partitionKeyName),
66-
Collections.singletonList(partitionValue.toString()),
67-
false);
63+
return getPartitionName(Collections.singletonList(partitionValue.toString()));
6864
}
6965
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
3939
import com.alibaba.fluss.remote.RemoteLogSegment;
4040
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
41-
import com.alibaba.fluss.rpc.messages.AddPartitionRequest;
41+
import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
4242
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
4343
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
4444
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
@@ -380,23 +380,23 @@ public static ListOffsetsRequest makeListOffsetsRequest(
380380
return listOffsetsRequest;
381381
}
382382

383-
public static AddPartitionRequest makeAddPartitionRequest(
383+
public static CreatePartitionRequest makeCreatePartitionRequest(
384384
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfNotExists) {
385-
AddPartitionRequest addPartitionRequest =
386-
new AddPartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
387-
addPartitionRequest
385+
CreatePartitionRequest createPartitionRequest =
386+
new CreatePartitionRequest().setIgnoreIfNotExists(ignoreIfNotExists);
387+
createPartitionRequest
388388
.setTablePath()
389389
.setDatabaseName(tablePath.getDatabaseName())
390390
.setTableName(tablePath.getTableName());
391+
List<PbKeyValue> pbPartitionKeyAndValues = new ArrayList<>();
391392
partitionSpec
392393
.getPartitionSpec()
393394
.forEach(
394395
(partitionKey, value) ->
395-
addPartitionRequest
396-
.addPartitionSpec()
397-
.setPartitionKey(partitionKey)
398-
.setValue(value));
399-
return addPartitionRequest;
396+
pbPartitionKeyAndValues.add(
397+
new PbKeyValue().setKey(partitionKey).setValue(value)));
398+
createPartitionRequest.setPartitionSpec().addAllPartitionKeyValues(pbPartitionKeyAndValues);
399+
return createPartitionRequest;
400400
}
401401

402402
public static DropPartitionRequest makeDropPartitionRequest(
@@ -407,14 +407,14 @@ public static DropPartitionRequest makeDropPartitionRequest(
407407
.setTablePath()
408408
.setDatabaseName(tablePath.getDatabaseName())
409409
.setTableName(tablePath.getTableName());
410+
List<PbKeyValue> pbPartitionKeyAndValues = new ArrayList<>();
410411
partitionSpec
411412
.getPartitionSpec()
412413
.forEach(
413414
(partitionKey, value) ->
414-
dropPartitionRequest
415-
.addPartitionSpec()
416-
.setPartitionKey(partitionKey)
417-
.setValue(value));
415+
pbPartitionKeyAndValues.add(
416+
new PbKeyValue().setKey(partitionKey).setValue(value)));
417+
dropPartitionRequest.setPartitionSpec().addAllPartitionKeyValues(pbPartitionKeyAndValues);
418418
return dropPartitionRequest;
419419
}
420420

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.alibaba.fluss.config.MemorySize;
3232
import com.alibaba.fluss.metadata.DataLakeFormat;
3333
import com.alibaba.fluss.metadata.DatabaseDescriptor;
34+
import com.alibaba.fluss.metadata.PartitionSpec;
3435
import com.alibaba.fluss.metadata.PhysicalTablePath;
3536
import com.alibaba.fluss.metadata.Schema;
3637
import com.alibaba.fluss.metadata.TableBucket;
@@ -49,6 +50,7 @@
4950

5051
import java.time.Duration;
5152
import java.util.ArrayList;
53+
import java.util.Collections;
5254
import java.util.HashMap;
5355
import java.util.List;
5456
import java.util.Map;
@@ -258,4 +260,8 @@ protected static InternalRow lookupRow(Lookuper lookuper, InternalRow keyRow) th
258260
// lookup this key.
259261
return lookuper.lookup(keyRow).get().getSingletonRow();
260262
}
263+
264+
protected static PartitionSpec newPartitionSpec(String partitionKey, String partitionValue) {
265+
return new PartitionSpec(Collections.singletonMap(partitionKey, partitionValue));
266+
}
261267
}

0 commit comments

Comments
 (0)