Skip to content

Commit de7a50d

Browse files
authored
[core] Support partition API (apache#4786)
1 parent 71921c5 commit de7a50d

File tree

14 files changed

+931
-127
lines changed

14 files changed

+931
-127
lines changed

paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static GenericRow convertSpecToInternalRow(
111111
List<String> fieldNames = partType.getFieldNames();
112112
for (Map.Entry<String, String> entry : spec.entrySet()) {
113113
Object value =
114-
defaultPartValue.equals(entry.getValue())
114+
defaultPartValue != null && defaultPartValue.equals(entry.getValue())
115115
? null
116116
: castFromString(
117117
entry.getValue(), partType.getField(entry.getKey()).type());

paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,23 @@ public <T extends RESTResponse> T delete(String path, Map<String, String> header
9494
return exec(request, null);
9595
}
9696

97+
@Override
98+
public <T extends RESTResponse> T delete(
99+
String path, RESTRequest body, Map<String, String> headers) {
100+
try {
101+
RequestBody requestBody = buildRequestBody(body);
102+
Request request =
103+
new Request.Builder()
104+
.url(uri + path)
105+
.delete(requestBody)
106+
.headers(Headers.of(headers))
107+
.build();
108+
return exec(request, null);
109+
} catch (JsonProcessingException e) {
110+
throw new RESTException(e, "build request failed.");
111+
}
112+
}
113+
97114
@Override
98115
public void close() throws IOException {
99116
okHttpClient.dispatcher().cancelAll();

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 124 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import org.apache.paimon.catalog.Database;
2727
import org.apache.paimon.catalog.Identifier;
2828
import org.apache.paimon.catalog.PropertyChange;
29+
import org.apache.paimon.data.GenericRow;
30+
import org.apache.paimon.data.serializer.InternalRowSerializer;
2931
import org.apache.paimon.fs.FileIO;
3032
import org.apache.paimon.fs.Path;
3133
import org.apache.paimon.manifest.PartitionEntry;
34+
import org.apache.paimon.operation.FileStoreCommit;
3235
import org.apache.paimon.operation.Lock;
3336
import org.apache.paimon.options.CatalogOptions;
3437
import org.apache.paimon.options.Options;
@@ -41,15 +44,19 @@
4144
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
4245
import org.apache.paimon.rest.requests.AlterTableRequest;
4346
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
47+
import org.apache.paimon.rest.requests.CreatePartitionRequest;
4448
import org.apache.paimon.rest.requests.CreateTableRequest;
49+
import org.apache.paimon.rest.requests.DropPartitionRequest;
4550
import org.apache.paimon.rest.requests.RenameTableRequest;
4651
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
4752
import org.apache.paimon.rest.responses.ConfigResponse;
4853
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
4954
import org.apache.paimon.rest.responses.GetDatabaseResponse;
5055
import org.apache.paimon.rest.responses.GetTableResponse;
5156
import org.apache.paimon.rest.responses.ListDatabasesResponse;
57+
import org.apache.paimon.rest.responses.ListPartitionsResponse;
5258
import org.apache.paimon.rest.responses.ListTablesResponse;
59+
import org.apache.paimon.rest.responses.PartitionResponse;
5360
import org.apache.paimon.schema.Schema;
5461
import org.apache.paimon.schema.SchemaChange;
5562
import org.apache.paimon.schema.TableSchema;
@@ -58,10 +65,11 @@
5865
import org.apache.paimon.table.FileStoreTableFactory;
5966
import org.apache.paimon.table.Table;
6067
import org.apache.paimon.table.object.ObjectTable;
68+
import org.apache.paimon.table.sink.BatchWriteBuilder;
69+
import org.apache.paimon.types.RowType;
6170
import org.apache.paimon.utils.Pair;
6271
import org.apache.paimon.utils.Preconditions;
6372

64-
import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
6573
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
6674
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
6775

@@ -71,15 +79,20 @@
7179
import java.io.IOException;
7280
import java.time.Duration;
7381
import java.util.ArrayList;
82+
import java.util.Collections;
7483
import java.util.List;
7584
import java.util.Map;
7685
import java.util.Optional;
7786
import java.util.Set;
7887
import java.util.concurrent.ScheduledExecutorService;
88+
import java.util.stream.Collectors;
7989

90+
import static org.apache.paimon.CoreOptions.createCommitUser;
8091
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
92+
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
8193
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
8294
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
95+
import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
8396
import static org.apache.paimon.utils.Preconditions.checkNotNull;
8497
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
8598

@@ -132,7 +145,8 @@ public RESTCatalog(CatalogContext catalogContext) {
132145
Map<String, String> initHeaders =
133146
RESTUtil.merge(
134147
configHeaders(catalogOptions.toMap()), this.catalogAuth.getHeaders());
135-
Options options = new Options(fetchOptionsFromServer(initHeaders, initHeaders));
148+
Options options =
149+
new Options(fetchOptionsFromServer(initHeaders, catalogContext.options().toMap()));
136150
this.context =
137151
CatalogContext.create(
138152
options, catalogContext.preferIO(), catalogContext.fallbackIO());
@@ -141,20 +155,6 @@ public RESTCatalog(CatalogContext catalogContext) {
141155
this.fileIO = getFileIOFromOptions(context);
142156
}
143157

144-
private static FileIO getFileIOFromOptions(CatalogContext context) {
145-
try {
146-
Options options = context.options();
147-
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
148-
Path warehousePath = new Path(warehouseStr);
149-
CatalogContext contextWithNewOptions =
150-
CatalogContext.create(options, context.preferIO(), context.fallbackIO());
151-
return FileIO.get(warehousePath, contextWithNewOptions);
152-
} catch (IOException e) {
153-
LOG.warn("Can not get FileIO from options.");
154-
throw new RuntimeException(e);
155-
}
156-
}
157-
158158
@Override
159159
public String warehouse() {
160160
return context.options().get(CatalogOptions.WAREHOUSE);
@@ -360,17 +360,43 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
360360
@Override
361361
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
362362
throws TableNotExistException {
363-
throw new UnsupportedOperationException();
363+
try {
364+
CreatePartitionRequest request = new CreatePartitionRequest(identifier, partitionSpec);
365+
client.post(
366+
resourcePaths.partitions(
367+
identifier.getDatabaseName(), identifier.getTableName()),
368+
request,
369+
PartitionResponse.class,
370+
headers());
371+
} catch (NoSuchResourceException e) {
372+
throw new TableNotExistException(identifier);
373+
} catch (ForbiddenException e) {
374+
throw new TableNoPermissionException(identifier, e);
375+
}
364376
}
365377

366378
@Override
367379
public void dropPartition(Identifier identifier, Map<String, String> partitions)
368-
throws TableNotExistException, PartitionNotExistException {}
380+
throws TableNotExistException, PartitionNotExistException {
381+
checkNotSystemTable(identifier, "dropPartition");
382+
dropPartitionMetadata(identifier, partitions);
383+
Table table = getTable(identifier);
384+
cleanPartitionsInFileSystem(table, partitions);
385+
}
369386

370387
@Override
371388
public List<PartitionEntry> listPartitions(Identifier identifier)
372389
throws TableNotExistException {
373-
throw new UnsupportedOperationException();
390+
FileStoreTable table = (FileStoreTable) getTable(identifier);
391+
boolean whetherSupportListPartitions =
392+
Boolean.parseBoolean(
393+
table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
394+
if (whetherSupportListPartitions) {
395+
RowType rowType = table.schema().logicalPartitionType();
396+
return listPartitionsFromServer(identifier, rowType);
397+
} else {
398+
return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
399+
}
374400
}
375401

376402
@Override
@@ -388,16 +414,14 @@ public void close() throws Exception {
388414
}
389415
}
390416

391-
@VisibleForTesting
392-
Map<String, String> fetchOptionsFromServer(
417+
protected Map<String, String> fetchOptionsFromServer(
393418
Map<String, String> headers, Map<String, String> clientProperties) {
394419
ConfigResponse response =
395420
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers);
396421
return response.merge(clientProperties);
397422
}
398423

399-
@VisibleForTesting
400-
Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
424+
private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
401425
Preconditions.checkArgument(identifier.getSystemTableName() == null);
402426
GetTableResponse response = getTableResponse(identifier);
403427
FileStoreTable table =
@@ -420,8 +444,42 @@ Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException
420444
return table;
421445
}
422446

423-
protected GetTableResponse getTableResponse(Identifier identifier)
447+
private List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowType rowType)
424448
throws TableNotExistException {
449+
try {
450+
ListPartitionsResponse response =
451+
client.get(
452+
resourcePaths.partitions(
453+
identifier.getDatabaseName(), identifier.getTableName()),
454+
ListPartitionsResponse.class,
455+
headers());
456+
if (response != null && response.getPartitions() != null) {
457+
return response.getPartitions().stream()
458+
.map(p -> convertToPartitionEntry(p, rowType))
459+
.collect(Collectors.toList());
460+
} else {
461+
return Collections.emptyList();
462+
}
463+
} catch (NoSuchResourceException e) {
464+
throw new TableNotExistException(identifier);
465+
} catch (ForbiddenException e) {
466+
throw new TableNoPermissionException(identifier, e);
467+
}
468+
}
469+
470+
private void cleanPartitionsInFileSystem(Table table, Map<String, String> partitions) {
471+
FileStoreTable fileStoreTable = (FileStoreTable) table;
472+
try (FileStoreCommit commit =
473+
fileStoreTable
474+
.store()
475+
.newCommit(
476+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
477+
commit.dropPartitions(
478+
Collections.singletonList(partitions), BatchWriteBuilder.COMMIT_IDENTIFIER);
479+
}
480+
}
481+
482+
private GetTableResponse getTableResponse(Identifier identifier) throws TableNotExistException {
425483
try {
426484
return client.get(
427485
resourcePaths.table(identifier.getDatabaseName(), identifier.getTableName()),
@@ -434,6 +492,23 @@ protected GetTableResponse getTableResponse(Identifier identifier)
434492
}
435493
}
436494

495+
private boolean dropPartitionMetadata(Identifier identifier, Map<String, String> partitions)
496+
throws TableNoPermissionException, PartitionNotExistException {
497+
try {
498+
DropPartitionRequest request = new DropPartitionRequest(partitions);
499+
client.delete(
500+
resourcePaths.partitions(
501+
identifier.getDatabaseName(), identifier.getTableName()),
502+
request,
503+
headers());
504+
return true;
505+
} catch (NoSuchResourceException ignore) {
506+
throw new PartitionNotExistException(identifier, partitions);
507+
} catch (ForbiddenException e) {
508+
throw new TableNoPermissionException(identifier, e);
509+
}
510+
}
511+
437512
private static Map<String, String> configHeaders(Map<String, String> properties) {
438513
return RESTUtil.extractPrefixMap(properties, "header.");
439514
}
@@ -464,4 +539,29 @@ private ScheduledExecutorService tokenRefreshExecutor() {
464539

465540
return refreshExecutor;
466541
}
542+
543+
private PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) {
544+
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
545+
GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null);
546+
return new PartitionEntry(
547+
serializer.toBinaryRow(row).copy(),
548+
partition.getRecordCount(),
549+
partition.getFileSizeInBytes(),
550+
partition.getFileCount(),
551+
partition.getLastFileCreationTime());
552+
}
553+
554+
private static FileIO getFileIOFromOptions(CatalogContext context) {
555+
try {
556+
Options options = context.options();
557+
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
558+
Path warehousePath = new Path(warehouseStr);
559+
CatalogContext contextWithNewOptions =
560+
CatalogContext.create(options, context.preferIO(), context.fallbackIO());
561+
return FileIO.get(warehousePath, contextWithNewOptions);
562+
} catch (IOException e) {
563+
LOG.warn("Can not get FileIO from options.");
564+
throw new RuntimeException(e);
565+
}
566+
}
467567
}

paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ <T extends RESTResponse> T post(
3030
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers);
3131

3232
<T extends RESTResponse> T delete(String path, Map<String, String> headers);
33+
34+
<T extends RESTResponse> T delete(String path, RESTRequest body, Map<String, String> headers);
3335
}

paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,15 @@ public String renameTable(String databaseName, String tableName) {
8282
.add("rename")
8383
.toString();
8484
}
85+
86+
public String partitions(String databaseName, String tableName) {
87+
return SLASH.add("v1")
88+
.add(prefix)
89+
.add("databases")
90+
.add(databaseName)
91+
.add("tables")
92+
.add(tableName)
93+
.add("partitions")
94+
.toString();
95+
}
8596
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.requests;
20+
21+
import org.apache.paimon.catalog.Identifier;
22+
import org.apache.paimon.rest.RESTRequest;
23+
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
25+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
26+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
28+
29+
import java.util.Map;
30+
31+
/** Request for creating partition. */
32+
@JsonIgnoreProperties(ignoreUnknown = true)
33+
public class CreatePartitionRequest implements RESTRequest {
34+
35+
private static final String FIELD_IDENTIFIER = "identifier";
36+
private static final String FIELD_PARTITION_SPEC = "spec";
37+
38+
@JsonProperty(FIELD_IDENTIFIER)
39+
private final Identifier identifier;
40+
41+
@JsonProperty(FIELD_PARTITION_SPEC)
42+
private final Map<String, String> partitionSpec;
43+
44+
@JsonCreator
45+
public CreatePartitionRequest(
46+
@JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
47+
@JsonProperty(FIELD_PARTITION_SPEC) Map<String, String> partitionSpec) {
48+
this.identifier = identifier;
49+
this.partitionSpec = partitionSpec;
50+
}
51+
52+
@JsonGetter(FIELD_IDENTIFIER)
53+
public Identifier getIdentifier() {
54+
return identifier;
55+
}
56+
57+
@JsonGetter(FIELD_PARTITION_SPEC)
58+
public Map<String, String> getPartitionSpec() {
59+
return partitionSpec;
60+
}
61+
}

0 commit comments

Comments
 (0)