Skip to content

Commit 5446ee0

Browse files
committed
Core: Support incremental compute for partition stats
1 parent 6779a15 commit 5446ee0

File tree

4 files changed

+438
-67
lines changed

4 files changed

+438
-67
lines changed

core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java

+67-25
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@
2323
import java.util.Collection;
2424
import java.util.Comparator;
2525
import java.util.List;
26-
import java.util.Map;
2726
import java.util.Queue;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
2829
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2930
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3031
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
32+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3133
import org.apache.iceberg.types.Comparators;
3234
import org.apache.iceberg.types.Types.StructType;
3335
import org.apache.iceberg.util.PartitionMap;
3436
import org.apache.iceberg.util.PartitionUtil;
37+
import org.apache.iceberg.util.SnapshotUtil;
3538
import org.apache.iceberg.util.Tasks;
3639
import org.apache.iceberg.util.ThreadPools;
3740

@@ -53,14 +56,40 @@ public static Collection<PartitionStats> computeStats(Table table, Snapshot snap
5356

5457
StructType partitionType = Partitioning.partitionType(table);
5558
List<ManifestFile> manifests = snapshot.allManifests(table.io());
56-
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
57-
Tasks.foreach(manifests)
58-
.stopOnFailure()
59-
.throwFailureWhenFinished()
60-
.executeWith(ThreadPools.getWorkerPool())
61-
.run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
59+
return collectStats(table, manifests, partitionType).values();
60+
}
61+
62+
/**
63+
* Computes partition stats incrementally for the snapshots after the given snapshot up to and including the current snapshot.
64+
*
65+
* @param table the table for which partition stats to be computed.
66+
* @param afterSnapshot the snapshot after which partition stats are computed (exclusive).
67+
* @param currentSnapshot the snapshot up to which partition stats are computed (inclusive).
68+
* @return the {@link PartitionMap} of {@link PartitionStats}
69+
*/
70+
public static PartitionMap<PartitionStats> computeStatsIncremental(
71+
Table table, Snapshot afterSnapshot, Snapshot currentSnapshot) {
72+
Preconditions.checkArgument(table != null, "Table cannot be null");
73+
Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
74+
Preconditions.checkArgument(currentSnapshot != null, "Current snapshot cannot be null");
75+
Preconditions.checkArgument(afterSnapshot != null, "Snapshot cannot be null");
76+
Preconditions.checkArgument(currentSnapshot != afterSnapshot, "Both the snapshots are same");
77+
Preconditions.checkArgument(
78+
SnapshotUtil.isAncestorOf(table, currentSnapshot.snapshotId(), afterSnapshot.snapshotId()),
79+
"Starting snapshot %s is not an ancestor of current snapshot %s",
80+
afterSnapshot.snapshotId(),
81+
currentSnapshot.snapshotId());
6282

63-
return mergeStats(statsByManifest, table.specs());
83+
Set<Long> snapshotIdsRange =
84+
Sets.newHashSet(
85+
SnapshotUtil.ancestorIdsBetween(
86+
currentSnapshot.snapshotId(), afterSnapshot.snapshotId(), table::snapshot));
87+
StructType partitionType = Partitioning.partitionType(table);
88+
List<ManifestFile> manifests =
89+
currentSnapshot.allManifests(table.io()).stream()
90+
.filter(manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId()))
91+
.collect(Collectors.toList());
92+
return collectStats(table, manifests, partitionType);
6493
}
6594

6695
/**
@@ -82,6 +111,25 @@ private static Comparator<PartitionStats> partitionStatsCmp(StructType partition
82111
}
83112

84113
private static PartitionMap<PartitionStats> collectStats(
114+
Table table, List<ManifestFile> manifests, StructType partitionType) {
115+
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
116+
Tasks.foreach(manifests)
117+
.stopOnFailure()
118+
.throwFailureWhenFinished()
119+
.executeWith(ThreadPools.getWorkerPool())
120+
.run(
121+
manifest ->
122+
statsByManifest.add(collectStatsForManifest(table, manifest, partitionType)));
123+
124+
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
125+
for (PartitionMap<PartitionStats> stats : statsByManifest) {
126+
mergePartitionMap(stats, statsMap);
127+
}
128+
129+
return statsMap;
130+
}
131+
132+
private static PartitionMap<PartitionStats> collectStatsForManifest(
85133
Table table, ManifestFile manifest, StructType partitionType) {
86134
try (ManifestReader<?> reader = openManifest(table, manifest)) {
87135
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
@@ -118,22 +166,16 @@ private static ManifestReader<?> openManifest(Table table, ManifestFile manifest
118166
return ManifestFiles.open(manifest, table.io()).select(projection);
119167
}
120168

121-
private static Collection<PartitionStats> mergeStats(
122-
Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
123-
PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);
124-
125-
for (PartitionMap<PartitionStats> stats : statsByManifest) {
126-
stats.forEach(
127-
(key, value) ->
128-
statsMap.merge(
129-
key,
130-
value,
131-
(existingEntry, newEntry) -> {
132-
existingEntry.appendStats(newEntry);
133-
return existingEntry;
134-
}));
135-
}
136-
137-
return statsMap.values();
169+
public static void mergePartitionMap(
170+
PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> toMap) {
171+
fromMap.forEach(
172+
(key, value) ->
173+
toMap.merge(
174+
key,
175+
value,
176+
(existingEntry, newEntry) -> {
177+
existingEntry.appendStats(newEntry);
178+
return existingEntry;
179+
}));
138180
}
139181
}

core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java

+142
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public void testPartitionStatsOnEmptyTable() throws Exception {
5252
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
5353
.isInstanceOf(IllegalArgumentException.class)
5454
.hasMessage("snapshot cannot be null");
55+
56+
assertThatThrownBy(
57+
() ->
58+
PartitionStatsUtil.computeStatsIncremental(
59+
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
60+
.isInstanceOf(IllegalArgumentException.class)
61+
.hasMessage("Current snapshot cannot be null");
5562
}
5663

5764
@Test
@@ -73,6 +80,55 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception {
7380
() -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
7481
.isInstanceOf(IllegalArgumentException.class)
7582
.hasMessage("table must be partitioned");
83+
84+
assertThatThrownBy(
85+
() ->
86+
PartitionStatsUtil.computeStatsIncremental(
87+
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
88+
.isInstanceOf(IllegalArgumentException.class)
89+
.hasMessage("Table must be partitioned");
90+
}
91+
92+
@Test
93+
public void testNonAncestorSnapshot() throws Exception {
94+
Table testTable =
95+
TestTables.create(tempDir("invalid_ancestor"), "invalid_ancestor", SCHEMA, SPEC, 2);
96+
97+
List<DataFile> files = prepareDataFiles(testTable);
98+
AppendFiles appendFiles = testTable.newAppend();
99+
files.forEach(appendFiles::appendFile);
100+
appendFiles.commit();
101+
Snapshot snapshot1 = testTable.currentSnapshot();
102+
103+
appendFiles = testTable.newAppend();
104+
files.forEach(appendFiles::appendFile);
105+
appendFiles.commit();
106+
Snapshot snapshot2 = testTable.currentSnapshot();
107+
108+
assertThatThrownBy(
109+
() -> PartitionStatsUtil.computeStatsIncremental(testTable, snapshot2, snapshot1))
110+
.isInstanceOf(IllegalArgumentException.class)
111+
.hasMessage(
112+
String.format(
113+
"Starting snapshot %s is not an ancestor of current snapshot %s",
114+
snapshot2.snapshotId(), snapshot1.snapshotId()));
115+
}
116+
117+
@Test
118+
public void testSameSnapshots() throws Exception {
119+
Table testTable = TestTables.create(tempDir("same_snapshot"), "same_snapshot", SCHEMA, SPEC, 2);
120+
121+
List<DataFile> files = prepareDataFiles(testTable);
122+
AppendFiles appendFiles = testTable.newAppend();
123+
files.forEach(appendFiles::appendFile);
124+
appendFiles.commit();
125+
126+
assertThatThrownBy(
127+
() ->
128+
PartitionStatsUtil.computeStatsIncremental(
129+
testTable, testTable.currentSnapshot(), testTable.currentSnapshot()))
130+
.isInstanceOf(IllegalArgumentException.class)
131+
.hasMessage("Both the snapshots are same");
76132
}
77133

78134
@Test
@@ -212,6 +268,88 @@ public void testPartitionStats() throws Exception {
212268
snapshot3.snapshotId()));
213269
}
214270

271+
@Test
272+
public void testPartitionStatsIncrementalCompute() throws Exception {
273+
Table testTable =
274+
TestTables.create(tempDir("compute_incremental"), "compute_incremental", SCHEMA, SPEC, 2);
275+
276+
List<DataFile> files = prepareDataFiles(testTable);
277+
for (int i = 0; i < 3; i++) {
278+
// insert the same set of records three times so that each commit creates new manifest files
279+
AppendFiles appendFiles = testTable.newAppend();
280+
files.forEach(appendFiles::appendFile);
281+
appendFiles.commit();
282+
}
283+
284+
Snapshot snapshotFrom = testTable.currentSnapshot();
285+
286+
AppendFiles appendFiles = testTable.newAppend();
287+
files.forEach(appendFiles::appendFile);
288+
appendFiles.commit();
289+
290+
Snapshot currentSnapshot = testTable.currentSnapshot();
291+
Types.StructType partitionType = Partitioning.partitionType(testTable);
292+
Collection<PartitionStats> result =
293+
PartitionStatsUtil.computeStatsIncremental(
294+
testTable, snapshotFrom, testTable.currentSnapshot())
295+
.values();
296+
// should contain stats only from the last append (one data file per partition instead of 4 in total)
297+
validateStats(
298+
result,
299+
Tuple.tuple(
300+
partitionData(partitionType, "foo", "A"),
301+
0,
302+
files.get(0).recordCount(),
303+
1,
304+
files.get(0).fileSizeInBytes(),
305+
0L,
306+
0,
307+
0L,
308+
0,
309+
null,
310+
currentSnapshot.timestampMillis(),
311+
currentSnapshot.snapshotId()),
312+
Tuple.tuple(
313+
partitionData(partitionType, "foo", "B"),
314+
0,
315+
files.get(1).recordCount(),
316+
1,
317+
files.get(1).fileSizeInBytes(),
318+
0L,
319+
0,
320+
0L,
321+
0,
322+
null,
323+
currentSnapshot.timestampMillis(),
324+
currentSnapshot.snapshotId()),
325+
Tuple.tuple(
326+
partitionData(partitionType, "bar", "A"),
327+
0,
328+
files.get(2).recordCount(),
329+
1,
330+
files.get(2).fileSizeInBytes(),
331+
0L,
332+
0,
333+
0L,
334+
0,
335+
null,
336+
currentSnapshot.timestampMillis(),
337+
currentSnapshot.snapshotId()),
338+
Tuple.tuple(
339+
partitionData(partitionType, "bar", "B"),
340+
0,
341+
files.get(3).recordCount(),
342+
1,
343+
files.get(3).fileSizeInBytes(),
344+
0L,
345+
0,
346+
0L,
347+
0,
348+
null,
349+
currentSnapshot.timestampMillis(),
350+
currentSnapshot.snapshotId()));
351+
}
352+
215353
@Test
216354
@SuppressWarnings("MethodLength")
217355
public void testPartitionStatsWithSchemaEvolution() throws Exception {
@@ -561,6 +699,10 @@ private static void computeAndValidatePartitionStats(Table testTable, Tuple... e
561699
Collection<PartitionStats> result =
562700
PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot());
563701

702+
validateStats(result, expectedValues);
703+
}
704+
705+
private static void validateStats(Collection<PartitionStats> result, Tuple... expectedValues) {
564706
assertThat(result)
565707
.extracting(
566708
PartitionStats::partition,

0 commit comments

Comments
 (0)