Skip to content

Commit 61475a8

Browse files
authored
[lake/iceberg] Throw an exception when partition columns are of non-String type (#1831)
1 parent 9481c19 commit 61475a8

File tree

2 files changed

+60
-10
lines changed

2 files changed

+60
-10
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.InvalidTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
2324
import org.apache.fluss.exception.TableNotExistException;
2425
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
@@ -51,6 +52,7 @@
5152
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5253
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
5354
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
55+
import static org.apache.iceberg.types.Type.TypeID.STRING;
5456

5557
/** An Iceberg implementation of {@link LakeCatalog}. */
5658
public class IcebergLakeCatalog implements LakeCatalog {
@@ -214,6 +216,13 @@ private PartitionSpec createPartitionSpec(
214216
List<String> partitionKeys = tableDescriptor.getPartitionKeys();
215217
// always set identity partition with partition key
216218
for (String partitionKey : partitionKeys) {
219+
if (!icebergSchema.findType(partitionKey).typeId().equals(STRING)) {
220+
// TODO: Support other types of partition keys
221+
throw new InvalidTableException(
222+
String.format(
223+
"Partition key only support string type for iceberg currently. Column `%s` is not string type.",
224+
partitionKey));
225+
}
217226
builder.identity(partitionKey);
218227
}
219228

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.fluss.lake.iceberg;
1919

20+
import org.apache.fluss.config.ConfigOptions;
2021
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.InvalidTableException;
2123
import org.apache.fluss.metadata.Schema;
2224
import org.apache.fluss.metadata.TableDescriptor;
2325
import org.apache.fluss.metadata.TablePath;
@@ -29,14 +31,19 @@
2931
import org.apache.iceberg.Table;
3032
import org.apache.iceberg.catalog.TableIdentifier;
3133
import org.apache.iceberg.types.Types;
34+
import org.assertj.core.api.Assertions;
3235
import org.junit.jupiter.api.BeforeEach;
3336
import org.junit.jupiter.api.Test;
3437
import org.junit.jupiter.api.io.TempDir;
38+
import org.junit.jupiter.params.ParameterizedTest;
39+
import org.junit.jupiter.params.provider.ValueSource;
3540

3641
import java.io.File;
42+
import java.time.Duration;
3743
import java.util.Arrays;
3844
import java.util.Collections;
3945
import java.util.HashSet;
46+
import java.util.List;
4047
import java.util.Set;
4148

4249
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -149,18 +156,19 @@ void testCreatePartitionedPrimaryKeyTable() {
149156

150157
Schema flussSchema =
151158
Schema.newBuilder()
152-
.column("shop_id", DataTypes.BIGINT())
159+
.column("dt", DataTypes.STRING())
153160
.column("user_id", DataTypes.BIGINT())
161+
.column("shop_id", DataTypes.BIGINT())
154162
.column("num_orders", DataTypes.INT())
155163
.column("total_amount", DataTypes.INT().copy(false))
156-
.primaryKey("shop_id", "user_id")
164+
.primaryKey("dt", "user_id")
157165
.build();
158166

159167
TableDescriptor tableDescriptor =
160168
TableDescriptor.builder()
161169
.schema(flussSchema)
162170
.distributedBy(10)
163-
.partitionedBy("shop_id")
171+
.partitionedBy("dt")
164172
.property("iceberg.write.format.default", "orc")
165173
.property("fluss_k1", "fluss_v1")
166174
.build();
@@ -178,26 +186,27 @@ void testCreatePartitionedPrimaryKeyTable() {
178186
org.apache.iceberg.Schema expectIcebergSchema =
179187
new org.apache.iceberg.Schema(
180188
Arrays.asList(
181-
Types.NestedField.required(1, "shop_id", Types.LongType.get()),
189+
Types.NestedField.required(1, "dt", Types.StringType.get()),
182190
Types.NestedField.required(2, "user_id", Types.LongType.get()),
191+
Types.NestedField.optional(3, "shop_id", Types.LongType.get()),
183192
Types.NestedField.optional(
184-
3, "num_orders", Types.IntegerType.get()),
193+
4, "num_orders", Types.IntegerType.get()),
185194
Types.NestedField.required(
186-
4, "total_amount", Types.IntegerType.get()),
195+
5, "total_amount", Types.IntegerType.get()),
187196
Types.NestedField.required(
188-
5, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
197+
6, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
189198
Types.NestedField.required(
190-
6, OFFSET_COLUMN_NAME, Types.LongType.get()),
199+
7, OFFSET_COLUMN_NAME, Types.LongType.get()),
191200
Types.NestedField.required(
192-
7, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
201+
8, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
193202
identifierFieldIds);
194203
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
195204

196205
// Verify partition spec
197206
assertThat(createdTable.spec().fields()).hasSize(2);
198207
// first should be partitioned by the fluss partition key
199208
PartitionField partitionField1 = createdTable.spec().fields().get(0);
200-
assertThat(partitionField1.name()).isEqualTo("shop_id");
209+
assertThat(partitionField1.name()).isEqualTo("dt");
201210
assertThat(partitionField1.transform().toString()).isEqualTo("identity");
202211
assertThat(partitionField1.sourceId()).isEqualTo(1);
203212

@@ -396,4 +405,36 @@ void rejectsLogTableWithMultipleBucketKeys() {
396405
.isInstanceOf(UnsupportedOperationException.class)
397406
.hasMessageContaining("Only one bucket key is supported for Iceberg");
398407
}
408+
409+
@ParameterizedTest
410+
@ValueSource(booleans = {false, true})
411+
void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception {
412+
TablePath t1 =
413+
TablePath.of(
414+
"test_db",
415+
isPrimaryKeyTable
416+
? "pkIllegalPartitionKeyType"
417+
: "logIllegalPartitionKeyType");
418+
Schema.Builder builder =
419+
Schema.newBuilder()
420+
.column("c0", DataTypes.STRING())
421+
.column("c1", DataTypes.BOOLEAN());
422+
if (isPrimaryKeyTable) {
423+
builder.primaryKey("c0", "c1");
424+
}
425+
List<String> partitionKeys = List.of("c1");
426+
TableDescriptor.Builder tableDescriptor =
427+
TableDescriptor.builder()
428+
.schema(builder.build())
429+
.distributedBy(1, "c0")
430+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
431+
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
432+
tableDescriptor.partitionedBy(partitionKeys);
433+
434+
Assertions.assertThatThrownBy(
435+
() -> flussIcebergCatalog.createTable(t1, tableDescriptor.build()))
436+
.isInstanceOf(InvalidTableException.class)
437+
.hasMessage(
438+
"Partition key only support string type for iceberg currently. Column `c1` is not string type.");
439+
}
399440
}

0 commit comments

Comments
 (0)