Skip to content

Commit 99bf4d3

Browse files
committed
[server] Tolerate lake table existent if the schema and properties matches
1 parent 665e4e8 commit 99bf4d3

File tree

16 files changed

+906
-141
lines changed

16 files changed

+906
-141
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.apache.fluss.client;
2+
3+
import org.apache.fluss.client.admin.Admin;
4+
import org.apache.fluss.config.ConfigOptions;
5+
import org.apache.fluss.config.Configuration;
6+
import org.apache.fluss.metadata.Schema;
7+
import org.apache.fluss.metadata.TableDescriptor;
8+
import org.apache.fluss.metadata.TablePath;
9+
import org.apache.fluss.types.DataTypes;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
public class Demo {
15+
16+
public static void main(String[] args) throws Exception {
17+
// creating Connection object to connect with Fluss cluster
18+
Configuration conf = new Configuration();
19+
conf.setString("bootstrap.servers", "localhost:9123");
20+
Connection connection = ConnectionFactory.createConnection(conf);
21+
22+
// obtain Admin instance from the Connection
23+
Admin admin = connection.getAdmin();
24+
admin.listDatabases().get().forEach(System.out::println);
25+
26+
Map<String, String> customProperties = new HashMap<>();
27+
customProperties.put("k1", "v1");
28+
customProperties.put("paimon.file.format", "parquet");
29+
30+
// test bucket key log table
31+
TableDescriptor logTable =
32+
TableDescriptor.builder()
33+
.schema(
34+
Schema.newBuilder()
35+
.column("log_c1", DataTypes.INT())
36+
.column("log_c2", DataTypes.STRING())
37+
.build())
38+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
39+
.customProperties(customProperties)
40+
.distributedBy(3, "log_c1", "log_c2")
41+
.build();
42+
TablePath logTablePath = TablePath.of("fluss", "log_table");
43+
admin.createTable(logTablePath, logTable, false).get();
44+
}
45+
}

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@
3434
@PublicEvolving
3535
public interface LakeCatalog extends AutoCloseable {
3636

37+
/**
38+
* Get a table in lake.
39+
*
40+
* @param tablePath path of the table to be created
41+
* @throws TableNotExistException if the table not exists
42+
*/
43+
TableDescriptor getTable(TablePath tablePath) throws TableNotExistException;
44+
3745
/**
3846
* Create a new table in lake.
3947
*

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
7474
this.loader = loader;
7575
}
7676

77+
@Override
78+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
79+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
80+
return inner.getTable(tablePath);
81+
}
82+
}
83+
7784
@Override
7885
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7986
throws TableAlreadyExistException {

fluss-common/src/main/java/org/apache/fluss/types/DataType.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public String asSummaryString() {
139139

140140
public abstract <R> R accept(DataTypeVisitor<R> visitor);
141141

142+
public DataType notNull() {
143+
return copy(false);
144+
}
145+
146+
public DataType nullable() {
147+
return copy(true);
148+
}
149+
142150
@Override
143151
public String toString() {
144152
return asSummaryString();

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
145145

146146
private static class TestPaimonLakeCatalog implements LakeCatalog {
147147

148+
@Override
149+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
150+
return null;
151+
}
152+
148153
@Override
149154
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
150155
throws TableAlreadyExistException {}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ protected Catalog getIcebergCatalog() {
8181
return icebergCatalog;
8282
}
8383

84+
@Override
85+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
86+
throw new UnsupportedOperationException(
87+
"Get table is not supported for Iceberg at the moment");
88+
}
89+
8490
@Override
8591
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
8692
throws TableAlreadyExistException {

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ public LanceLakeCatalog(Configuration config) {
4242
this.options = config;
4343
}
4444

45+
@Override
46+
public TableDescriptor getTable(TablePath tablePath) throws TableNotExistException {
47+
throw new UnsupportedOperationException(
48+
"Get table is not supported for Lance at the moment");
49+
}
50+
4551
@Override
4652
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
4753
// currently, we don't support primary key table for lance
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package org.apache.fluss.lake.paimon;
2+
3+
import org.apache.fluss.types.DataType;
4+
import org.apache.fluss.types.DataTypes;
5+
6+
import org.apache.paimon.types.ArrayType;
7+
import org.apache.paimon.types.BigIntType;
8+
import org.apache.paimon.types.BinaryType;
9+
import org.apache.paimon.types.BooleanType;
10+
import org.apache.paimon.types.CharType;
11+
import org.apache.paimon.types.DataField;
12+
import org.apache.paimon.types.DataTypeVisitor;
13+
import org.apache.paimon.types.DateType;
14+
import org.apache.paimon.types.DecimalType;
15+
import org.apache.paimon.types.DoubleType;
16+
import org.apache.paimon.types.FloatType;
17+
import org.apache.paimon.types.IntType;
18+
import org.apache.paimon.types.LocalZonedTimestampType;
19+
import org.apache.paimon.types.MapType;
20+
import org.apache.paimon.types.MultisetType;
21+
import org.apache.paimon.types.RowType;
22+
import org.apache.paimon.types.SmallIntType;
23+
import org.apache.paimon.types.TimeType;
24+
import org.apache.paimon.types.TimestampType;
25+
import org.apache.paimon.types.TinyIntType;
26+
import org.apache.paimon.types.VarBinaryType;
27+
import org.apache.paimon.types.VarCharType;
28+
import org.apache.paimon.types.VariantType;
29+
30+
/** Convert from Paimon's data type to Fluss's data type. */
31+
public class PaimonDataTypeToFlussDataType implements DataTypeVisitor<DataType> {
32+
33+
public static final PaimonDataTypeToFlussDataType INSTANCE =
34+
new PaimonDataTypeToFlussDataType();
35+
36+
@Override
37+
public DataType visit(CharType charType) {
38+
return withNullability(DataTypes.CHAR(charType.getLength()), charType.isNullable());
39+
}
40+
41+
@Override
42+
public DataType visit(VarCharType varCharType) {
43+
return withNullability(DataTypes.STRING(), varCharType.isNullable());
44+
}
45+
46+
@Override
47+
public DataType visit(BooleanType booleanType) {
48+
return withNullability(DataTypes.BOOLEAN(), booleanType.isNullable());
49+
}
50+
51+
@Override
52+
public DataType visit(BinaryType binaryType) {
53+
return withNullability(DataTypes.BINARY(binaryType.getLength()), binaryType.isNullable());
54+
}
55+
56+
@Override
57+
public DataType visit(VarBinaryType varBinaryType) {
58+
return withNullability(DataTypes.BYTES(), varBinaryType.isNullable());
59+
}
60+
61+
@Override
62+
public DataType visit(DecimalType decimalType) {
63+
return withNullability(
64+
DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()),
65+
decimalType.isNullable());
66+
}
67+
68+
@Override
69+
public DataType visit(TinyIntType tinyIntType) {
70+
return withNullability(DataTypes.TINYINT(), tinyIntType.isNullable());
71+
}
72+
73+
@Override
74+
public DataType visit(SmallIntType smallIntType) {
75+
return withNullability(DataTypes.SMALLINT(), smallIntType.isNullable());
76+
}
77+
78+
@Override
79+
public DataType visit(IntType intType) {
80+
return withNullability(DataTypes.INT(), intType.isNullable());
81+
}
82+
83+
@Override
84+
public DataType visit(BigIntType bigIntType) {
85+
return withNullability(DataTypes.BIGINT(), bigIntType.isNullable());
86+
}
87+
88+
@Override
89+
public DataType visit(FloatType floatType) {
90+
return withNullability(DataTypes.FLOAT(), floatType.isNullable());
91+
}
92+
93+
@Override
94+
public DataType visit(DoubleType doubleType) {
95+
return withNullability(DataTypes.DOUBLE(), doubleType.isNullable());
96+
}
97+
98+
@Override
99+
public DataType visit(DateType dateType) {
100+
return withNullability(DataTypes.DATE(), dateType.isNullable());
101+
}
102+
103+
@Override
104+
public DataType visit(TimeType timeType) {
105+
return withNullability(DataTypes.TIME(), timeType.isNullable());
106+
}
107+
108+
@Override
109+
public DataType visit(TimestampType timestampType) {
110+
return withNullability(
111+
DataTypes.TIMESTAMP(timestampType.getPrecision()), timestampType.isNullable());
112+
}
113+
114+
@Override
115+
public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
116+
return withNullability(
117+
DataTypes.TIMESTAMP_LTZ(localZonedTimestampType.getPrecision()),
118+
localZonedTimestampType.isNullable());
119+
}
120+
121+
@Override
122+
public DataType visit(VariantType variantType) {
123+
throw new UnsupportedOperationException("VariantType is not supported for Fluss.");
124+
}
125+
126+
@Override
127+
public DataType visit(ArrayType arrayType) {
128+
return withNullability(
129+
DataTypes.ARRAY(arrayType.getElementType().accept(this)), arrayType.isNullable());
130+
}
131+
132+
@Override
133+
public DataType visit(MultisetType multisetType) {
134+
throw new UnsupportedOperationException("MultisetType is not supported for Fluss.");
135+
}
136+
137+
@Override
138+
public DataType visit(MapType mapType) {
139+
return withNullability(
140+
DataTypes.MAP(
141+
mapType.getKeyType().accept(this), mapType.getValueType().accept(this)),
142+
mapType.isNullable());
143+
}
144+
145+
@Override
146+
public DataType visit(RowType rowType) {
147+
org.apache.fluss.types.RowType.Builder rowTypeBuilder =
148+
org.apache.fluss.types.RowType.builder();
149+
for (DataField field : rowType.getFields()) {
150+
rowTypeBuilder.field(field.name(), field.type().accept(this), field.description());
151+
}
152+
return withNullability(rowTypeBuilder.build(), rowType.isNullable());
153+
}
154+
155+
private DataType withNullability(DataType fluss, boolean nullable) {
156+
if (fluss.isNullable() != nullable) {
157+
return nullable ? fluss.nullable() : fluss.notNull();
158+
}
159+
return fluss;
160+
}
161+
}

0 commit comments

Comments
 (0)