Skip to content

Commit c879eed

Browse files
authored
Spark 3.5: Fix RewriteDataFiles with partial progress enabled and max-failed-commits larger than total-file-group (#12120)
1 parent e1f2cfd commit c879eed

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

Diff for: spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,8 @@ private Builder doExecuteWithPartialProgress(
377377
// stop commit service
378378
commitService.close();
379379

380-
int failedCommits = maxCommits - commitService.succeededCommits();
380+
int totalCommits = Math.min(ctx.totalGroupCount(), maxCommits);
381+
int failedCommits = totalCommits - commitService.succeededCommits();
381382
if (failedCommits > 0 && failedCommits <= maxFailedCommits) {
382383
LOG.warn(
383384
"{} is true but {} rewrite commits failed. Check the logs to determine why the individual "

Diff for: spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java

+28
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,34 @@ public void testParallelPartialProgressWithMaxFailedCommits() {
13201320
shouldHaveACleanCache(table);
13211321
}
13221322

1323+
@TestTemplate
1324+
public void testParallelPartialProgressWithMaxFailedCommitsLargerThanTotalFileGroup() {
1325+
Table table = createTable(20);
1326+
int fileSize = averageFileSize(table);
1327+
1328+
List<Object[]> originalData = currentData();
1329+
1330+
RewriteDataFilesSparkAction rewrite =
1331+
basicRewrite(table)
1332+
.option(
1333+
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000))
1334+
.option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "3")
1335+
.option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
1336+
// Since we can have at most one commit per file group and there are only 10 file
1337+
// groups, actual number of commits is 10
1338+
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "20")
1339+
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "0");
1340+
rewrite.execute();
1341+
1342+
table.refresh();
1343+
1344+
List<Object[]> postRewriteData = currentData();
1345+
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
1346+
shouldHaveSnapshots(table, 11);
1347+
shouldHaveNoOrphans(table);
1348+
shouldHaveACleanCache(table);
1349+
}
1350+
13231351
@TestTemplate
13241352
public void testInvalidOptions() {
13251353
Table table = createTable(20);

0 commit comments

Comments
 (0)