Skip to content

Commit acc88fe

Browse files
committed
[server] Tolerate lake table existent if the schema and properties matches
1 parent 665e4e8 commit acc88fe

File tree

16 files changed

+895
-147
lines changed

16 files changed

+895
-147
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@
3434
@PublicEvolving
3535
public interface LakeCatalog extends AutoCloseable {
3636

37+
/**
38+
* Get a table in lake.
39+
*
40+
* @param tablePath path of the table to be created
41+
* @throws TableNotExistException if the table not exists
42+
*/
43+
TableDescriptor getTable(TablePath tablePath) throws TableNotExistException;
44+
3745
/**
3846
* Create a new table in lake.
3947
*

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
7474
this.loader = loader;
7575
}
7676

77+
@Override
78+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
79+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
80+
return inner.getTable(tablePath);
81+
}
82+
}
83+
7784
@Override
7885
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7986
throws TableAlreadyExistException {

fluss-common/src/main/java/org/apache/fluss/types/DataType.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public String asSummaryString() {
139139

140140
public abstract <R> R accept(DataTypeVisitor<R> visitor);
141141

142+
public DataType notNull() {
143+
return copy(false);
144+
}
145+
146+
public DataType nullable() {
147+
return copy(true);
148+
}
149+
142150
@Override
143151
public String toString() {
144152
return asSummaryString();

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
145145

146146
private static class TestPaimonLakeCatalog implements LakeCatalog {
147147

148+
@Override
149+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
150+
return null;
151+
}
152+
148153
@Override
149154
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
150155
throws TableAlreadyExistException {}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ protected Catalog getIcebergCatalog() {
8181
return icebergCatalog;
8282
}
8383

84+
@Override
85+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
86+
throw new UnsupportedOperationException(
87+
"Get table is not supported for Iceberg at the moment");
88+
}
89+
8490
@Override
8591
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
8692
throws TableAlreadyExistException {

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public LanceLakeCatalog(Configuration config) {
4242
this.options = config;
4343
}
4444

45+
@Override
46+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
47+
throw new UnsupportedOperationException(
48+
"Get table is not supported for Lance at the moment");
49+
}
50+
4551
@Override
4652
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
4753
// currently, we don't support primary key table for lance
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon;
19+
20+
import org.apache.fluss.types.DataType;
21+
import org.apache.fluss.types.DataTypes;
22+
23+
import org.apache.paimon.types.ArrayType;
24+
import org.apache.paimon.types.BigIntType;
25+
import org.apache.paimon.types.BinaryType;
26+
import org.apache.paimon.types.BooleanType;
27+
import org.apache.paimon.types.CharType;
28+
import org.apache.paimon.types.DataField;
29+
import org.apache.paimon.types.DataTypeVisitor;
30+
import org.apache.paimon.types.DateType;
31+
import org.apache.paimon.types.DecimalType;
32+
import org.apache.paimon.types.DoubleType;
33+
import org.apache.paimon.types.FloatType;
34+
import org.apache.paimon.types.IntType;
35+
import org.apache.paimon.types.LocalZonedTimestampType;
36+
import org.apache.paimon.types.MapType;
37+
import org.apache.paimon.types.MultisetType;
38+
import org.apache.paimon.types.RowType;
39+
import org.apache.paimon.types.SmallIntType;
40+
import org.apache.paimon.types.TimeType;
41+
import org.apache.paimon.types.TimestampType;
42+
import org.apache.paimon.types.TinyIntType;
43+
import org.apache.paimon.types.VarBinaryType;
44+
import org.apache.paimon.types.VarCharType;
45+
import org.apache.paimon.types.VariantType;
46+
47+
/** Convert from Paimon's data type to Fluss's data type. */
48+
public class PaimonDataTypeToFlussDataType implements DataTypeVisitor<DataType> {
49+
50+
public static final PaimonDataTypeToFlussDataType INSTANCE =
51+
new PaimonDataTypeToFlussDataType();
52+
53+
@Override
54+
public DataType visit(CharType charType) {
55+
return withNullability(DataTypes.CHAR(charType.getLength()), charType.isNullable());
56+
}
57+
58+
@Override
59+
public DataType visit(VarCharType varCharType) {
60+
return withNullability(DataTypes.STRING(), varCharType.isNullable());
61+
}
62+
63+
@Override
64+
public DataType visit(BooleanType booleanType) {
65+
return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable());
66+
}
67+
68+
@Override
69+
public DataType visit(BinaryType binaryType) {
70+
return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable());
71+
}
72+
73+
@Override
74+
public DataType visit(VarBinaryType varBinaryType) {
75+
return withNullability(DataTypes.BYTES(), varBinaryType.isNullable());
76+
}
77+
78+
@Override
79+
public DataType visit(DecimalType decimalType) {
80+
return withNullability(
81+
DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()),
82+
decimalType.isNullable());
83+
}
84+
85+
@Override
86+
public DataType visit(TinyIntType tinyIntType) {
87+
return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable());
88+
}
89+
90+
@Override
91+
public DataType visit(SmallIntType smallIntType) {
92+
return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable());
93+
}
94+
95+
@Override
96+
public DataType visit(IntType intType) {
97+
return withNullability(DataTypes.INT(), intType.isNullable());
98+
}
99+
100+
@Override
101+
public DataType visit(BigIntType bigIntType) {
102+
return withNullability(DataTypes.BIGINT(), bigIntType.isNullable());
103+
}
104+
105+
@Override
106+
public DataType visit(FloatType floatType) {
107+
return withNullability(DataTypes.FLOAT(), floatType.isNullable());
108+
}
109+
110+
@Override
111+
public DataType visit(DoubleType doubleType) {
112+
return withNullability(DataTypes.DOUBLE(), doubleType.isNullable());
113+
}
114+
115+
@Override
116+
public DataType visit(DateType dateType) {
117+
return withNullability(DataTypes.DATE(), dateType.isNullable());
118+
}
119+
120+
@Override
121+
public DataType visit(TimeType timeType) {
122+
return withNullability(DataTypes.TIME(), timeType.isNullable());
123+
}
124+
125+
@Override
126+
public DataType visit(TimestampType timestampType) {
127+
return withNullability(
128+
DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable());
129+
}
130+
131+
@Override
132+
public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
133+
return withNullability(
134+
DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()),
135+
localZonedTimestampType.isNullable());
136+
}
137+
138+
@Override
139+
public DataType visit(VariantType variantType) {
140+
throw new UnsupportedOperationException("VariantType is not supported for Fluss.");
141+
}
142+
143+
@Override
144+
public DataType visit(ArrayType arrayType) {
145+
return withNullability(
146+
DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType.isNullable());
147+
}
148+
149+
@Override
150+
public DataType visit(MultisetType multisetType) {
151+
throw new UnsupportedOperationException("MultisetType is not supported for Fluss.");
152+
}
153+
154+
@Override
155+
public DataType visit(MapType mapType) {
156+
return withNullability(
157+
DataTypes.MAP(
158+
mapType.getKeyType().accept(this), mapType.getValueType().accept(this)),
159+
mapType.isNullable());
160+
}
161+
162+
@Override
163+
public DataType visit(RowType rowType) {
164+
org.apache.fluss.types.RowType.Builder rowTypeBuilder =
165+
org.apache.fluss.types.RowType.builder();
166+
for (DataField field : rowType.getFields()) {
167+
rowTypeBuilder.field(field.name(), field.type().accept(this), field.description());
168+
}
169+
return withNullability(rowTypeBuilder.build(), rowType.isNullable());
170+
}
171+
172+
private DataType withNullability(DataType fluss, boolean nullable) {
173+
if (fluss.isNullable() != nullable) {
174+
return nullable ? fluss.nullable() : fluss.notNull();
175+
}
176+
return fluss;
177+
}
178+
}

0 commit comments

Comments
 (0)