|
19 | 19 |
|
20 | 20 | import org.apache.fluss.config.AutoPartitionTimeUnit; |
21 | 21 | import org.apache.fluss.config.ConfigOptions; |
22 | | -import org.apache.fluss.exception.InvalidTableException; |
23 | 22 | import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase; |
24 | 23 | import org.apache.fluss.metadata.Schema; |
25 | 24 | import org.apache.fluss.metadata.TableBucket; |
|
40 | 39 | import org.apache.iceberg.data.Record; |
41 | 40 | import org.junit.jupiter.api.BeforeAll; |
42 | 41 | import org.junit.jupiter.api.Test; |
43 | | -import org.junit.jupiter.params.ParameterizedTest; |
44 | | -import org.junit.jupiter.params.provider.Arguments; |
45 | | -import org.junit.jupiter.params.provider.ValueSource; |
46 | 42 |
|
47 | 43 | import java.nio.ByteBuffer; |
48 | 44 | import java.time.Duration; |
|
57 | 53 | import java.util.Iterator; |
58 | 54 | import java.util.List; |
59 | 55 | import java.util.Map; |
60 | | -import java.util.concurrent.ExecutionException; |
61 | | -import java.util.stream.Stream; |
62 | 56 |
|
63 | 57 | import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; |
64 | 58 | import static org.apache.fluss.testutils.DataTestUtils.row; |
65 | 59 | import static org.assertj.core.api.Assertions.assertThat; |
66 | | -import static org.assertj.core.api.Assertions.assertThatThrownBy; |
67 | 60 |
|
68 | 61 | /** The ITCase for tiering into iceberg. */ |
69 | 62 | class IcebergTieringITCase extends FlinkIcebergTieringTestBase { |
@@ -93,7 +86,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase { |
93 | 86 | .column("f_time", DataTypes.TIME()) |
94 | 87 | .column("f_char", DataTypes.CHAR(3)) |
95 | 88 | .column("f_bytes", DataTypes.BYTES()) |
96 | | - .primaryKey("f_date", "f_int") |
| 89 | + .primaryKey("f_int") |
97 | 90 | .build(); |
98 | 91 |
|
99 | 92 | private static final Schema logSchema = |
@@ -285,44 +278,6 @@ void testTiering() throws Exception { |
285 | 278 | } |
286 | 279 | } |
287 | 280 |
|
288 | | - private static Stream<Arguments> tieringAllTypesWriteArgs() { |
289 | | - return Stream.of(Arguments.of(true), Arguments.of(false)); |
290 | | - } |
291 | | - |
292 | | - @ParameterizedTest |
293 | | - @ValueSource(booleans = {false, true}) |
294 | | - void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception { |
295 | | - // create a table, write some records and wait until snapshot finished |
296 | | - TablePath t1 = |
297 | | - TablePath.of( |
298 | | - DEFAULT_DB, |
299 | | - isPrimaryKeyTable ? "pkTableForAllTypes" : "logTableForAllTypes"); |
300 | | - Schema.Builder builder = |
301 | | - Schema.newBuilder() |
302 | | - .column("c0", DataTypes.STRING()) |
303 | | - .column("c1", DataTypes.BOOLEAN()); |
304 | | - if (isPrimaryKeyTable) { |
305 | | - builder.primaryKey("c0", "c1"); |
306 | | - } |
307 | | - List<String> partitionKeys = List.of("c1"); |
308 | | - TableDescriptor.Builder tableDescriptor = |
309 | | - TableDescriptor.builder() |
310 | | - .schema(builder.build()) |
311 | | - .distributedBy(1, "c0") |
312 | | - .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") |
313 | | - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); |
314 | | - tableDescriptor.partitionedBy(partitionKeys); |
315 | | - tableDescriptor.customProperties(Collections.emptyMap()); |
316 | | - tableDescriptor.properties(Collections.emptyMap()); |
317 | | - |
318 | | - assertThatThrownBy(() -> createTable(t1, tableDescriptor.build())) |
319 | | - .isInstanceOf(ExecutionException.class) |
320 | | - .rootCause() |
321 | | - .isInstanceOf(InvalidTableException.class) |
322 | | - .hasMessage( |
323 | | - "Iceberg partition key only support string type, c1 is not string type."); |
324 | | - } |
325 | | - |
326 | 281 | private void checkDataInIcebergPrimaryKeyTable( |
327 | 282 | TablePath tablePath, List<InternalRow> expectedRows) throws Exception { |
328 | 283 | Iterator<Record> acturalIterator = getIcebergRecords(tablePath).iterator(); |
|
0 commit comments