Skip to content

Commit 684f7a7

Browse files
authored
Core, Spark 3.5: Read deletes in parallel and cache them on executors (#8755)
1 parent bb50ab9 commit 684f7a7

File tree

20 files changed

+1460
-87
lines changed

20 files changed

+1460
-87
lines changed

api/src/main/java/org/apache/iceberg/types/TypeUtil.java

+64
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
public class TypeUtil {
4242

43+
private static final int HEADER_SIZE = 12;
44+
4345
private TypeUtil() {}
4446

4547
/**
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
452454
}
453455
}
454456

457+
/**
458+
* Estimates the number of bytes a value for a given field may occupy in memory.
459+
*
460+
* <p>This method approximates the memory size based on heuristics and the internal Java
461+
* representation defined by {@link Type.TypeID}. It is important to note that the actual size
462+
* might differ from this estimation. The method is designed to handle a variety of data types,
463+
* including primitive types, strings, and nested types such as structs, maps, and lists.
464+
*
465+
* @param field a field for which to estimate the size
466+
* @return the estimated size in bytes of the field's value in memory
467+
*/
468+
public static int estimateSize(Types.NestedField field) {
469+
return estimateSize(field.type());
470+
}
471+
472+
private static int estimateSize(Type type) {
473+
switch (type.typeId()) {
474+
case BOOLEAN:
475+
// the size of a boolean variable is virtual machine dependent
476+
// it is common to believe booleans occupy 1 byte in most JVMs
477+
return 1;
478+
case INTEGER:
479+
case FLOAT:
480+
case DATE:
481+
// ints and floats occupy 4 bytes
482+
// dates are internally represented as ints
483+
return 4;
484+
case LONG:
485+
case DOUBLE:
486+
case TIME:
487+
case TIMESTAMP:
488+
// longs and doubles occupy 8 bytes
489+
// times and timestamps are internally represented as longs
490+
return 8;
491+
case STRING:
492+
// 12 (header) + 6 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 54 bytes
493+
return 54;
494+
case UUID:
495+
// 12 (header) + 16 (two long variables) = 28 bytes
496+
return 28;
497+
case FIXED:
498+
return ((Types.FixedType) type).length();
499+
case BINARY:
500+
return 80;
501+
case DECIMAL:
502+
// 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
503+
return 44;
504+
case STRUCT:
505+
Types.StructType struct = (Types.StructType) type;
506+
return HEADER_SIZE + struct.fields().stream().mapToInt(TypeUtil::estimateSize).sum();
507+
case LIST:
508+
Types.ListType list = (Types.ListType) type;
509+
return HEADER_SIZE + 5 * estimateSize(list.elementType());
510+
case MAP:
511+
Types.MapType map = (Types.MapType) type;
512+
int entrySize = HEADER_SIZE + estimateSize(map.keyType()) + estimateSize(map.valueType());
513+
return HEADER_SIZE + 5 * entrySize;
514+
default:
515+
return 16;
516+
}
517+
}
518+
455519
/** Interface for passing a function that assigns column IDs. */
456520
public interface NextID {
457521
int get();

core/src/main/java/org/apache/iceberg/SystemConfigs.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ private SystemConfigs() {}
4343
Integer::parseUnsignedInt);
4444

4545
/**
46-
* Sets the size of the delete worker pool. This limits the number of threads used to compute the
47-
* PositionDeleteIndex from the position deletes for a data file.
46+
* Sets the size of the delete worker pool. This limits the number of threads used to read delete
47+
* files for a data file.
4848
*/
4949
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
5050
new ConfigEntry<>(
5151
"iceberg.worker.delete-num-threads",
5252
"ICEBERG_WORKER_DELETE_NUM_THREADS",
53-
Math.max(2, Runtime.getRuntime().availableProcessors()),
53+
Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
5454
Integer::parseUnsignedInt);
5555

5656
/** Whether to use the shared worker pool when planning table scans. */

core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java

+4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
2727
roaring64Bitmap = new Roaring64Bitmap();
2828
}
2929

30+
void merge(BitmapPositionDeleteIndex that) {
31+
roaring64Bitmap.or(that.roaring64Bitmap);
32+
}
33+
3034
@Override
3135
public void delete(long position) {
3236
roaring64Bitmap.add(position);

core/src/main/java/org/apache/iceberg/deletes/Deletes.java

+30
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
3838
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3939
import org.apache.iceberg.types.Types;
40+
import org.apache.iceberg.util.CharSequenceMap;
4041
import org.apache.iceberg.util.Filter;
4142
import org.apache.iceberg.util.ParallelIterable;
4243
import org.apache.iceberg.util.SortedMerge;
@@ -128,6 +129,35 @@ public static StructLikeSet toEqualitySet(
128129
}
129130
}
130131

132+
/**
133+
* Builds a map of position delete indexes by path.
134+
*
135+
* <p>Unlike {@link #toPositionIndex(CharSequence, List)}, this method builds a position delete
136+
* index for each referenced data file and does not filter deletes. This can be useful when the
137+
* entire delete file content is needed (e.g. caching).
138+
*
139+
* @param posDeletes position deletes
140+
* @return the map of position delete indexes by path
141+
*/
142+
public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
143+
CloseableIterable<T> posDeletes) {
144+
CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
145+
146+
try (CloseableIterable<T> deletes = posDeletes) {
147+
for (T delete : deletes) {
148+
CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
149+
long position = (long) POSITION_ACCESSOR.get(delete);
150+
PositionDeleteIndex index =
151+
indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex());
152+
index.delete(position);
153+
}
154+
} catch (IOException e) {
155+
throw new UncheckedIOException("Failed to close position delete source", e);
156+
}
157+
158+
return indexes;
159+
}
160+
131161
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
132162
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
133163
return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.deletes;
20+
21+
class EmptyPositionDeleteIndex implements PositionDeleteIndex {
22+
23+
private static final EmptyPositionDeleteIndex INSTANCE = new EmptyPositionDeleteIndex();
24+
25+
private EmptyPositionDeleteIndex() {}
26+
27+
static EmptyPositionDeleteIndex get() {
28+
return INSTANCE;
29+
}
30+
31+
@Override
32+
public void delete(long position) {
33+
throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
34+
}
35+
36+
@Override
37+
public void delete(long posStart, long posEnd) {
38+
throw new UnsupportedOperationException("Cannot modify " + getClass().getName());
39+
}
40+
41+
@Override
42+
public boolean isDeleted(long position) {
43+
return false;
44+
}
45+
46+
@Override
47+
public boolean isEmpty() {
48+
return true;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "PositionDeleteIndex{}";
54+
}
55+
}

core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java

+10
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,14 @@ public interface PositionDeleteIndex {
4444

4545
/** Returns true if this collection contains no element. */
4646
boolean isEmpty();
47+
48+
/** Returns true if this collection contains elements. */
49+
default boolean isNotEmpty() {
50+
return !isEmpty();
51+
}
52+
53+
/** Returns an empty immutable position delete index. */
54+
static PositionDeleteIndex empty() {
55+
return EmptyPositionDeleteIndex.get();
56+
}
4757
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.deletes;
20+
21+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
22+
23+
public class PositionDeleteIndexUtil {
24+
25+
private PositionDeleteIndexUtil() {}
26+
27+
public static PositionDeleteIndex merge(Iterable<? extends PositionDeleteIndex> indexes) {
28+
BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex();
29+
30+
for (PositionDeleteIndex index : indexes) {
31+
if (index.isNotEmpty()) {
32+
Preconditions.checkArgument(
33+
index instanceof BitmapPositionDeleteIndex,
34+
"Can merge only bitmap-based indexes, got %s",
35+
index.getClass().getName());
36+
result.merge((BitmapPositionDeleteIndex) index);
37+
}
38+
}
39+
40+
return result;
41+
}
42+
}

core/src/main/java/org/apache/iceberg/util/ThreadPools.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public static ExecutorService getWorkerPool() {
6868
/**
6969
* Return an {@link ExecutorService} that uses the "delete worker" thread-pool.
7070
*
71-
* <p>The size of the delete worker pool limits the number of threads used to compute the
72-
* PositionDeleteIndex from the position deletes for a data file.
71+
* <p>The size of this worker pool limits the number of tasks concurrently reading delete files
72+
* within a single JVM. If there are multiple threads loading deletes, all of them will share this
73+
* worker pool by default.
7374
*
7475
* <p>The size of this thread-pool is controlled by the Java system property {@code
7576
* iceberg.worker.delete-num-threads}.

0 commit comments

Comments
 (0)