Skip to content

Commit a74cdc1

Browse files
authored
[server] Support list partititions by partition spec (#897)
1 parent 04a32d2 commit a74cdc1

File tree

14 files changed

+409
-71
lines changed

14 files changed

+409
-71
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,24 @@ CompletableFuture<Void> createTable(
247247
*/
248248
CompletableFuture<List<PartitionInfo>> listPartitionInfos(TablePath tablePath);
249249

250+
/**
251+
* List all partitions in fluss cluster that are under the given table and the given partial
252+
* PartitionSpec asynchronously.
253+
*
254+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
255+
*
256+
* <ul>
257+
* <li>{@link TableNotExistException} if the table does not exist.
258+
* <li>{@link TableNotPartitionedException} if the table is not partitioned.
259+
* <li>{@link InvalidPartitionException} if the input partition spec is invalid.
260+
* </ul>
261+
*
262+
* @param tablePath The path of the table.
263+
* @param partialPartitionSpec Part of table partition spec
264+
*/
265+
CompletableFuture<List<PartitionInfo>> listPartitionInfos(
266+
TablePath tablePath, PartitionSpec partialPartitionSpec);
267+
250268
/**
251269
* Create a new partition for a partitioned table.
252270
*

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
6161
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
6262
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
63+
import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
64+
import com.alibaba.fluss.rpc.messages.PbTablePath;
6365
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
6466
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
6567
import com.alibaba.fluss.rpc.protocol.ApiError;
@@ -80,6 +82,7 @@
8082
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
8183
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
8284
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
85+
import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
8386
import static com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
8487
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
8588
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
@@ -269,10 +272,22 @@ public CompletableFuture<List<String>> listTables(String databaseName) {
269272

270273
@Override
271274
public CompletableFuture<List<PartitionInfo>> listPartitionInfos(TablePath tablePath) {
275+
return listPartitionInfos(tablePath, null);
276+
}
277+
278+
@Override
279+
public CompletableFuture<List<PartitionInfo>> listPartitionInfos(
280+
TablePath tablePath, PartitionSpec partitionSpec) {
272281
ListPartitionInfosRequest request = new ListPartitionInfosRequest();
273-
request.setTablePath()
274-
.setDatabaseName(tablePath.getDatabaseName())
275-
.setTableName(tablePath.getTableName());
282+
request.setTablePath(
283+
new PbTablePath()
284+
.setDatabaseName(tablePath.getDatabaseName())
285+
.setTableName(tablePath.getTableName()));
286+
287+
if (partitionSpec != null) {
288+
PbPartitionSpec pbPartitionSpec = makePbPartitionSpec(partitionSpec);
289+
request.setPartialPartitionSpec(pbPartitionSpec);
290+
}
276291
return gateway.listPartitionInfos(request)
277292
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
278293
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.alibaba.fluss.metadata.PartitionInfo;
3131
import com.alibaba.fluss.metadata.PartitionSpec;
3232
import com.alibaba.fluss.metadata.PhysicalTablePath;
33-
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
3433
import com.alibaba.fluss.metadata.TableBucket;
3534
import com.alibaba.fluss.metadata.TablePath;
3635
import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
@@ -67,6 +66,7 @@
6766
import java.util.Set;
6867
import java.util.stream.Collectors;
6968

69+
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
7070
import static com.alibaba.fluss.utils.Preconditions.checkState;
7171

7272
/**
@@ -300,14 +300,8 @@ public static CreatePartitionRequest makeCreatePartitionRequest(
300300
.setTablePath()
301301
.setDatabaseName(tablePath.getDatabaseName())
302302
.setTableName(tablePath.getTableName());
303-
List<PbKeyValue> pbPartitionKeyAndValues = new ArrayList<>();
304-
partitionSpec
305-
.getSpecMap()
306-
.forEach(
307-
(partitionKey, value) ->
308-
pbPartitionKeyAndValues.add(
309-
new PbKeyValue().setKey(partitionKey).setValue(value)));
310-
createPartitionRequest.setPartitionSpec().addAllPartitionKeyValues(pbPartitionKeyAndValues);
303+
PbPartitionSpec pbPartitionSpec = makePbPartitionSpec(partitionSpec);
304+
createPartitionRequest.setPartitionSpec(pbPartitionSpec);
311305
return createPartitionRequest;
312306
}
313307

@@ -319,14 +313,8 @@ public static DropPartitionRequest makeDropPartitionRequest(
319313
.setTablePath()
320314
.setDatabaseName(tablePath.getDatabaseName())
321315
.setTableName(tablePath.getTableName());
322-
List<PbKeyValue> pbPartitionKeyAndValues = new ArrayList<>();
323-
partitionSpec
324-
.getSpecMap()
325-
.forEach(
326-
(partitionKey, value) ->
327-
pbPartitionKeyAndValues.add(
328-
new PbKeyValue().setKey(partitionKey).setValue(value)));
329-
dropPartitionRequest.setPartitionSpec().addAllPartitionKeyValues(pbPartitionKeyAndValues);
316+
PbPartitionSpec pbPartitionSpec = makePbPartitionSpec(partitionSpec);
317+
dropPartitionRequest.setPartitionSpec(pbPartitionSpec);
330318
return dropPartitionRequest;
331319
}
332320

@@ -348,13 +336,11 @@ public static Map<String, String> toKeyValueMap(List<PbKeyValue> pbKeyValues) {
348336
PbKeyValue::getKey, PbKeyValue::getValue));
349337
}
350338

351-
public static ResolvedPartitionSpec toResolvedPartitionSpec(PbPartitionSpec pbPartitionSpec) {
352-
List<String> partitionKeys = new ArrayList<>();
353-
List<String> partitionValues = new ArrayList<>();
354-
for (PbKeyValue pbKeyValue : pbPartitionSpec.getPartitionKeyValuesList()) {
355-
partitionKeys.add(pbKeyValue.getKey());
356-
partitionValues.add(pbKeyValue.getValue());
357-
}
358-
return new ResolvedPartitionSpec(partitionKeys, partitionValues);
339+
public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) {
340+
Map<String, String> partitionSpecMap = partitionSpec.getSpecMap();
341+
List<PbKeyValue> pbKeyValues = new ArrayList<>(partitionSpecMap.size());
342+
partitionSpecMap.forEach(
343+
(key, value) -> pbKeyValues.add(new PbKeyValue().setKey(key).setValue(value)));
344+
return new PbPartitionSpec().addAllPartitionKeyValues(pbKeyValues);
359345
}
360346
}

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,12 @@
5454
import java.util.List;
5555
import java.util.Map;
5656
import java.util.concurrent.ExecutionException;
57+
import java.util.stream.Collectors;
58+
import java.util.stream.IntStream;
5759

5860
import static com.alibaba.fluss.testutils.DataTestUtils.row;
5961
import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow;
62+
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
6063
import static org.assertj.core.api.Assertions.assertThat;
6164

6265
/**
@@ -273,4 +276,14 @@ protected static InternalRow lookupRow(Lookuper lookuper, InternalRow keyRow) th
273276
protected static PartitionSpec newPartitionSpec(String partitionKey, String partitionValue) {
274277
return new PartitionSpec(Collections.singletonMap(partitionKey, partitionValue));
275278
}
279+
280+
protected static PartitionSpec newPartitionSpec(
281+
List<String> partitionKeys, List<String> partitionValues) {
282+
checkArgument(partitionKeys.size() == partitionValues.size());
283+
Map<String, String> collectMap =
284+
IntStream.range(0, partitionKeys.size())
285+
.boxed()
286+
.collect(Collectors.toMap(partitionKeys::get, partitionValues::get));
287+
return new PartitionSpec(collectMap);
288+
}
276289
}

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
3030
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
3131
import com.alibaba.fluss.exception.DatabaseNotExistException;
32+
import com.alibaba.fluss.exception.FlussRuntimeException;
3233
import com.alibaba.fluss.exception.InvalidConfigException;
3334
import com.alibaba.fluss.exception.InvalidDatabaseException;
3435
import com.alibaba.fluss.exception.InvalidPartitionException;
@@ -49,6 +50,7 @@
4950
import com.alibaba.fluss.metadata.KvFormat;
5051
import com.alibaba.fluss.metadata.LogFormat;
5152
import com.alibaba.fluss.metadata.PartitionInfo;
53+
import com.alibaba.fluss.metadata.PartitionSpec;
5254
import com.alibaba.fluss.metadata.Schema;
5355
import com.alibaba.fluss.metadata.SchemaInfo;
5456
import com.alibaba.fluss.metadata.TableBucket;
@@ -504,6 +506,78 @@ void testListPartitionInfos() throws Exception {
504506
}
505507
}
506508

509+
@Test
510+
void testListPartitionInfosByPartitionSpec() throws Exception {
511+
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
512+
513+
TableDescriptor partitionedTable =
514+
TableDescriptor.builder()
515+
.schema(
516+
Schema.newBuilder()
517+
.column("id", DataTypes.STRING())
518+
.column("name", DataTypes.STRING())
519+
.column("pt", DataTypes.STRING())
520+
.column("secondary_partition", DataTypes.STRING())
521+
.build())
522+
.comment("test table")
523+
.distributedBy(3, "id")
524+
.partitionedBy("pt", "secondary_partition")
525+
.build();
526+
TablePath partitionedTablePath = TablePath.of(dbName, "test_partitioned_table");
527+
// create table
528+
admin.createTable(partitionedTablePath, partitionedTable, true).get();
529+
// add three partitions.
530+
admin.createPartition(
531+
partitionedTablePath,
532+
newPartitionSpec(
533+
Arrays.asList("pt", "secondary_partition"),
534+
Arrays.asList("2025", "10")),
535+
false)
536+
.get();
537+
admin.createPartition(
538+
partitionedTablePath,
539+
newPartitionSpec(
540+
Arrays.asList("pt", "secondary_partition"),
541+
Arrays.asList("2025", "11")),
542+
false)
543+
.get();
544+
admin.createPartition(
545+
partitionedTablePath,
546+
newPartitionSpec(
547+
Arrays.asList("pt", "secondary_partition"),
548+
Arrays.asList("2026", "12")),
549+
false)
550+
.get();
551+
552+
// run listPartitionInfos by partition spec with valid partition name.
553+
PartitionSpec partitionSpec = newPartitionSpec("pt", "2025");
554+
555+
List<PartitionInfo> partitionInfos =
556+
admin.listPartitionInfos(partitionedTablePath, partitionSpec).get();
557+
558+
List<String> actualPartitionNames =
559+
partitionInfos.stream()
560+
.map(PartitionInfo::getPartitionName)
561+
.collect(Collectors.toList());
562+
assertThat(actualPartitionNames).containsExactlyInAnyOrder("2025$10", "2025$11");
563+
564+
// run listPartitionInfos by partition spec with invalid partition name.
565+
PartitionSpec invalidNamePartitionSpec = newPartitionSpec("pt", "2024");
566+
List<PartitionInfo> invalidNamePartitionInfos =
567+
admin.listPartitionInfos(partitionedTablePath, invalidNamePartitionSpec).get();
568+
assertThat(invalidNamePartitionInfos).hasSize(0);
569+
570+
// run listPartitionInfos by invalid partition spec.
571+
PartitionSpec invalidPartitionSpec = newPartitionSpec("pt1", "2025");
572+
assertThatThrownBy(
573+
() ->
574+
admin.listPartitionInfos(partitionedTablePath, invalidPartitionSpec)
575+
.get())
576+
.cause()
577+
.isInstanceOf(FlussRuntimeException.class)
578+
.hasMessageContaining("table don't contains this partitionKey: pt1");
579+
}
580+
507581
@Test
508582
void testGetKvSnapshot() throws Exception {
509583
TablePath tablePath1 =

fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.alibaba.fluss.metadata;
1818

1919
import com.alibaba.fluss.annotation.PublicEvolving;
20+
import com.alibaba.fluss.exception.InvalidPartitionException;
2021

2122
import java.util.ArrayList;
2223
import java.util.Arrays;
@@ -151,4 +152,27 @@ private static List<String> getReorderedPartitionValues(
151152
partitionKey -> reOrderedPartitionValues.add(partitionSpecMap.get(partitionKey)));
152153
return reOrderedPartitionValues;
153154
}
155+
156+
public boolean contains(ResolvedPartitionSpec other) {
157+
List<String> otherPartitionKeys = other.getPartitionKeys();
158+
List<String> otherPartitionValues = other.getPartitionValues();
159+
160+
List<String> expectedPartitionValues = new ArrayList<>();
161+
for (String otherPartitionKey : otherPartitionKeys) {
162+
if (!partitionKeys.contains(otherPartitionKey)) {
163+
throw new InvalidPartitionException(
164+
String.format(
165+
"table don't contains this partitionKey: %s", otherPartitionKey));
166+
}
167+
int keyIndex = partitionKeys.indexOf(otherPartitionKey);
168+
expectedPartitionValues.add(partitionValues.get(keyIndex));
169+
}
170+
171+
String expectedPartitionName =
172+
String.join(PARTITION_SPEC_SEPARATOR, expectedPartitionValues);
173+
174+
String otherPartitionName = String.join(PARTITION_SPEC_SEPARATOR, otherPartitionValues);
175+
176+
return expectedPartitionName.equals(otherPartitionName);
177+
}
154178
}

0 commit comments

Comments
 (0)