Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Support incremental compute for partition stats #12629

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 70 additions & 25 deletions core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

Expand All @@ -53,14 +56,43 @@ public static Collection<PartitionStats> computeStats(Table table, Snapshot snap

StructType partitionType = Partitioning.partitionType(table);
List<ManifestFile> manifests = snapshot.allManifests(table.io());
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
return collectStats(table, manifests, partitionType).values();
}

/**
 * Computes the partition stats incrementally after the given snapshot to current snapshot.
 *
 * @param table the table for which partition stats to be computed.
 * @param afterSnapshot the snapshot after which partition stats is computed (exclusive).
 * @param currentSnapshot the snapshot till which partition stats is computed (inclusive).
 * @return the {@link PartitionMap} of {@link PartitionStats}
 * @throws IllegalArgumentException if the table is null or unpartitioned, either snapshot is
 *     null, both snapshots are the same, or {@code afterSnapshot} is not an ancestor of
 *     {@code currentSnapshot}.
 */
public static PartitionMap<PartitionStats> computeStatsIncremental(
    Table table, Snapshot afterSnapshot, Snapshot currentSnapshot) {
  Preconditions.checkArgument(table != null, "Table cannot be null");
  Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
  Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null");
  Preconditions.checkArgument(afterSnapshot != null, "Snapshot cannot be null");
  Preconditions.checkArgument(currentSnapshot != afterSnapshot, "Both the snapshots are same");
  Preconditions.checkArgument(
      SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), afterSnapshot.snapshotId()),
      "Starting snapshot %s is not an ancestor of current snapshot %s",
      afterSnapshot.snapshotId(),
      currentSnapshot.snapshotId());

  // Snapshot ids in the half-open range (afterSnapshot, currentSnapshot]; only manifests
  // committed by these snapshots contribute to the incremental stats.
  Set<Long> snapshotIdsRange =
      Sets.newHashSet(
          SnapshotUtil.ancestorIdsBetween(
              currentSnapshot.snapshotId(), afterSnapshot.snapshotId(), table::snapshot));
  StructType partitionType = Partitioning.partitionType(table);
  // Keep only manifests added within the range; manifests carrying only EXISTING entries are
  // skipped because their files were already counted when originally added.
  List<ManifestFile> manifests =
      currentSnapshot.allManifests(table.io()).stream()
          .filter(
              manifestFile ->
                  snapshotIdsRange.contains(manifestFile.snapshotId())
                      && !manifestFile.hasExistingFiles())
          .collect(Collectors.toList());
  return collectStats(table, manifests, partitionType);
}

/**
Expand All @@ -82,6 +114,25 @@ private static Comparator<PartitionStats> partitionStatsCmp(StructType partition
}

// Reads the given manifests in parallel, producing one stats map per manifest, then folds
// them into a single map keyed by partition across all specs of the table.
private static PartitionMap<PartitionStats> collectStats(
    Table table, List<ManifestFile> manifests, StructType partitionType) {
  Queue<PartitionMap<PartitionStats>> perManifestStats = Queues.newConcurrentLinkedQueue();
  Tasks.foreach(manifests)
      .stopOnFailure()
      .throwFailureWhenFinished()
      .executeWith(ThreadPools.getWorkerPool())
      .run(
          manifest ->
              perManifestStats.add(collectStatsForManifest(table, manifest, partitionType)));

  // Merge the per-manifest results sequentially; only the collection phase is parallel.
  PartitionMap<PartitionStats> merged = PartitionMap.create(table.specs());
  perManifestStats.forEach(stats -> mergePartitionMap(stats, merged));
  return merged;
}

private static PartitionMap<PartitionStats> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType) {
try (ManifestReader<?> reader = openManifest(table, manifest)) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
Expand Down Expand Up @@ -118,22 +169,16 @@ private static ManifestReader<?> openManifest(Table table, ManifestFile manifest
return ManifestFiles.open(manifest, table.io()).select(projection);
}

private static Collection<PartitionStats> mergeStats(
Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);

for (PartitionMap<PartitionStats> stats : statsByManifest) {
stats.forEach(
(key, value) ->
statsMap.merge(
key,
value,
(existingEntry, newEntry) -> {
existingEntry.appendStats(newEntry);
return existingEntry;
}));
}

return statsMap.values();
// Folds every entry of {@code fromMap} into {@code toMap}: a partition missing from the
// target is inserted as-is, while an existing entry accumulates the incoming stats in place
// via PartitionStats#appendStats.
public static void mergePartitionMap(
    PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
  fromMap.forEach(
      (partitionKey, incoming) ->
          toMap.merge(
              partitionKey,
              incoming,
              (current, addition) -> {
                current.appendStats(addition);
                return current;
              }));
}
}
142 changes: 142 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public void testPartitionStatsOnEmptyTable() throws Exception {
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("snapshot cannot be null");

assertThatThrownBy(
() ->
PartitionStatsUtil.computeStatsIncremental(
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Current snapshot cannot be null");
}

@Test
Expand All @@ -73,6 +80,55 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("table must be partitioned");

assertThatThrownBy(
() ->
PartitionStatsUtil.computeStatsIncremental(
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table must be partitioned");
}

@Test
public void testNonAncestorSnapshot() throws Exception {
  Table testTable =
      TestTables.create(tempDir("invalid_ancestor"), "invalid_ancestor", SCHEMA, SPEC, 2);

  // Commit the same files twice to obtain two snapshots on one linear history.
  List<DataFile> files = prepareDataFiles(testTable);
  AppendFiles firstAppend = testTable.newAppend();
  for (DataFile file : files) {
    firstAppend.appendFile(file);
  }
  firstAppend.commit();
  Snapshot snapshot1 = testTable.currentSnapshot();

  AppendFiles secondAppend = testTable.newAppend();
  for (DataFile file : files) {
    secondAppend.appendFile(file);
  }
  secondAppend.commit();
  Snapshot snapshot2 = testTable.currentSnapshot();

  // snapshot2 is a descendant of snapshot1, so using it as the starting point must fail.
  assertThatThrownBy(
          () -> PartitionStatsUtil.computeStatsIncremental(testTable, snapshot2, snapshot1))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage(
          String.format(
              "Starting snapshot %s is not an ancestor of current snapshot %s",
              snapshot2.snapshotId(), snapshot1.snapshotId()));
}

@Test
public void testSameSnapshots() throws Exception {
  Table testTable = TestTables.create(tempDir("same_snapshot"), "same_snapshot", SCHEMA, SPEC, 2);

  // A single commit, so currentSnapshot() is the only snapshot available.
  List<DataFile> files = prepareDataFiles(testTable);
  AppendFiles append = testTable.newAppend();
  for (DataFile file : files) {
    append.appendFile(file);
  }
  append.commit();

  // Passing the identical snapshot as both bounds must be rejected.
  Snapshot onlySnapshot = testTable.currentSnapshot();
  assertThatThrownBy(
          () ->
              PartitionStatsUtil.computeStatsIncremental(testTable, onlySnapshot, onlySnapshot))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Both the snapshots are same");
}

@Test
Expand Down Expand Up @@ -212,6 +268,88 @@ public void testPartitionStats() throws Exception {
snapshot3.snapshotId()));
}

@Test
public void testPartitionStatsIncrementalCompute() throws Exception {
// Verifies that incremental compute only accounts for files appended AFTER the given
// starting snapshot, not the table's full history.
Table testTable =
TestTables.create(tempDir("compute_incremental"), "compute_incremental", SCHEMA, SPEC, 2);

List<DataFile> files = prepareDataFiles(testTable);
for (int i = 0; i < 3; i++) {
// insert same set of records thrice to have a new manifest files
AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();
}

// Starting point: stats from the three appends above must be excluded.
Snapshot snapshotFrom = testTable.currentSnapshot();

// One more append after snapshotFrom; only this commit should be counted.
AppendFiles appendFiles = testTable.newAppend();
files.forEach(appendFiles::appendFile);
appendFiles.commit();

Snapshot currentSnapshot = testTable.currentSnapshot();
Types.StructType partitionType = Partitioning.partitionType(testTable);
Collection<PartitionStats> result =
PartitionStatsUtil.computeStatsIncremental(
testTable, snapshotFrom, testTable.currentSnapshot())
.values();
// should only contain stats from last append (one data file per partition instead of total 4)
// Tuple layout per partition: partition data, spec id, record count, file count,
// total size, position/equality delete counters (all zero for pure appends),
// last-updated timestamp and snapshot id of the final append.
validateStats(
result,
Tuple.tuple(
partitionData(partitionType, "foo", "A"),
0,
files.get(0).recordCount(),
1,
files.get(0).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "foo", "B"),
0,
files.get(1).recordCount(),
1,
files.get(1).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "bar", "A"),
0,
files.get(2).recordCount(),
1,
files.get(2).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()),
Tuple.tuple(
partitionData(partitionType, "bar", "B"),
0,
files.get(3).recordCount(),
1,
files.get(3).fileSizeInBytes(),
0L,
0,
0L,
0,
null,
currentSnapshot.timestampMillis(),
currentSnapshot.snapshotId()));
}

@Test
@SuppressWarnings("MethodLength")
public void testPartitionStatsWithSchemaEvolution() throws Exception {
Expand Down Expand Up @@ -561,6 +699,10 @@ private static void computeAndValidatePartitionStats(Table testTable, Tuple... e
Collection<PartitionStats> result =
PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot());

validateStats(result, expectedValues);
}

private static void validateStats(Collection<PartitionStats> result, Tuple... expectedValues) {
assertThat(result)
.extracting(
PartitionStats::partition,
Expand Down
Loading