Skip to content

Commit 2f0ed03

Browse files
committed
[iceberg] Update test constructor calls for upstream hadoopConfOptions parameter
After rebasing onto upstream master (which merged FLINK-39342), IcebergWriter and IcebergCommitter constructors now require an additional hadoopConfOptions parameter. Updated all new test methods to pass new HashMap<>() for this param.
1 parent 90613f0 commit 2f0ed03

1 file changed

Lines changed: 112 additions & 21 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 112 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,14 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
617617
String operatorId = UUID.randomUUID().toString();
618618
IcebergWriter icebergWriter =
619619
new IcebergWriter(
620-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
620+
catalogOptions,
621+
1,
622+
1,
623+
ZoneId.systemDefault(),
624+
0,
625+
jobId,
626+
operatorId,
627+
new HashMap<>());
621628
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
622629

623630
TableId tableId = TableId.parse("test.iceberg_table");
@@ -648,7 +655,7 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
648655
icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA, recordB), null);
649656

650657
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
651-
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
658+
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
652659
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
653660
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
654661
icebergCommitter.commit(collection);
@@ -685,7 +692,14 @@ public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exc
685692
String operatorId = UUID.randomUUID().toString();
686693
IcebergWriter icebergWriter =
687694
new IcebergWriter(
688-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
695+
catalogOptions,
696+
1,
697+
1,
698+
ZoneId.systemDefault(),
699+
0,
700+
jobId,
701+
operatorId,
702+
new HashMap<>());
689703
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
690704

691705
TableId tableId = TableId.parse("test.iceberg_table");
@@ -731,7 +745,7 @@ public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exc
731745
icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew, recordB), null);
732746

733747
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
734-
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
748+
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
735749
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
736750
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
737751
icebergCommitter.commit(collection);
@@ -767,7 +781,14 @@ public void testRetryAfterPartialBatchCommit() throws Exception {
767781
String operatorId = UUID.randomUUID().toString();
768782
IcebergWriter icebergWriter =
769783
new IcebergWriter(
770-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
784+
catalogOptions,
785+
1,
786+
1,
787+
ZoneId.systemDefault(),
788+
0,
789+
jobId,
790+
operatorId,
791+
new HashMap<>());
771792
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
772793

773794
TableId tableId = TableId.parse("test.iceberg_table");
@@ -844,7 +865,7 @@ public void testRetryAfterPartialBatchCommit() throws Exception {
844865
partialDelta.commit();
845866

846867
// Retry: Flink re-delivers all committables for the checkpoint.
847-
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
868+
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
848869
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
849870
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
850871
icebergCommitter.commit(collection);
@@ -888,7 +909,14 @@ public void testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint() throws Exc
888909
String operatorId = UUID.randomUUID().toString();
889910
IcebergWriter icebergWriter =
890911
new IcebergWriter(
891-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
912+
catalogOptions,
913+
1,
914+
1,
915+
ZoneId.systemDefault(),
916+
0,
917+
jobId,
918+
operatorId,
919+
new HashMap<>());
892920
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
893921

894922
TableId tableId = TableId.parse("test.iceberg_table");
@@ -962,7 +990,7 @@ public void testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint() throws Exc
962990
Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1);
963991
Assertions.assertThat(sortedBatches.get(2).getBatchIndex()).isEqualTo(2);
964992

965-
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
993+
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
966994
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
967995
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
968996
icebergCommitter.commit(collection);
@@ -993,7 +1021,14 @@ public void testSchemaChangeFlushDoesNotAffectOtherTable() throws Exception {
9931021
String operatorId = UUID.randomUUID().toString();
9941022
IcebergWriter icebergWriter =
9951023
new IcebergWriter(
996-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1024+
catalogOptions,
1025+
1,
1026+
1,
1027+
ZoneId.systemDefault(),
1028+
0,
1029+
jobId,
1030+
operatorId,
1031+
new HashMap<>());
9971032
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
9981033

9991034
TableId tableA = TableId.parse("test.table_a");
@@ -1066,7 +1101,7 @@ public void testSchemaChangeFlushDoesNotAffectOtherTable() throws Exception {
10661101
Assertions.assertThat(batchCountByTable.get(tableA)).isEqualTo(2);
10671102
Assertions.assertThat(batchCountByTable.get(tableB)).isEqualTo(1);
10681103

1069-
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
1104+
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
10701105
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
10711106
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
10721107
icebergCommitter.commit(collection);
@@ -1100,10 +1135,24 @@ public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Ex
11001135
// Two subtask writers sharing the same catalog and table.
11011136
IcebergWriter writer0 =
11021137
new IcebergWriter(
1103-
catalogOptions, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1138+
catalogOptions,
1139+
0,
1140+
1,
1141+
ZoneId.systemDefault(),
1142+
0,
1143+
jobId,
1144+
operatorId,
1145+
new HashMap<>());
11041146
IcebergWriter writer1 =
11051147
new IcebergWriter(
1106-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1148+
catalogOptions,
1149+
1,
1150+
1,
1151+
ZoneId.systemDefault(),
1152+
0,
1153+
jobId,
1154+
operatorId,
1155+
new HashMap<>());
11071156

11081157
TableId tableId = TableId.parse("test.iceberg_table");
11091158
Schema schema =
@@ -1186,10 +1235,24 @@ public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() thro
11861235

11871236
IcebergWriter writer0 =
11881237
new IcebergWriter(
1189-
catalogOptions, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1238+
catalogOptions,
1239+
0,
1240+
1,
1241+
ZoneId.systemDefault(),
1242+
0,
1243+
jobId,
1244+
operatorId,
1245+
new HashMap<>());
11901246
IcebergWriter writer1 =
11911247
new IcebergWriter(
1192-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1248+
catalogOptions,
1249+
1,
1250+
1,
1251+
ZoneId.systemDefault(),
1252+
0,
1253+
jobId,
1254+
operatorId,
1255+
new HashMap<>());
11931256

11941257
TableId tableId = TableId.parse("test.iceberg_table");
11951258
Schema initialSchema =
@@ -1243,7 +1306,7 @@ public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() thro
12431306
allResults.addAll(writer0.prepareCommit());
12441307
allResults.addAll(writer1.prepareCommit());
12451308

1246-
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
1309+
IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
12471310
committer.commit(
12481311
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
12491312

@@ -1275,10 +1338,24 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12751338

12761339
IcebergWriter writer0 =
12771340
new IcebergWriter(
1278-
catalogOptions, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1341+
catalogOptions,
1342+
0,
1343+
1,
1344+
ZoneId.systemDefault(),
1345+
0,
1346+
jobId,
1347+
operatorId,
1348+
new HashMap<>());
12791349
IcebergWriter writer1 =
12801350
new IcebergWriter(
1281-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1351+
catalogOptions,
1352+
1,
1353+
1,
1354+
ZoneId.systemDefault(),
1355+
0,
1356+
jobId,
1357+
operatorId,
1358+
new HashMap<>());
12821359

12831360
TableId tableId = TableId.parse("test.iceberg_table");
12841361
Schema schema =
@@ -1323,7 +1400,7 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
13231400
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
13241401
long snapshotsBefore = countSnapshots(table);
13251402

1326-
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
1403+
IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
13271404
committer.commit(
13281405
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
13291406

@@ -1362,10 +1439,24 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
13621439

13631440
IcebergWriter writer0 =
13641441
new IcebergWriter(
1365-
catalogOptions, 0, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1442+
catalogOptions,
1443+
0,
1444+
1,
1445+
ZoneId.systemDefault(),
1446+
0,
1447+
jobId,
1448+
operatorId,
1449+
new HashMap<>());
13661450
IcebergWriter writer1 =
13671451
new IcebergWriter(
1368-
catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
1452+
catalogOptions,
1453+
1,
1454+
1,
1455+
ZoneId.systemDefault(),
1456+
0,
1457+
jobId,
1458+
operatorId,
1459+
new HashMap<>());
13691460

13701461
TableId tableId = TableId.parse("test.iceberg_table");
13711462
Schema schema0 =
@@ -1461,7 +1552,7 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
14611552
allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count();
14621553
Assertions.assertThat(distinctBatchIndices).isEqualTo(3);
14631554

1464-
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
1555+
IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>());
14651556
committer.commit(
14661557
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
14671558

0 commit comments

Comments
 (0)