Skip to content

Commit 54ee2e6

Browse files
committed
[iceberg] Introduce Iceberg Table in Catalog
1 parent 9793945 commit 54ee2e6

File tree

5 files changed

+281
-1
lines changed

5 files changed

+281
-1
lines changed

paimon-api/src/main/java/org/apache/paimon/TableType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public enum TableType implements DescribedEnum {
3434
"A materialized table combines normal Paimon table and materialized SQL."),
3535
OBJECT_TABLE(
3636
"object-table", "An object table combines normal Paimon table and object location."),
37-
LANCE_TABLE("lance-table", "A lance table, see 'https://lancedb.github.io/lance/'.");
37+
LANCE_TABLE("lance-table", "A lance table, see 'https://lancedb.github.io/lance/'."),
38+
ICEBERG_TABLE("iceberg-table", "An iceberg table, see 'https://iceberg.apache.org/'.");
3839

3940
private final String value;
4041
private final String description;

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.table.FileStoreTableFactory;
3434
import org.apache.paimon.table.FormatTable;
3535
import org.apache.paimon.table.Table;
36+
import org.apache.paimon.table.iceberg.IcebergTable;
3637
import org.apache.paimon.table.lance.LanceTable;
3738
import org.apache.paimon.table.object.ObjectTable;
3839
import org.apache.paimon.table.system.AllTableOptionsTable;
@@ -216,6 +217,10 @@ public static Table loadTable(
216217
return toLanceTable(identifier, schema, dataFileIO);
217218
}
218219

220+
if (options.type() == TableType.ICEBERG_TABLE) {
221+
return toIcebergTable(identifier, schema, dataFileIO);
222+
}
223+
219224
CatalogEnvironment catalogEnv =
220225
new CatalogEnvironment(
221226
identifier,
@@ -324,4 +329,19 @@ private static LanceTable toLanceTable(
324329
.comment(schema.comment())
325330
.build();
326331
}
332+
333+
private static IcebergTable toIcebergTable(
334+
Identifier identifier, TableSchema schema, Function<Path, FileIO> fileIO) {
335+
Map<String, String> options = schema.options();
336+
String location = options.get(CoreOptions.PATH.key());
337+
return IcebergTable.builder()
338+
.fileIO(fileIO.apply(new Path(location)))
339+
.identifier(identifier)
340+
.location(location)
341+
.rowType(schema.logicalRowType())
342+
.partitionKeys(schema.partitionKeys())
343+
.options(options)
344+
.comment(schema.comment())
345+
.build();
346+
}
327347
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.iceberg;
20+
21+
import org.apache.paimon.catalog.Identifier;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.table.Table;
24+
import org.apache.paimon.types.RowType;
25+
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
/** An iceberg table, paimon does not support read and write operation on this table yet. */
30+
public interface IcebergTable extends Table {
31+
32+
/** Object location in file system. */
33+
String location();
34+
35+
@Override
36+
IcebergTable copy(Map<String, String> dynamicOptions);
37+
38+
/** Create a new builder for {@link IcebergTable}. */
39+
static Builder builder() {
40+
return new Builder();
41+
}
42+
43+
/** Builder for {@link IcebergTable}. */
44+
class Builder {
45+
46+
private Identifier identifier;
47+
private FileIO fileIO;
48+
private RowType rowType;
49+
private List<String> partitionKeys;
50+
private String location;
51+
private Map<String, String> options;
52+
private String comment;
53+
54+
public Builder identifier(Identifier identifier) {
55+
this.identifier = identifier;
56+
return this;
57+
}
58+
59+
public Builder fileIO(FileIO fileIO) {
60+
this.fileIO = fileIO;
61+
return this;
62+
}
63+
64+
public Builder rowType(RowType rowType) {
65+
this.rowType = rowType;
66+
return this;
67+
}
68+
69+
public Builder partitionKeys(List<String> partitionKeys) {
70+
this.partitionKeys = partitionKeys;
71+
return this;
72+
}
73+
74+
public Builder location(String location) {
75+
this.location = location;
76+
return this;
77+
}
78+
79+
public Builder options(Map<String, String> options) {
80+
this.options = options;
81+
return this;
82+
}
83+
84+
public Builder comment(String comment) {
85+
this.comment = comment;
86+
return this;
87+
}
88+
89+
public IcebergTable build() {
90+
return new IcebergTableImpl(
91+
identifier, fileIO, rowType, partitionKeys, location, options, comment);
92+
}
93+
}
94+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.iceberg;
20+
21+
import org.apache.paimon.catalog.Identifier;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.stats.Statistics;
24+
import org.apache.paimon.table.ReadonlyTable;
25+
import org.apache.paimon.table.source.InnerTableRead;
26+
import org.apache.paimon.table.source.InnerTableScan;
27+
import org.apache.paimon.types.RowType;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.util.Collections;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
37+
/** An implementation for {@link IcebergTable}. */
38+
public class IcebergTableImpl implements ReadonlyTable, IcebergTable {
39+
40+
private final Identifier identifier;
41+
private final FileIO fileIO;
42+
private final RowType rowType;
43+
private final List<String> partitionKeys;
44+
private final String location;
45+
private final Map<String, String> options;
46+
@Nullable private final String comment;
47+
48+
public IcebergTableImpl(
49+
Identifier identifier,
50+
FileIO fileIO,
51+
RowType rowType,
52+
List<String> partitionKeys,
53+
String location,
54+
Map<String, String> options,
55+
@Nullable String comment) {
56+
this.identifier = identifier;
57+
this.fileIO = fileIO;
58+
this.rowType = rowType;
59+
this.partitionKeys = partitionKeys;
60+
this.location = location;
61+
this.options = options;
62+
this.comment = comment;
63+
}
64+
65+
@Override
66+
public String name() {
67+
return identifier.getTableName();
68+
}
69+
70+
@Override
71+
public String fullName() {
72+
return identifier.getFullName();
73+
}
74+
75+
@Override
76+
public RowType rowType() {
77+
return rowType;
78+
}
79+
80+
@Override
81+
public List<String> partitionKeys() {
82+
return partitionKeys;
83+
}
84+
85+
@Override
86+
public List<String> primaryKeys() {
87+
return Collections.emptyList();
88+
}
89+
90+
@Override
91+
public Map<String, String> options() {
92+
return options;
93+
}
94+
95+
@Override
96+
public Optional<String> comment() {
97+
return Optional.ofNullable(comment);
98+
}
99+
100+
@Override
101+
public Optional<Statistics> statistics() {
102+
return ReadonlyTable.super.statistics();
103+
}
104+
105+
@Override
106+
public FileIO fileIO() {
107+
return fileIO;
108+
}
109+
110+
@Override
111+
public String location() {
112+
return location;
113+
}
114+
115+
@Override
116+
public IcebergTable copy(Map<String, String> dynamicOptions) {
117+
Map<String, String> newOptions = new HashMap<>(options);
118+
newOptions.putAll(dynamicOptions);
119+
return new IcebergTableImpl(
120+
identifier, fileIO, rowType, partitionKeys, location, newOptions, comment);
121+
}
122+
123+
@Override
124+
public InnerTableScan newScan() {
125+
throw new UnsupportedOperationException(
126+
"LanceTable does not support InnerTableScan. Use newRead() instead.");
127+
}
128+
129+
@Override
130+
public InnerTableRead newRead() {
131+
throw new UnsupportedOperationException(
132+
"LanceTable does not support InnerTableRead. Use newScan() instead.");
133+
}
134+
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2069,6 +2069,37 @@ public void testCreateLanceTable() throws Exception {
20692069
assertThat(tables).containsExactlyInAnyOrder("table1");
20702070
}
20712071

2072+
@Test
2073+
public void testCreateIcebergTable() throws Exception {
2074+
Catalog catalog = newRestCatalogWithDataToken();
2075+
catalog.createDatabase("test_db", false);
2076+
List<String> tables = catalog.listTables("test_db");
2077+
assertThat(tables).isEmpty();
2078+
2079+
Map<String, String> options = new HashMap<>();
2080+
options.put("type", "iceberg-table");
2081+
options.put("a", "b");
2082+
Schema schema =
2083+
new Schema(
2084+
Lists.newArrayList(
2085+
new DataField(0, "pt", DataTypes.INT()),
2086+
new DataField(1, "col1", DataTypes.STRING()),
2087+
new DataField(2, "col2", DataTypes.STRING())),
2088+
Collections.singletonList("pt"),
2089+
Collections.emptyList(),
2090+
options,
2091+
"");
2092+
catalog.createTable(Identifier.create("test_db", "table1"), schema, false);
2093+
2094+
tables = catalog.listTables("test_db");
2095+
Table table = catalog.getTable(Identifier.create("test_db", "table1"));
2096+
assertThat(table.options()).containsEntry("a", "b");
2097+
assertThat(table.options().containsKey("path")).isTrue();
2098+
assertThat(table.partitionKeys()).containsExactly("pt");
2099+
assertThat(table.fileIO()).isInstanceOf(RESTTokenFileIO.class);
2100+
assertThat(tables).containsExactlyInAnyOrder("table1");
2101+
}
2102+
20722103
private TestPagedResponse generateTestPagedResponse(
20732104
Map<String, String> queryParams,
20742105
List<Integer> testData,

0 commit comments

Comments
 (0)