Skip to content

Commit b1c46f7

Browse files
committed
[iceberg] Fix spotless formatting violations in IcebergWriter and IcebergWriterTest
1 parent a618352 commit b1c46f7

2 files changed

Lines changed: 22 additions & 23 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ public class IcebergWriter
7171

7272
private final List<WriteResultWrapper> temporaryWriteResult;
7373

74-
/** Per-table batch index; incremented on each schema-change flush, even when no writer exists. */
74+
/**
75+
* Per-table batch index; incremented on each schema-change flush, even when no writer exists.
76+
*/
7577
private Map<TableId, Integer> tableBatchIndexMap;
7678

7779
private Catalog catalog;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,7 @@ public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Ex
10741074
writer1.write(createEvent, null);
10751075

10761076
BinaryRecordDataGenerator gen =
1077-
new BinaryRecordDataGenerator(
1078-
schema.getColumnDataTypes().toArray(new DataType[0]));
1077+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
10791078

10801079
// Only subtask 0 has data before the schema change.
10811080
writer0.write(
@@ -1122,7 +1121,8 @@ public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Ex
11221121

11231122
/**
11241123
* Verifies no duplicates when parallel subtasks share a table and one subtask has no data
1125-
* before the schema-change flush while the other has an UPDATE that produces an equality-delete.
1124+
* before the schema-change flush while the other has an UPDATE that produces an
1125+
* equality-delete.
11261126
*/
11271127
@Test
11281128
public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() throws Exception {
@@ -1201,9 +1201,7 @@ public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() thro
12011201

12021202
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
12031203
committer.commit(
1204-
allResults.stream()
1205-
.map(MockCommitRequestImpl::new)
1206-
.collect(Collectors.toList()));
1204+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
12071205

12081206
// Only the updated value must survive; "old" must be deleted by the equality-delete in
12091207
// batch 1 (higher sequence number). Without the fix both rows appear.
@@ -1251,8 +1249,7 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12511249
writer1.write(createEvent, null);
12521250

12531251
BinaryRecordDataGenerator gen =
1254-
new BinaryRecordDataGenerator(
1255-
schema.getColumnDataTypes().toArray(new DataType[0]));
1252+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
12561253

12571254
// Both subtasks write data with no schema change, so both produce batchIndex=0.
12581255
writer0.write(
@@ -1270,7 +1267,11 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12701267

12711268
// Both wrappers carry batchIndex=0.
12721269
Assertions.assertThat(allResults).hasSize(2);
1273-
Assertions.assertThat(allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count())
1270+
Assertions.assertThat(
1271+
allResults.stream()
1272+
.mapToInt(WriteResultWrapper::getBatchIndex)
1273+
.distinct()
1274+
.count())
12741275
.isEqualTo(1);
12751276

12761277
Table table =
@@ -1280,9 +1281,7 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12801281

12811282
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
12821283
committer.commit(
1283-
allResults.stream()
1284-
.map(MockCommitRequestImpl::new)
1285-
.collect(Collectors.toList()));
1284+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
12861285

12871286
table.refresh();
12881287
long snapshotsAfter = countSnapshots(table);
@@ -1343,7 +1342,8 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
13431342
// Batch 0: only subtask 0 has data before SC1.
13441343
writer0.write(
13451344
DataChangeEvent.insertEvent(
1346-
tableId, gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")})),
1345+
tableId,
1346+
gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")})),
13471347
null);
13481348
// Subtask 1 has no data before SC1.
13491349

@@ -1394,14 +1394,16 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
13941394
writer0.write(
13951395
DataChangeEvent.updateEvent(
13961396
tableId,
1397-
gen2.generate(new Object[] {1L, BinaryStringData.fromString("a"), null, null}),
1397+
gen2.generate(
1398+
new Object[] {1L, BinaryStringData.fromString("a"), null, null}),
13981399
gen2.generate(
13991400
new Object[] {1L, BinaryStringData.fromString("c"), null, null})),
14001401
null);
14011402
writer1.write(
14021403
DataChangeEvent.updateEvent(
14031404
tableId,
1404-
gen2.generate(new Object[] {2L, BinaryStringData.fromString("b"), null, null}),
1405+
gen2.generate(
1406+
new Object[] {2L, BinaryStringData.fromString("b"), null, null}),
14051407
gen2.generate(
14061408
new Object[] {2L, BinaryStringData.fromString("d"), null, null})),
14071409
null);
@@ -1412,17 +1414,12 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
14121414

14131415
// Expect 3 batches: {0: sub0}, {1: sub1}, {2: sub0+sub1}
14141416
long distinctBatchIndices =
1415-
allResults.stream()
1416-
.mapToInt(WriteResultWrapper::getBatchIndex)
1417-
.distinct()
1418-
.count();
1417+
allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count();
14191418
Assertions.assertThat(distinctBatchIndices).isEqualTo(3);
14201419

14211420
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
14221421
committer.commit(
1423-
allResults.stream()
1424-
.map(MockCommitRequestImpl::new)
1425-
.collect(Collectors.toList()));
1422+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
14261423

14271424
// Only the final values must survive. Equality-deletes in batch 2 (seq N+2) must suppress
14281425
// the stale inserts in batch 0 (seq N) and batch 1 (seq N+1).

0 commit comments

Comments
 (0)