Core: Support incremental compute for partition stats #12629

Open · wants to merge 4 commits into base: main
Changes from 3 commits
@@ -97,7 +97,7 @@ public void tearDownBenchmark() {
@Threads(1)
public void benchmarkPartitionStats() {
Collection<PartitionStats> partitionStats =
PartitionStatsUtil.computeStats(table, table.currentSnapshot());
PartitionStatsUtil.computeStats(table, null, table.currentSnapshot()).values();
assertThat(partitionStats).hasSize(PARTITION_PER_MANIFEST);

PartitionStatsUtil.sortStats(partitionStats, Partitioning.partitionType(table));
108 changes: 78 additions & 30 deletions core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -23,15 +23,19 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

@@ -45,22 +49,53 @@ private PartitionStatsUtil() {}
* @param table the table for which partition stats to be computed.
* @param snapshot the snapshot for which partition stats is computed.
* @return the collection of {@link PartitionStats}
* @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #computeStats(Table, Snapshot,
* Snapshot)} instead.
*/
@Deprecated
public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
Preconditions.checkArgument(table != null, "table cannot be null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
return computeStats(table, null, snapshot).values();
}

StructType partitionType = Partitioning.partitionType(table);
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
/**
* Computes the partition stats incrementally, from the given snapshot (exclusive) up to the
* current snapshot (inclusive). If the given snapshot is null, computes the stats for the whole
* table instead of incrementally.
*
* @param table the table for which partition stats are computed.
* @param fromSnapshot the snapshot after which partition stats are computed (exclusive).
* @param currentSnapshot the snapshot up to which partition stats are computed (inclusive).
* @return the {@link PartitionMap} of {@link PartitionStats}
*/
public static PartitionMap<PartitionStats> computeStats(
Table table, Snapshot fromSnapshot, Snapshot currentSnapshot) {
Preconditions.checkArgument(table != null, "Table cannot be null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null");

return mergeStats(statsByManifest, table.specs());
Predicate<ManifestFile> manifestFilePredicate = file -> true;
if (fromSnapshot != null) {
Preconditions.checkArgument(currentSnapshot != fromSnapshot, "Both the snapshots are same");
Preconditions.checkArgument(
SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), fromSnapshot.snapshotId()),
"Starting snapshot %s is not an ancestor of current snapshot %s",
fromSnapshot.snapshotId(),
currentSnapshot.snapshotId());
Set<Long> snapshotIdsRange =
Sets.newHashSet(
SnapshotUtil.ancestorIdsBetween(
currentSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot));
manifestFilePredicate =
manifestFile ->
snapshotIdsRange.contains(manifestFile.snapshotId())
&& !manifestFile.hasExistingFiles();
Contributor

Don't we want this as a default predicate?

manifestFile -> !manifestFile.hasExistingFiles()

Member (@deniskuzZ, Mar 27, 2025)

we could add it as a default filter:

if (fromSnapshot != null) {
  manifestFilePredicate =
      manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
}
List<ManifestFile> manifests =
    currentSnapshot.allManifests(table.io()).stream()
        .filter(manifestFilePredicate)
        .filter(manifestFile -> !manifestFile.hasExistingFiles())
        .collect(Collectors.toList());

Member Author

Good point.

While computing incrementally, I observed that counts could get duplicated, so I added this filter.
I still have some gaps; I need to fully understand when and where we mark a manifest entry as existing.
Is there any scenario where "existing" entries need to be considered, or are "added" entries enough?

There is another check further down that considers both added and existing entries (added long back).

I will update the code to keep only added entries, and also add a test case that rewrites data files to ensure the stats are the same after the rewrite.

Member Author

Also, it looks like a ManifestFile can have both added and existing entries together. So, instead of filtering here, I will keep the filtering at the entry level further down in collectStatsForManifest.

Member

What if we have compaction and expire snapshots? Wouldn't the new manifests have the EXISTING entries?

Contributor (@pvary, Mar 27, 2025)

What do we do with the stats of the removed files?

Let's say:

  • S1 adds data
  • Execute the stats collection
  • S2 adds more data
  • S3 compacts data from S1 and S2 - this removes the files created by S1 and S2 and creates new files
  • Execute incremental/normal stats collection

What happens with the stats in this case?
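
To make the scenario concrete, the commit sequence might look roughly like this with the core API (a sketch only; prepareDataFiles and compactedFile are assumed helpers/stand-ins, not part of this PR):

List<DataFile> s1Files = prepareDataFiles(table);            // assumed helper
AppendFiles append = table.newAppend();
s1Files.forEach(append::appendFile);
append.commit();                                              // S1 adds data
Snapshot s1 = table.currentSnapshot();

// stats collection after S1 (full compute: fromSnapshot = null)
PartitionMap<PartitionStats> statsAtS1 = PartitionStatsUtil.computeStats(table, null, s1);

List<DataFile> s2Files = prepareDataFiles(table);
append = table.newAppend();
s2Files.forEach(append::appendFile);
append.commit();                                              // S2 adds more data

// S3 compacts the files of S1 and S2 into one new file (a REPLACE commit)
Set<DataFile> toCompact = Sets.newHashSet(s1Files);
toCompact.addAll(s2Files);
table.newRewrite().rewriteFiles(toCompact, Sets.newHashSet(compactedFile)).commit();

// incremental stats collection from S1 (exclusive) to the current snapshot
PartitionMap<PartitionStats> delta =
    PartitionStatsUtil.computeStats(table, s1, table.currentSnapshot());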

Member (@deniskuzZ, Mar 27, 2025)

Compaction doesn't remove the data. If we expire S1 and S2, we don't have the previous snapshots/stats and start fresh (i.e. a full compute).

Contributor

If we don't expire data, could we detect that S3 is only a compaction commit, and the stats don't need to be changed?

What if S3 instead is a MoW commit? Can we detect the changes and calculate stats incrementally?

Member Author

  1. Compaction will have the snapshot operation REPLACE, and we can reuse the old stats for that scenario. But we need to write a new stats file with the same data to handle clean GC of snapshot files.

Compaction will be tested end to end while adding the Spark procedure.

  2. About the live entries (existing + added):

For a full compute, the old manifest files are marked as deleted and their entries are reused as existing entries in the new manifest files, possibly along with additional added entries. So, a full compute needs to consider both existing and added entries.

For an incremental compute, the old stats file already has some entries which are now existing. So, the existing entries should be considered.

This all leads to the next question: what happens when a manifest is deleted? In that case we just update the snapshot entry (last modified) and do not decrement the stats. Hence, we should skip it for the incremental compute as well.

All of this logic is present in collectStatsForManifest, and the existing test cases (full compute and incremental) cover it, since they use mergeAppend, which produces manifests with a mix of added and existing entries.
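
For the compaction case, detecting a REPLACE-only commit inside the incremental range could look roughly like the sketch below, reusing the same ancestorIdsBetween walk this PR already performs; this is illustrative only and not part of the change:

// Sketch: flag compaction (REPLACE) commits in the incremental range; for such commits the
// partition totals are unchanged and the old stats could be reused.
for (Long snapshotId :
    SnapshotUtil.ancestorIdsBetween(
        currentSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot)) {
  Snapshot ancestor = table.snapshot(snapshotId);
  if (DataOperations.REPLACE.equals(ancestor.operation())) {
    // compaction/rewrite commit detected
  }
}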

Member Author

We didn't need to decrement stats for the full compute because we were discarding the deleted manifests and only considering live manifests.

Now, I am not really sure the current code will work for compaction. We may need to decrement stats just for the incremental compute. I will test the compaction scenario tomorrow and handle this.
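
For context, a caller of the incremental API would presumably fold the returned delta into the previously persisted stats, in the same way mergePartitionMap below combines the per-manifest maps; a rough sketch, where loadPreviousStats and lastComputedSnapshot are assumed placeholders and not part of this PR:

// Sketch: merge an incremental delta into previously persisted stats (illustrative only).
PartitionMap<PartitionStats> previous = loadPreviousStats(table);   // assumed helper
PartitionMap<PartitionStats> delta =
    PartitionStatsUtil.computeStats(table, lastComputedSnapshot, table.currentSnapshot());
delta.forEach(
    (key, value) ->
        previous.merge(
            key,
            value,
            (existingEntry, newEntry) -> {
              existingEntry.appendStats(newEntry);
              return existingEntry;
            }));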

}

StructType partitionType = Partitioning.partitionType(table);
List<ManifestFile> manifests =
currentSnapshot.allManifests(table.io()).stream()
.filter(manifestFilePredicate)
.collect(Collectors.toList());
return collectStats(table, manifests, partitionType);
}

/**
@@ -82,6 +117,25 @@ private static Comparator<PartitionStats> partitionStatsCmp(StructType partition
}

private static PartitionMap<PartitionStats> collectStats(
Table table, List<ManifestFile> manifests, StructType partitionType) {
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(
manifest ->
statsByManifest.add(collectStatsForManifest(table, manifest, partitionType)));

PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
for (PartitionMap<PartitionStats> stats : statsByManifest) {
mergePartitionMap(stats, statsMap);
}

return statsMap;
}

private static PartitionMap<PartitionStats> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType) {
try (ManifestReader<?> reader = openManifest(table, manifest)) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
@@ -118,22 +172,16 @@ private static ManifestReader<?> openManifest(Table table, ManifestFile manifest
return ManifestFiles.open(manifest, table.io()).select(projection);
}

private static Collection<PartitionStats> mergeStats(
Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);

for (PartitionMap<PartitionStats> stats : statsByManifest) {
stats.forEach(
(key, value) ->
statsMap.merge(
key,
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));
}

return statsMap.values();
private static void mergePartitionMap(
PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
fromMap.forEach(
(key, value) ->
toMap.merge(
key,
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));
}
}
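
Taken together, the new entry point covers both a full and an incremental compute; a minimal usage sketch against an already partitioned table, where previouslyProcessedSnapshot is a placeholder for the last snapshot whose stats were computed:

// Full compute (fromSnapshot = null), equivalent to the deprecated single-snapshot overload.
Collection<PartitionStats> full =
    PartitionStatsUtil.computeStats(table, null, table.currentSnapshot()).values();

// Incremental compute covering only commits after previouslyProcessedSnapshot (exclusive).
PartitionMap<PartitionStats> delta =
    PartitionStatsUtil.computeStats(table, previouslyProcessedSnapshot, table.currentSnapshot());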
136 changes: 131 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
@@ -49,9 +49,9 @@ public class TestPartitionStatsUtil {
public void testPartitionStatsOnEmptyTable() throws Exception {
Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2);
assertThatThrownBy(
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
() -> PartitionStatsUtil.computeStats(testTable, null, testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("snapshot cannot be null");
.hasMessage("Current snapshot cannot be null");
}

@Test
@@ -70,9 +70,50 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
appendFiles.commit();

assertThatThrownBy(
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
() -> PartitionStatsUtil.computeStats(testTable, null, testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("table must be partitioned");
.hasMessage("Table must be partitioned");
}

@Test
public void testNonAncestorSnapshot() throws Exception {
Table testTable =
TestTables.create(tempDir("invalid_ancestor"), "invalid_ancestor", SCHEMA, SPEC, 2);

List<DataFile> files = prepareDataFiles(testTable);
AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();
Snapshot snapshot1 = testTable.currentSnapshot();

appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();
Snapshot snapshot2 = testTable.currentSnapshot();

assertThatThrownBy(() -> PartitionStatsUtil.computeStats(testTable, snapshot2, snapshot1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
String.format(
"Starting snapshot %s is not an ancestor of current snapshot %s",
snapshot2.snapshotId(), snapshot1.snapshotId()));
}

@Test
public void testSameSnapshots() throws Exception {
Table testTable = TestTables.create(tempDir("same_snapshot"), "same_snapshot", SCHEMA, SPEC, 2);

List<DataFile> files = prepareDataFiles(testTable);
AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();

assertThatThrownBy(
() ->
PartitionStatsUtil.computeStats(
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Both the snapshots are same");
}

@Test
@@ -212,6 +253,87 @@ public void testPartitionStats() throws Exception {
snapshot3.snapshotId()));
}

@Test
public void testPartitionStatsIncrementalCompute() throws Exception {
Table testTable =
TestTables.create(tempDir("compute_incremental"), "compute_incremental", SCHEMA, SPEC, 2);

List<DataFile> files = prepareDataFiles(testTable);
for (int i = 0; i < 3; i++) {
// insert the same set of records thrice to get new manifest files
AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();
}

Snapshot snapshotFrom = testTable.currentSnapshot();

AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();

Snapshot currentSnapshot = testTable.currentSnapshot();
Types.StructType partitionType = Partitioning.partitionType(testTable);
Collection<PartitionStats> result =
PartitionStatsUtil.computeStats(testTable, snapshotFrom, testTable.currentSnapshot())
.values();
// should only contain stats from last append (one data file per partition instead of total 4)
validateStats(
result,
Tuple.tuple(
partitionData(partitionType, "foo", "A"),
0,
files.get(0).recordCount(),
1,
files.get(0).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "foo", "B"),
0,
files.get(1).recordCount(),
1,
files.get(1).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "bar", "A"),
0,
files.get(2).recordCount(),
1,
files.get(2).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "bar", "B"),
0,
files.get(3).recordCount(),
1,
files.get(3).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()));
}

@Test
@SuppressWarnings("MethodLength")
public void testPartitionStatsWithSchemaEvolution() throws Exception {
@@ -559,8 +681,12 @@ private static List<DataFile> prepareDataFilesOnePart(Table table) {
private static void computeAndValidatePartitionStats(Table testTable, Tuple... expectedValues) {
// compute and commit partition stats file
Collection<PartitionStats> result =
PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot());
PartitionStatsUtil.computeStats(testTable, null, testTable.currentSnapshot()).values();

validateStats(result, expectedValues);
}

private static void validateStats(Collection<PartitionStats> result, Tuple... expectedValues) {
assertThat(result)
.extracting(
PartitionStats::partition,