Skip to content

Commit 90613f0

Browse files
committed
[iceberg] Fix spotless formatting violations in IcebergWriter and IcebergWriterTest
1 parent 09134c3 commit 90613f0

2 files changed

Lines changed: 22 additions & 23 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ public class IcebergWriter
7272

7373
private final List<WriteResultWrapper> temporaryWriteResult;
7474

75-
/** Per-table batch index; incremented on each schema-change flush, even when no writer exists. */
75+
/**
76+
* Per-table batch index; incremented on each schema-change flush, even when no writer exists.
77+
*/
7678
private Map<TableId, Integer> tableBatchIndexMap;
7779

7880
private Catalog catalog;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,8 +1118,7 @@ public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Ex
11181118
writer1.write(createEvent, null);
11191119

11201120
BinaryRecordDataGenerator gen =
1121-
new BinaryRecordDataGenerator(
1122-
schema.getColumnDataTypes().toArray(new DataType[0]));
1121+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
11231122

11241123
// Only subtask 0 has data before the schema change.
11251124
writer0.write(
@@ -1166,7 +1165,8 @@ public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Ex
11661165

11671166
/**
11681167
* Verifies no duplicates when parallel subtasks share a table and one subtask has no data
1169-
* before the schema-change flush while the other has an UPDATE that produces an equality-delete.
1168+
* before the schema-change flush while the other has an UPDATE that produces an
1169+
* equality-delete.
11701170
*/
11711171
@Test
11721172
public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() throws Exception {
@@ -1245,9 +1245,7 @@ public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() thro
12451245

12461246
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
12471247
committer.commit(
1248-
allResults.stream()
1249-
.map(MockCommitRequestImpl::new)
1250-
.collect(Collectors.toList()));
1248+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
12511249

12521250
// Only the updated value must survive; "old" must be deleted by the equality-delete in
12531251
// batch 1 (higher sequence number). Without the fix both rows appear.
@@ -1295,8 +1293,7 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12951293
writer1.write(createEvent, null);
12961294

12971295
BinaryRecordDataGenerator gen =
1298-
new BinaryRecordDataGenerator(
1299-
schema.getColumnDataTypes().toArray(new DataType[0]));
1296+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
13001297

13011298
// Both subtasks write data with no schema change, so both produce batchIndex=0.
13021299
writer0.write(
@@ -1314,7 +1311,11 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
13141311

13151312
// Both wrappers carry batchIndex=0.
13161313
Assertions.assertThat(allResults).hasSize(2);
1317-
Assertions.assertThat(allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count())
1314+
Assertions.assertThat(
1315+
allResults.stream()
1316+
.mapToInt(WriteResultWrapper::getBatchIndex)
1317+
.distinct()
1318+
.count())
13181319
.isEqualTo(1);
13191320

13201321
Table table =
@@ -1324,9 +1325,7 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
13241325

13251326
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
13261327
committer.commit(
1327-
allResults.stream()
1328-
.map(MockCommitRequestImpl::new)
1329-
.collect(Collectors.toList()));
1328+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
13301329

13311330
table.refresh();
13321331
long snapshotsAfter = countSnapshots(table);
@@ -1387,7 +1386,8 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
13871386
// Batch 0: only subtask 0 has data before SC1.
13881387
writer0.write(
13891388
DataChangeEvent.insertEvent(
1390-
tableId, gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")})),
1389+
tableId,
1390+
gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")})),
13911391
null);
13921392
// Subtask 1 has no data before SC1.
13931393

@@ -1438,14 +1438,16 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
14381438
writer0.write(
14391439
DataChangeEvent.updateEvent(
14401440
tableId,
1441-
gen2.generate(new Object[] {1L, BinaryStringData.fromString("a"), null, null}),
1441+
gen2.generate(
1442+
new Object[] {1L, BinaryStringData.fromString("a"), null, null}),
14421443
gen2.generate(
14431444
new Object[] {1L, BinaryStringData.fromString("c"), null, null})),
14441445
null);
14451446
writer1.write(
14461447
DataChangeEvent.updateEvent(
14471448
tableId,
1448-
gen2.generate(new Object[] {2L, BinaryStringData.fromString("b"), null, null}),
1449+
gen2.generate(
1450+
new Object[] {2L, BinaryStringData.fromString("b"), null, null}),
14491451
gen2.generate(
14501452
new Object[] {2L, BinaryStringData.fromString("d"), null, null})),
14511453
null);
@@ -1456,17 +1458,12 @@ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges()
14561458

14571459
// Expect 3 batches: {0: sub0}, {1: sub1}, {2: sub0+sub1}
14581460
long distinctBatchIndices =
1459-
allResults.stream()
1460-
.mapToInt(WriteResultWrapper::getBatchIndex)
1461-
.distinct()
1462-
.count();
1461+
allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count();
14631462
Assertions.assertThat(distinctBatchIndices).isEqualTo(3);
14641463

14651464
IcebergCommitter committer = new IcebergCommitter(catalogOptions);
14661465
committer.commit(
1467-
allResults.stream()
1468-
.map(MockCommitRequestImpl::new)
1469-
.collect(Collectors.toList()));
1466+
allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()));
14701467

14711468
// Only the final values must survive. Equality-deletes in batch 2 (seq N+2) must suppress
14721469
// the stale inserts in batch 0 (seq N) and batch 1 (seq N+1).

0 commit comments

Comments
 (0)