Skip to content

Commit 23b679e

Browse files
committed
Add does not contain default value schema
1 parent 903bc85 commit 23b679e

1 file changed

Lines changed: 64 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,70 @@ public void runSchemaChangeTest(Map<String, String> catalogOptions) {
216216
new HashSet<>(Collections.singletonList(1)));
217217
assertThat(table.schema().sameSchema(schema)).isTrue();
218218

219+
// Verify that schema without default values is NOT the same.
220+
org.apache.iceberg.Schema schemaWithoutDefaults =
221+
new org.apache.iceberg.Schema(
222+
0,
223+
Arrays.asList(
224+
Types.NestedField.builder()
225+
.withId(1)
226+
.asRequired()
227+
.withName("id")
228+
.ofType(Types.LongType.get())
229+
.withDoc("column for id")
230+
.build(),
231+
Types.NestedField.builder()
232+
.withId(2)
233+
.asRequired()
234+
.withName("name")
235+
.ofType(Types.StringType.get())
236+
.withDoc("column for name")
237+
.build(),
238+
Types.NestedField.builder()
239+
.withId(3)
240+
.asOptional()
241+
.withName("tinyIntCol")
242+
.ofType(Types.IntegerType.get())
243+
.withDoc("column for tinyIntCol")
244+
.build(),
245+
Types.NestedField.builder()
246+
.withId(4)
247+
.asOptional()
248+
.withName("description")
249+
.ofType(Types.StringType.get())
250+
.withDoc("column for descriptions")
251+
.build(),
252+
Types.NestedField.builder()
253+
.withId(5)
254+
.asOptional()
255+
.withName("bool_column")
256+
.ofType(Types.BooleanType.get())
257+
.withDoc("column for bool")
258+
.build(),
259+
Types.NestedField.builder()
260+
.withId(6)
261+
.asOptional()
262+
.withName("float_column")
263+
.ofType(Types.FloatType.get())
264+
.withDoc("column for float")
265+
.build(),
266+
Types.NestedField.builder()
267+
.withId(7)
268+
.asOptional()
269+
.withName("double_column")
270+
.ofType(Types.DoubleType.get())
271+
.withDoc("column for double")
272+
.build(),
273+
Types.NestedField.builder()
274+
.withId(8)
275+
.asOptional()
276+
.withName("decimal_column")
277+
.ofType(Types.DecimalType.of(10, 2))
278+
.withDoc("column for decimal")
279+
.build()),
280+
new HashSet<>(Collections.singletonList(1)));
281+
assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse();
282+
219283
// Add column with default value.
220284
AddColumnEvent addColumnEvent =
221285
new AddColumnEvent(

0 commit comments

Comments
 (0)