Skip to content

Commit b2877ca

Browse files
committed
[core] Introduce Catalog.listPartitionsByNames
1 parent e102abd commit b2877ca

File tree

12 files changed

+322
-2
lines changed

12 files changed

+322
-2
lines changed

docs/static/rest-catalog-open-api.yaml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,48 @@ paths:
983983
$ref: '#/components/responses/TableNotExistErrorResponse'
984984
"500":
985985
$ref: '#/components/responses/ServerErrorResponse'
986+
/v1/{prefix}/databases/{database}/tables/{table}/partitions/list-by-names:
987+
post:
988+
tags:
989+
- partition
990+
summary: List partitions by names
991+
operationId: listPartitionsByNames
992+
parameters:
993+
- name: prefix
994+
in: path
995+
required: true
996+
schema:
997+
type: string
998+
- name: database
999+
in: path
1000+
required: true
1001+
schema:
1002+
type: string
1003+
- name: table
1004+
in: path
1005+
required: true
1006+
schema:
1007+
type: string
1008+
requestBody:
1009+
content:
1010+
application/json:
1011+
schema:
1012+
$ref: '#/components/schemas/ListPartitionsByNamesRequest'
1013+
responses:
1014+
"200":
1015+
description: OK
1016+
content:
1017+
application/json:
1018+
schema:
1019+
$ref: '#/components/schemas/ListPartitionsResponse'
1020+
"400":
1021+
$ref: '#/components/responses/BadRequestErrorResponse'
1022+
"401":
1023+
$ref: '#/components/responses/UnauthorizedErrorResponse'
1024+
"404":
1025+
$ref: '#/components/responses/TableNotExistErrorResponse'
1026+
"500":
1027+
$ref: '#/components/responses/ServerErrorResponse'
9861028
/v1/{prefix}/databases/{database}/tables/{table}/branches:
9871029
get:
9881030
tags:
@@ -2170,6 +2212,16 @@ components:
21702212
type: array
21712213
items:
21722214
type: object
2215+
ListPartitionsByNamesRequest:
2216+
type: object
2217+
properties:
2218+
specs:
2219+
type: array
2220+
description: List of partition specs to query
2221+
items:
2222+
type: object
2223+
additionalProperties:
2224+
type: string
21732225
CreateDatabaseResponse:
21742226
type: object
21752227
properties:

paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.paimon.rest.requests.CreateTagRequest;
4646
import org.apache.paimon.rest.requests.CreateViewRequest;
4747
import org.apache.paimon.rest.requests.ForwardBranchRequest;
48+
import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest;
4849
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
4950
import org.apache.paimon.rest.requests.RegisterTableRequest;
5051
import org.apache.paimon.rest.requests.RenameTableRequest;
@@ -106,6 +107,7 @@
106107
import static org.apache.paimon.rest.RESTFunctionValidator.isValidFunctionName;
107108
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
108109
import static org.apache.paimon.rest.auth.AuthProviderFactory.createAuthProvider;
110+
import static org.apache.paimon.utils.Preconditions.checkArgument;
109111

110112
/**
111113
* REST API for REST Catalog.
@@ -822,6 +824,38 @@ public PagedList<Partition> listPartitionsPaged(
822824
return new PagedList<>(partitions, response.getNextPageToken());
823825
}
824826

827+
/**
828+
* List partitions by partition names for table.
829+
*
830+
* @param identifier database name and table name.
831+
* @param partitionSpecs partition specs to be queried
832+
* @return a list of partitions matching the given partition specs
833+
* @throws IllegalArgumentException if the number of partition specs exceeds 1000
834+
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists
835+
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
836+
* this table
837+
*/
838+
public List<Partition> listPartitionsByNames(
839+
Identifier identifier, List<Map<String, String>> partitionSpecs) {
840+
checkArgument(
841+
partitionSpecs.size() <= 1000,
842+
"The number of partition specs must not exceed 1000, but got %s",
843+
partitionSpecs.size());
844+
ListPartitionsByNamesRequest request = new ListPartitionsByNamesRequest(partitionSpecs);
845+
ListPartitionsResponse response =
846+
client.post(
847+
resourcePaths.listPartitionsByNames(
848+
identifier.getDatabaseName(), identifier.getObjectName()),
849+
request,
850+
ListPartitionsResponse.class,
851+
restAuthFunction);
852+
List<Partition> partitions = response.getPartitions();
853+
if (partitions == null) {
854+
return emptyList();
855+
}
856+
return partitions;
857+
}
858+
825859
/**
826860
* Create branch for table.
827861
*

paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,18 @@ public String markDonePartitions(String databaseName, String objectName) {
202202
"mark");
203203
}
204204

205+
public String listPartitionsByNames(String databaseName, String objectName) {
206+
return SLASH.join(
207+
V1,
208+
prefix,
209+
DATABASES,
210+
encodeString(databaseName),
211+
TABLES,
212+
encodeString(objectName),
213+
PARTITIONS,
214+
"list-by-names");
215+
}
216+
205217
public String branches(String databaseName, String objectName) {
206218
return SLASH.join(
207219
V1,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.requests;
20+
21+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/** Request for listing partitions by names. */
29+
@JsonIgnoreProperties(ignoreUnknown = true)
30+
public class ListPartitionsByNamesRequest extends BasePartitionsRequest {
31+
32+
@JsonCreator
33+
public ListPartitionsByNamesRequest(
34+
@JsonProperty(FIELD_PARTITION_SPECS) List<Map<String, String>> partitionSpecs) {
35+
super(partitionSpecs);
36+
}
37+
}

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ public PagedList<Partition> listPartitionsPaged(
193193
return new PagedList<>(listPartitions(identifier), null);
194194
}
195195

196+
@Override
197+
public List<Partition> listPartitionsByNames(
198+
Identifier identifier, List<Map<String, String>> partitions)
199+
throws TableNotExistException {
200+
return CatalogUtils.listPartitionsFromFileSystem(getTable(identifier), partitions);
201+
}
202+
196203
protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
197204

198205
@Override

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,19 @@ PagedList<Partition> listPartitionsPaged(
403403
@Nullable String partitionNamePattern)
404404
throws TableNotExistException;
405405

406+
/**
407+
* Get Partition list by partition names of the table.
408+
*
409+
* @param identifier path of the table to list partitions
410+
* @param partitions partition names to be queried
411+
* @return a list of the partitions matching the given partition names
412+
* @throws IllegalArgumentException if the number of partition specs exceeds 1000
413+
* @throws TableNotExistException if the table does not exist
414+
*/
415+
List<Partition> listPartitionsByNames(
416+
Identifier identifier, List<Map<String, String>> partitions)
417+
throws TableNotExistException;
418+
406419
// ======================= view methods ===============================
407420

408421
/**

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ public static void validateTableType(Catalog catalog, String tableType) {
192192
}
193193

194194
public static List<Partition> listPartitionsFromFileSystem(Table table) {
195+
return listPartitionsFromFileSystem(table, null);
196+
}
197+
198+
public static List<Partition> listPartitionsFromFileSystem(
199+
Table table, @Nullable List<Map<String, String>> partitionSpecs) {
195200
Options options = Options.fromMap(table.options());
196201
InternalRowPartitionComputer computer =
197202
new InternalRowPartitionComputer(
@@ -206,8 +211,11 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
206211
// https://github.com/apache/paimon/pull/6531 for details
207212
List<PartitionEntry> partitionEntries;
208213
if (scan instanceof InnerTableScan) {
209-
partitionEntries =
210-
((InnerTableScan) scan).withLevelFilter(level -> true).listPartitionEntries();
214+
InnerTableScan innerTableScan = (InnerTableScan) scan;
215+
if (partitionSpecs != null && !partitionSpecs.isEmpty()) {
216+
innerTableScan = innerTableScan.withPartitionsFilter(partitionSpecs);
217+
}
218+
partitionEntries = innerTableScan.withLevelFilter(level -> true).listPartitionEntries();
211219
} else {
212220
partitionEntries = scan.listPartitionEntries();
213221
}

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,13 @@ public PagedList<Partition> listPartitionsPaged(
423423
return wrapped.listPartitionsPaged(identifier, maxResults, pageToken, partitionNamePattern);
424424
}
425425

426+
@Override
427+
public List<Partition> listPartitionsByNames(
428+
Identifier identifier, List<Map<String, String>> partitions)
429+
throws TableNotExistException {
430+
return wrapped.listPartitionsByNames(identifier, partitions);
431+
}
432+
426433
@Override
427434
public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String> select)
428435
throws TableNotExistException {

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,28 @@ public PagedList<Partition> listPartitionsPaged(
646646
}
647647
}
648648

649+
@Override
650+
public List<Partition> listPartitionsByNames(
651+
Identifier identifier, List<Map<String, String>> partitions)
652+
throws TableNotExistException {
653+
try {
654+
return api.listPartitionsByNames(identifier, partitions);
655+
} catch (NoSuchResourceException e) {
656+
throw new TableNotExistException(identifier);
657+
} catch (ForbiddenException e) {
658+
throw new TableNoPermissionException(identifier, e);
659+
} catch (NotImplementedException e) {
660+
// not a metastore partitioned table, filter from file system partitions
661+
List<Partition> allPartitions = listPartitionsFromFileSystem(getTable(identifier));
662+
return allPartitions.stream()
663+
.filter(
664+
partition ->
665+
partitions.stream()
666+
.anyMatch(spec -> partition.spec().equals(spec)))
667+
.collect(java.util.stream.Collectors.toList());
668+
}
669+
}
670+
649671
@Override
650672
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
651673
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {

paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.util.Random;
7878
import java.util.stream.Collectors;
7979

80+
import static java.util.Collections.singletonMap;
8081
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
8182
import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
8283
import static org.apache.paimon.CoreOptions.TYPE;
@@ -1545,6 +1546,65 @@ public void testListPartitionsPaged() throws Exception {
15451546
() -> catalog.listPartitionsPaged(identifier, null, null, "dt=0101"));
15461547
}
15471548

1549+
@Test
1550+
public void testListPartitionsByNames() throws Exception {
1551+
if (!supportPartitions()) {
1552+
return;
1553+
}
1554+
1555+
String databaseName = "partitions_by_names_db";
1556+
List<Map<String, String>> partitionSpecs =
1557+
Arrays.asList(
1558+
singletonMap("dt", "20250101"),
1559+
singletonMap("dt", "20250102"),
1560+
singletonMap("dt", "20240102"),
1561+
singletonMap("dt", "20260101"));
1562+
1563+
catalog.dropDatabase(databaseName, true, true);
1564+
catalog.createDatabase(databaseName, true);
1565+
Identifier identifier = Identifier.create(databaseName, "table");
1566+
1567+
catalog.createTable(
1568+
identifier,
1569+
Schema.newBuilder()
1570+
.option(METASTORE_PARTITIONED_TABLE.key(), "true")
1571+
.option(METASTORE_TAG_TO_PARTITION.key(), "dt")
1572+
.column("col", DataTypes.INT())
1573+
.column("dt", DataTypes.STRING())
1574+
.partitionKeys("dt")
1575+
.build(),
1576+
true);
1577+
1578+
BatchWriteBuilder writeBuilder = catalog.getTable(identifier).newBatchWriteBuilder();
1579+
try (BatchTableWrite write = writeBuilder.newWrite();
1580+
BatchTableCommit commit = writeBuilder.newCommit()) {
1581+
for (Map<String, String> partitionSpec : partitionSpecs) {
1582+
write.write(GenericRow.of(0, BinaryString.fromString(partitionSpec.get("dt"))));
1583+
}
1584+
commit.commit(write.prepareCommit());
1585+
}
1586+
1587+
// Test listing partitions by names
1588+
List<Map<String, String>> specsToQuery =
1589+
Arrays.asList(singletonMap("dt", "20250101"), singletonMap("dt", "20250102"));
1590+
List<Partition> partitions = catalog.listPartitionsByNames(identifier, specsToQuery);
1591+
1592+
assertThat(partitions.stream().map(Partition::spec).collect(Collectors.toList()))
1593+
.containsExactlyInAnyOrderElementsOf(specsToQuery);
1594+
1595+
// Test with non-existent partition spec
1596+
List<Map<String, String>> nonExistentSpecs =
1597+
Arrays.asList(singletonMap("dt", "20990101"), singletonMap("dt", "20990102"));
1598+
List<Partition> emptyPartitions =
1599+
catalog.listPartitionsByNames(identifier, nonExistentSpecs);
1600+
assertEquals(0, emptyPartitions.size());
1601+
1602+
// Test with empty partition specs
1603+
List<Partition> emptyResult =
1604+
catalog.listPartitionsByNames(identifier, Collections.emptyList());
1605+
assertEquals(0, emptyResult.size());
1606+
}
1607+
15481608
protected boolean supportsAlterDatabase() {
15491609
return false;
15501610
}

0 commit comments

Comments
 (0)