Skip to content

Commit 4fe4734

Browse files
author
wangjunbo
committed
[lake/iceberg] thrown Exception when partition columns are of non-String type
1 parent 025ffd1 commit 4fe4734

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.InvalidTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
2324
import org.apache.fluss.exception.TableNotExistException;
2425
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
@@ -51,6 +52,7 @@
5152
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5253
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
5354
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
55+
import static org.apache.iceberg.types.Type.TypeID.STRING;
5456

5557
/** An Iceberg implementation of {@link LakeCatalog}. */
5658
public class IcebergLakeCatalog implements LakeCatalog {
@@ -214,6 +216,14 @@ private PartitionSpec createPartitionSpec(
214216
List<String> partitionKeys = tableDescriptor.getPartitionKeys();
215217
// always set identity partition with partition key
216218
for (String partitionKey : partitionKeys) {
219+
if (!icebergSchema.findType(partitionKey).typeId().equals(STRING)) {
220+
// TODO: Support other types of partition column
221+
throw new InvalidTableException(
222+
String.format(
223+
"Iceberg partition column only support string type, "
224+
+ "%s is not string type.",
225+
partitionKey));
226+
}
217227
builder.identity(partitionKey);
218228
}
219229

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.config.AutoPartitionTimeUnit;
2121
import org.apache.fluss.config.ConfigOptions;
22+
import org.apache.fluss.exception.InvalidTableException;
2223
import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
2324
import org.apache.fluss.metadata.Schema;
2425
import org.apache.fluss.metadata.TableBucket;
@@ -39,6 +40,10 @@
3940
import org.apache.iceberg.data.Record;
4041
import org.junit.jupiter.api.BeforeAll;
4142
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.params.ParameterizedTest;
44+
import org.junit.jupiter.params.provider.Arguments;
45+
import org.junit.jupiter.params.provider.MethodSource;
46+
import org.junit.jupiter.params.provider.ValueSource;
4247

4348
import java.nio.ByteBuffer;
4449
import java.time.Duration;
@@ -53,10 +58,13 @@
5358
import java.util.Iterator;
5459
import java.util.List;
5560
import java.util.Map;
61+
import java.util.concurrent.ExecutionException;
62+
import java.util.stream.Stream;
5663

5764
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
5865
import static org.apache.fluss.testutils.DataTestUtils.row;
5966
import static org.assertj.core.api.Assertions.assertThat;
67+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6068

6169
/** The ITCase for tiering into iceberg. */
6270
class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
@@ -86,7 +94,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
8694
.column("f_time", DataTypes.TIME())
8795
.column("f_char", DataTypes.CHAR(3))
8896
.column("f_bytes", DataTypes.BYTES())
89-
.primaryKey("f_int")
97+
.primaryKey("f_date", "f_int")
9098
.build();
9199

92100
private static final Schema logSchema =
@@ -278,6 +286,44 @@ void testTiering() throws Exception {
278286
}
279287
}
280288

289+
private static Stream<Arguments> tieringAllTypesWriteArgs() {
290+
return Stream.of(Arguments.of(true), Arguments.of(false));
291+
}
292+
293+
@ParameterizedTest
294+
@ValueSource(booleans = {false, true})
295+
void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
296+
// create a table, write some records and wait until snapshot finished
297+
TablePath t1 =
298+
TablePath.of(
299+
DEFAULT_DB,
300+
isPrimaryKeyTable ? "pkTableForAllTypes" : "logTableForAllTypes");
301+
Schema.Builder builder =
302+
Schema.newBuilder()
303+
.column("c0", DataTypes.STRING())
304+
.column("c1", DataTypes.BOOLEAN());
305+
if (isPrimaryKeyTable) {
306+
builder.primaryKey("c0", "c1");
307+
}
308+
List<String> partitionKeys = List.of("c1");
309+
TableDescriptor.Builder tableDescriptor =
310+
TableDescriptor.builder()
311+
.schema(builder.build())
312+
.distributedBy(1, "c0")
313+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
314+
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
315+
tableDescriptor.partitionedBy(partitionKeys);
316+
tableDescriptor.customProperties(Collections.emptyMap());
317+
tableDescriptor.properties(Collections.emptyMap());
318+
319+
assertThatThrownBy(() -> createTable(t1, tableDescriptor.build()))
320+
.isInstanceOf(ExecutionException.class)
321+
.rootCause()
322+
.isInstanceOf(InvalidTableException.class)
323+
.hasMessage(
324+
"Iceberg partition column only support string type, c1 is not string type.");
325+
}
326+
281327
private void checkDataInIcebergPrimaryKeyTable(
282328
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
283329
Iterator<Record> acturalIterator = getIcebergRecords(tablePath).iterator();

0 commit comments

Comments
 (0)