Skip to content

Commit c200d06

Browse files
committed
Improve PageProcessor retained bytes calculations
Ensures that all components of SourcePage implementations are accounted for in both their getRetainedSizeInBytes() and retainedBytesForEachPart() calculations, to avoid under-counting the amount of memory being retained. Also ensures that the PageProcessor selected positions and the previouslyComputedResults array are included in the retained size.
1 parent 8ff6929 commit c200d06

File tree

15 files changed

+203
-29
lines changed

15 files changed

+203
-29
lines changed

Diff for: core/trino-main/src/main/java/io/trino/operator/project/InputChannels.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.function.ObjLongConsumer;
2727

2828
import static com.google.common.base.MoreObjects.toStringHelper;
29+
import static io.airlift.slice.SizeOf.instanceSize;
30+
import static io.airlift.slice.SizeOf.sizeOf;
2931
import static java.util.Objects.requireNonNull;
3032

3133
public class InputChannels
@@ -81,7 +83,11 @@ public String toString()
8183
private static final class InputChannelsSourcePage
8284
implements SourcePage
8385
{
86+
private static final long INSTANCE_SIZE = instanceSize(InputChannelsSourcePage.class);
87+
8488
private final SourcePage sourcePage;
89+
// channels is not considered retained by this class since it is shared between instances created by
90+
// the same outer InputChannels instance
8591
private final int[] channels;
8692
private final Block[] blocks;
8793

@@ -118,17 +124,22 @@ public long getSizeInBytes()
118124
@Override
119125
public long getRetainedSizeInBytes()
120126
{
121-
return sourcePage.getRetainedSizeInBytes();
127+
return INSTANCE_SIZE +
128+
sizeOf(blocks) +
129+
sourcePage.getRetainedSizeInBytes();
122130
}
123131

124132
@Override
125133
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
126134
{
135+
consumer.accept(this, INSTANCE_SIZE);
136+
consumer.accept(blocks, sizeOf(blocks));
127137
for (Block block : blocks) {
128138
if (block != null) {
129139
block.retainedBytesForEachPart(consumer);
130140
}
131141
}
142+
sourcePage.retainedBytesForEachPart(consumer);
132143
}
133144

134145
@Override

Diff for: core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java

+36-15
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,14 @@
3838
import java.util.Optional;
3939
import java.util.OptionalInt;
4040
import java.util.function.Function;
41+
import java.util.function.ObjLongConsumer;
4142

4243
import static com.google.common.base.Preconditions.checkArgument;
4344
import static com.google.common.base.Verify.verify;
4445
import static com.google.common.base.Verify.verifyNotNull;
4546
import static com.google.common.collect.ImmutableList.toImmutableList;
47+
import static io.airlift.slice.SizeOf.instanceSize;
48+
import static io.airlift.slice.SizeOf.sizeOf;
4649
import static io.trino.operator.WorkProcessor.ProcessState.finished;
4750
import static io.trino.operator.WorkProcessor.ProcessState.ofResult;
4851
import static io.trino.operator.WorkProcessor.ProcessState.yielded;
@@ -143,6 +146,8 @@ public WorkProcessor<Page> createWorkProcessor(
143146
private class ProjectSelectedPositions
144147
implements WorkProcessor.Process<Page>
145148
{
149+
private static final long INSTANCE_SIZE = instanceSize(ProjectSelectedPositions.class);
150+
146151
private final ConnectorSession session;
147152
private final DriverYieldSignal yieldSignal;
148153
private final LocalMemoryContext memoryContext;
@@ -259,29 +264,45 @@ private void updateBatchSize(int positionCount, long pageSize)
259264

260265
private void updateRetainedSize()
261266
{
262-
// TODO: This is an estimate without knowing anything about the SourcePage implementation details. SourcePage
263-
// should expose this information directly
264-
retainedSizeInBytes = Page.getInstanceSizeInBytes(page.getChannelCount());
265-
// increment the size only when it is the first reference
266-
ReferenceCountMap referenceCountMap = new ReferenceCountMap();
267-
page.retainedBytesForEachPart((object, size) -> {
268-
if (referenceCountMap.incrementAndGet(object) == 1) {
269-
retainedSizeInBytes += size;
270-
}
271-
});
267+
RetainedBytesByPartVisitor visitor = new RetainedBytesByPartVisitor();
268+
269+
page.retainedBytesForEachPart(visitor);
270+
272271
for (Block previouslyComputedResult : previouslyComputedResults) {
273272
if (previouslyComputedResult != null) {
274-
previouslyComputedResult.retainedBytesForEachPart((object, size) -> {
275-
if (referenceCountMap.incrementAndGet(object) == 1) {
276-
retainedSizeInBytes += size;
277-
}
278-
});
273+
previouslyComputedResult.retainedBytesForEachPart(visitor);
279274
}
280275
}
281276

277+
retainedSizeInBytes = INSTANCE_SIZE +
278+
selectedPositions.getRetainedSizeInBytes() +
279+
sizeOf(previouslyComputedResults) +
280+
visitor.getRetainedSizeInBytes();
281+
282282
memoryContext.setBytes(retainedSizeInBytes);
283283
}
284284

285+
private static final class RetainedBytesByPartVisitor
286+
implements ObjLongConsumer<Object>
287+
{
288+
private final ReferenceCountMap referenceCountMap = new ReferenceCountMap();
289+
private long retainedSizeInBytes;
290+
291+
public long getRetainedSizeInBytes()
292+
{
293+
return retainedSizeInBytes;
294+
}
295+
296+
@Override
297+
public void accept(Object object, long size)
298+
{
299+
// increment the size only when it is the first reference
300+
if (referenceCountMap.incrementAndGet(object) == 1) {
301+
retainedSizeInBytes += size;
302+
}
303+
}
304+
}
305+
285306
private ProcessBatchResult processBatch(int batchSize)
286307
{
287308
Block[] blocks = new Block[projections.size()];

Diff for: core/trino-main/src/main/java/io/trino/operator/project/SelectedPositions.java

+8
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import static com.google.common.base.Preconditions.checkPositionIndexes;
1818
import static com.google.common.base.Preconditions.checkState;
1919
import static com.google.common.base.Verify.verify;
20+
import static io.airlift.slice.SizeOf.instanceSize;
21+
import static io.airlift.slice.SizeOf.sizeOf;
2022
import static java.lang.System.arraycopy;
2123
import static java.util.Objects.checkFromIndexSize;
2224
import static java.util.Objects.requireNonNull;
2325

2426
public class SelectedPositions
2527
{
28+
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
2629
private static final SelectedPositions EMPTY = positionsRange(0, 0);
2730

2831
private final boolean isList;
@@ -54,6 +57,11 @@ private SelectedPositions(boolean isList, int[] positions, int offset, int size)
5457
}
5558
}
5659

60+
public long getRetainedSizeInBytes()
61+
{
62+
return INSTANCE_SIZE + sizeOf(positions);
63+
}
64+
5765
public boolean isList()
5866
{
5967
return isList;

Diff for: core/trino-main/src/test/java/io/trino/operator/TestingSourcePage.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import java.util.function.ObjLongConsumer;
2222

2323
import static com.google.common.base.Preconditions.checkArgument;
24+
import static io.airlift.slice.SizeOf.instanceSize;
25+
import static io.airlift.slice.SizeOf.sizeOf;
2426
import static java.util.Objects.requireNonNull;
2527

2628
public class TestingSourcePage
2729
implements SourcePage
2830
{
31+
private static final long INSTANCE_SIZE = instanceSize(TestingSourcePage.class);
32+
2933
private final int positionCount;
3034
private final Block[] blocks;
3135
private final boolean[] loaded;
@@ -59,7 +63,9 @@ public long getSizeInBytes()
5963
@Override
6064
public long getRetainedSizeInBytes()
6165
{
62-
long retainedSizeInBytes = 0;
66+
long retainedSizeInBytes = INSTANCE_SIZE +
67+
sizeOf(blocks) +
68+
sizeOf(loaded);
6369
for (Block block : blocks) {
6470
if (block != null) {
6571
retainedSizeInBytes += block.getRetainedSizeInBytes();
@@ -71,6 +77,9 @@ public long getRetainedSizeInBytes()
7177
@Override
7278
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
7379
{
80+
consumer.accept(this, INSTANCE_SIZE);
81+
consumer.accept(blocks, sizeOf(blocks));
82+
consumer.accept(loaded, sizeOf(loaded));
7483
for (Block block : blocks) {
7584
if (block != null) {
7685
block.retainedBytesForEachPart(consumer);

Diff for: core/trino-spi/src/main/java/io/trino/spi/connector/FixedSourcePage.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.util.function.ObjLongConsumer;
2020

21+
import static io.airlift.slice.SizeOf.instanceSize;
2122
import static java.util.Objects.requireNonNull;
2223

2324
final class FixedSourcePage
2425
implements SourcePage
2526
{
27+
private static final long INSTANCE_SIZE = instanceSize(FixedSourcePage.class);
28+
2629
private Page page;
2730

2831
FixedSourcePage(Page page)
@@ -46,12 +49,14 @@ public long getSizeInBytes()
4649
@Override
4750
public long getRetainedSizeInBytes()
4851
{
49-
return page.getRetainedSizeInBytes();
52+
return INSTANCE_SIZE + page.getRetainedSizeInBytes();
5053
}
5154

5255
@Override
5356
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
5457
{
58+
consumer.accept(this, INSTANCE_SIZE);
59+
consumer.accept(page, Page.getInstanceSizeInBytes(page.getChannelCount()));
5560
for (int i = 0; i < page.getChannelCount(); i++) {
5661
page.getBlock(i).retainedBytesForEachPart(consumer);
5762
}

Diff for: core/trino-spi/src/main/java/io/trino/spi/connector/PositionCountSourcePage.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
import java.util.Objects;
2020
import java.util.function.ObjLongConsumer;
2121

22+
import static io.airlift.slice.SizeOf.instanceSize;
23+
2224
final class PositionCountSourcePage
2325
implements SourcePage
2426
{
27+
private static final long INSTANCE_SIZE = instanceSize(PositionCountSourcePage.class);
28+
2529
private int positionCount;
2630

2731
PositionCountSourcePage(int positionCount)
@@ -47,11 +51,14 @@ public long getSizeInBytes()
4751
@Override
4852
public long getRetainedSizeInBytes()
4953
{
50-
return 0;
54+
return INSTANCE_SIZE;
5155
}
5256

5357
@Override
54-
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer) {}
58+
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
59+
{
60+
consumer.accept(this, INSTANCE_SIZE);
61+
}
5562

5663
@Override
5764
public int getChannelCount()

Diff for: lib/trino-array/src/main/java/io/trino/array/ReferenceCountMap.java

+8
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
import io.airlift.slice.SizeOf;
1717
import io.airlift.slice.Slice;
18+
import io.trino.spi.Page;
1819
import io.trino.spi.block.Block;
1920
import io.trino.spi.block.MapHashTables;
21+
import io.trino.spi.connector.SourcePage;
2022
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
2123

2224
import static io.airlift.slice.SizeOf.instanceSize;
@@ -88,6 +90,12 @@ else if (key instanceof Block block) {
8890
else if (key instanceof Slice slice) {
8991
extraIdentity = (int) slice.getRetainedSize();
9092
}
93+
else if (key instanceof SourcePage sourcePage) {
94+
extraIdentity = (int) sourcePage.getRetainedSizeInBytes();
95+
}
96+
else if (key instanceof Page page) {
97+
extraIdentity = (int) page.getRetainedSizeInBytes();
98+
}
9199
else if (key.getClass().isArray()) {
92100
extraIdentity = getLength(key);
93101
}

Diff for: lib/trino-orc/src/main/java/io/trino/orc/OrcRecordReader.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import static com.google.common.base.Preconditions.checkArgument;
6767
import static com.google.common.base.Preconditions.checkState;
6868
import static io.airlift.slice.SizeOf.instanceSize;
69+
import static io.airlift.slice.SizeOf.sizeOf;
6970
import static io.trino.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
7071
import static io.trino.orc.OrcReader.BATCH_SIZE_GROWTH_FACTOR;
7172
import static io.trino.orc.OrcReader.MAX_BATCH_SIZE;
@@ -467,6 +468,8 @@ public SourcePage nextPage()
467468
private class OrcSourcePage
468469
implements SourcePage
469470
{
471+
private static final long INSTANCE_SIZE = instanceSize(OrcSourcePage.class);
472+
470473
private final int expectedPageId = currentPageId;
471474
private final Block[] blocks = new Block[columnReaders.length + (appendRowNumberColumn ? 1 : 0)];
472475
private final int rowNumberColumnIndex = appendRowNumberColumn ? columnReaders.length : -1;
@@ -478,6 +481,7 @@ private class OrcSourcePage
478481
public OrcSourcePage(int positionCount)
479482
{
480483
selectedPositions = new SelectedPositions(positionCount, null);
484+
retainedSizeInBytes = shallowRetainedSizeInBytes();
481485
}
482486

483487
@Override
@@ -498,9 +502,19 @@ public long getRetainedSizeInBytes()
498502
return retainedSizeInBytes;
499503
}
500504

505+
private long shallowRetainedSizeInBytes()
506+
{
507+
return INSTANCE_SIZE +
508+
sizeOf(blocks) +
509+
selectedPositions.retainedSizeInBytes();
510+
}
511+
501512
@Override
502513
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
503514
{
515+
consumer.accept(this, INSTANCE_SIZE);
516+
consumer.accept(blocks, sizeOf(blocks));
517+
consumer.accept(selectedPositions, selectedPositions.retainedSizeInBytes());
504518
for (Block block : blocks) {
505519
if (block != null) {
506520
block.retainedBytesForEachPart(consumer);
@@ -557,7 +571,7 @@ public Page getPage()
557571
public void selectPositions(int[] positions, int offset, int size)
558572
{
559573
selectedPositions = selectedPositions.selectPositions(positions, offset, size);
560-
retainedSizeInBytes = 0;
574+
retainedSizeInBytes = shallowRetainedSizeInBytes();
561575
for (int i = 0; i < blocks.length; i++) {
562576
Block block = blocks[i];
563577
if (block != null) {
@@ -571,6 +585,13 @@ public void selectPositions(int[] positions, int offset, int size)
571585

572586
private record SelectedPositions(int positionCount, @Nullable int[] positions)
573587
{
588+
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
589+
590+
public long retainedSizeInBytes()
591+
{
592+
return INSTANCE_SIZE + sizeOf(positions);
593+
}
594+
574595
@CheckReturnValue
575596
public Block apply(Block block)
576597
{

0 commit comments

Comments
 (0)