Skip to content

Commit c0e0648

Browse files
committed
Fix PageProcessor retained bytes calculations
Ensures that all components of SourcePage implementations are accounted for in both their getRetainedSizeInBytes and retainedBytesForEachPart calculations, to avoid under-counting the amount of memory being retained. Also ensures that the PageProcessor selected positions and previouslyComputedResults array are included in the retained size.
1 parent e3e6e6f commit c0e0648

File tree

14 files changed

+166
-17
lines changed

14 files changed

+166
-17
lines changed

Diff for: core/trino-main/src/main/java/io/trino/operator/project/InputChannels.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.function.ObjLongConsumer;
2727

2828
import static com.google.common.base.MoreObjects.toStringHelper;
29+
import static io.airlift.slice.SizeOf.instanceSize;
30+
import static io.airlift.slice.SizeOf.sizeOf;
2931
import static java.util.Objects.requireNonNull;
3032

3133
public class InputChannels
@@ -81,7 +83,11 @@ public String toString()
8183
private static final class InputChannelsSourcePage
8284
implements SourcePage
8385
{
86+
private static final long INSTANCE_SIZE = instanceSize(InputChannelsSourcePage.class);
87+
8488
private final SourcePage sourcePage;
89+
// channels is not considered retained by this class since it is shared between instances created by
90+
// the same outer InputChannels instance
8591
private final int[] channels;
8692
private final Block[] blocks;
8793

@@ -118,17 +124,22 @@ public long getSizeInBytes()
118124
@Override
119125
public long getRetainedSizeInBytes()
120126
{
121-
return sourcePage.getRetainedSizeInBytes();
127+
return INSTANCE_SIZE +
128+
sizeOf(blocks) +
129+
sourcePage.getRetainedSizeInBytes();
122130
}
123131

124132
@Override
125133
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
126134
{
135+
consumer.accept(this, INSTANCE_SIZE);
136+
consumer.accept(blocks, sizeOf(blocks));
127137
for (Block block : blocks) {
128138
if (block != null) {
129139
block.retainedBytesForEachPart(consumer);
130140
}
131141
}
142+
sourcePage.retainedBytesForEachPart(consumer);
132143
}
133144

134145
@Override

Diff for: core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import static com.google.common.base.Verify.verify;
4444
import static com.google.common.base.Verify.verifyNotNull;
4545
import static com.google.common.collect.ImmutableList.toImmutableList;
46+
import static io.airlift.slice.SizeOf.instanceSize;
47+
import static io.airlift.slice.SizeOf.sizeOf;
4648
import static io.trino.operator.WorkProcessor.ProcessState.finished;
4749
import static io.trino.operator.WorkProcessor.ProcessState.ofResult;
4850
import static io.trino.operator.WorkProcessor.ProcessState.yielded;
@@ -143,6 +145,8 @@ public WorkProcessor<Page> createWorkProcessor(
143145
private class ProjectSelectedPositions
144146
implements WorkProcessor.Process<Page>
145147
{
148+
private static final long INSTANCE_SIZE = instanceSize(ProjectSelectedPositions.class);
149+
146150
private final ConnectorSession session;
147151
private final DriverYieldSignal yieldSignal;
148152
private final LocalMemoryContext memoryContext;
@@ -259,9 +263,9 @@ private void updateBatchSize(int positionCount, long pageSize)
259263

260264
private void updateRetainedSize()
261265
{
262-
// TODO: This is an estimate without knowing anything about the SourcePage implementation details. SourcePage
263-
// should expose this information directly
264-
retainedSizeInBytes = Page.getInstanceSizeInBytes(page.getChannelCount());
266+
retainedSizeInBytes = INSTANCE_SIZE +
267+
selectedPositions.getRetainedSizeInBytes() +
268+
sizeOf(previouslyComputedResults);
265269
// increment the size only when it is the first reference
266270
ReferenceCountMap referenceCountMap = new ReferenceCountMap();
267271
page.retainedBytesForEachPart((object, size) -> {

Diff for: core/trino-main/src/main/java/io/trino/operator/project/SelectedPositions.java

+8
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import static com.google.common.base.Preconditions.checkPositionIndexes;
1818
import static com.google.common.base.Preconditions.checkState;
1919
import static com.google.common.base.Verify.verify;
20+
import static io.airlift.slice.SizeOf.instanceSize;
21+
import static io.airlift.slice.SizeOf.sizeOf;
2022
import static java.lang.System.arraycopy;
2123
import static java.util.Objects.checkFromIndexSize;
2224
import static java.util.Objects.requireNonNull;
2325

2426
public class SelectedPositions
2527
{
28+
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
2629
private static final SelectedPositions EMPTY = positionsRange(0, 0);
2730

2831
private final boolean isList;
@@ -54,6 +57,11 @@ private SelectedPositions(boolean isList, int[] positions, int offset, int size)
5457
}
5558
}
5659

60+
public long getRetainedSizeInBytes()
61+
{
62+
return INSTANCE_SIZE + sizeOf(positions);
63+
}
64+
5765
public boolean isList()
5866
{
5967
return isList;

Diff for: core/trino-main/src/test/java/io/trino/operator/TestingSourcePage.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import java.util.function.ObjLongConsumer;
2222

2323
import static com.google.common.base.Preconditions.checkArgument;
24+
import static io.airlift.slice.SizeOf.instanceSize;
25+
import static io.airlift.slice.SizeOf.sizeOf;
2426
import static java.util.Objects.requireNonNull;
2527

2628
public class TestingSourcePage
2729
implements SourcePage
2830
{
31+
private static final long INSTANCE_SIZE = instanceSize(TestingSourcePage.class);
32+
2933
private final int positionCount;
3034
private final Block[] blocks;
3135
private final boolean[] loaded;
@@ -59,7 +63,9 @@ public long getSizeInBytes()
5963
@Override
6064
public long getRetainedSizeInBytes()
6165
{
62-
long retainedSizeInBytes = 0;
66+
long retainedSizeInBytes = INSTANCE_SIZE +
67+
sizeOf(blocks) +
68+
sizeOf(loaded);
6369
for (Block block : blocks) {
6470
if (block != null) {
6571
retainedSizeInBytes += block.getRetainedSizeInBytes();
@@ -71,6 +77,9 @@ public long getRetainedSizeInBytes()
7177
@Override
7278
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
7379
{
80+
consumer.accept(this, INSTANCE_SIZE);
81+
consumer.accept(blocks, sizeOf(blocks));
82+
consumer.accept(loaded, sizeOf(loaded));
7483
for (Block block : blocks) {
7584
if (block != null) {
7685
block.retainedBytesForEachPart(consumer);

Diff for: core/trino-spi/src/main/java/io/trino/spi/connector/FixedSourcePage.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818

1919
import java.util.function.ObjLongConsumer;
2020

21+
import static io.airlift.slice.SizeOf.instanceSize;
2122
import static java.util.Objects.requireNonNull;
2223

2324
final class FixedSourcePage
2425
implements SourcePage
2526
{
27+
private static final long INSTANCE_SIZE = instanceSize(FixedSourcePage.class);
28+
2629
private Page page;
2730

2831
FixedSourcePage(Page page)
@@ -46,12 +49,14 @@ public long getSizeInBytes()
4649
@Override
4750
public long getRetainedSizeInBytes()
4851
{
49-
return page.getRetainedSizeInBytes();
52+
return INSTANCE_SIZE + page.getRetainedSizeInBytes();
5053
}
5154

5255
@Override
5356
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
5457
{
58+
consumer.accept(this, INSTANCE_SIZE);
59+
consumer.accept(page, Page.getInstanceSizeInBytes(page.getChannelCount()));
5560
for (int i = 0; i < page.getChannelCount(); i++) {
5661
page.getBlock(i).retainedBytesForEachPart(consumer);
5762
}

Diff for: core/trino-spi/src/main/java/io/trino/spi/connector/PositionCountSourcePage.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
import java.util.Objects;
2020
import java.util.function.ObjLongConsumer;
2121

22+
import static io.airlift.slice.SizeOf.instanceSize;
23+
2224
final class PositionCountSourcePage
2325
implements SourcePage
2426
{
27+
private static final long INSTANCE_SIZE = instanceSize(PositionCountSourcePage.class);
28+
2529
private int positionCount;
2630

2731
PositionCountSourcePage(int positionCount)
@@ -47,11 +51,14 @@ public long getSizeInBytes()
4751
@Override
4852
public long getRetainedSizeInBytes()
4953
{
50-
return 0;
54+
return INSTANCE_SIZE;
5155
}
5256

5357
@Override
54-
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer) {}
58+
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
59+
{
60+
consumer.accept(this, INSTANCE_SIZE);
61+
}
5562

5663
@Override
5764
public int getChannelCount()

Diff for: lib/trino-orc/src/main/java/io/trino/orc/OrcRecordReader.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import static com.google.common.base.Preconditions.checkArgument;
6767
import static com.google.common.base.Preconditions.checkState;
6868
import static io.airlift.slice.SizeOf.instanceSize;
69+
import static io.airlift.slice.SizeOf.sizeOf;
6970
import static io.trino.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
7071
import static io.trino.orc.OrcReader.BATCH_SIZE_GROWTH_FACTOR;
7172
import static io.trino.orc.OrcReader.MAX_BATCH_SIZE;
@@ -467,6 +468,8 @@ public SourcePage nextPage()
467468
private class OrcSourcePage
468469
implements SourcePage
469470
{
471+
private static final long INSTANCE_SIZE = instanceSize(OrcSourcePage.class);
472+
470473
private final int expectedPageId = currentPageId;
471474
private final Block[] blocks = new Block[columnReaders.length + (appendRowNumberColumn ? 1 : 0)];
472475
private final int rowNumberColumnIndex = appendRowNumberColumn ? columnReaders.length : -1;
@@ -478,6 +481,7 @@ private class OrcSourcePage
478481
public OrcSourcePage(int positionCount)
479482
{
480483
selectedPositions = new SelectedPositions(positionCount, null);
484+
retainedSizeInBytes = shallowRetainedSizeInBytes();
481485
}
482486

483487
@Override
@@ -498,9 +502,19 @@ public long getRetainedSizeInBytes()
498502
return retainedSizeInBytes;
499503
}
500504

505+
private long shallowRetainedSizeInBytes()
506+
{
507+
return INSTANCE_SIZE +
508+
sizeOf(blocks) +
509+
selectedPositions.retainedSizeInBytes();
510+
}
511+
501512
@Override
502513
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
503514
{
515+
consumer.accept(this, INSTANCE_SIZE);
516+
consumer.accept(blocks, sizeOf(blocks));
517+
consumer.accept(selectedPositions, selectedPositions.retainedSizeInBytes());
504518
for (Block block : blocks) {
505519
if (block != null) {
506520
block.retainedBytesForEachPart(consumer);
@@ -557,7 +571,7 @@ public Page getPage()
557571
public void selectPositions(int[] positions, int offset, int size)
558572
{
559573
selectedPositions = selectedPositions.selectPositions(positions, offset, size);
560-
retainedSizeInBytes = 0;
574+
retainedSizeInBytes = shallowRetainedSizeInBytes();
561575
for (int i = 0; i < blocks.length; i++) {
562576
Block block = blocks[i];
563577
if (block != null) {
@@ -571,6 +585,13 @@ public void selectPositions(int[] positions, int offset, int size)
571585

572586
private record SelectedPositions(int positionCount, @Nullable int[] positions)
573587
{
588+
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
589+
590+
public long retainedSizeInBytes()
591+
{
592+
return INSTANCE_SIZE + sizeOf(positions);
593+
}
594+
574595
@CheckReturnValue
575596
public Block apply(Block block)
576597
{

Diff for: lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
import static com.google.common.base.Preconditions.checkState;
8181
import static com.google.common.collect.ImmutableList.toImmutableList;
8282
import static com.google.common.collect.ImmutableSet.toImmutableSet;
83+
import static io.airlift.slice.SizeOf.instanceSize;
84+
import static io.airlift.slice.SizeOf.sizeOf;
8385
import static io.airlift.slice.Slices.utf8Slice;
8486
import static io.trino.parquet.ParquetValidationUtils.validateParquet;
8587
import static io.trino.parquet.ParquetWriteValidation.StatisticsValidation;
@@ -276,6 +278,8 @@ public SourcePage nextPage()
276278
private class ParquetSourcePage
277279
implements SourcePage
278280
{
281+
private static final long INSTANCE_SIZE = instanceSize(ParquetSourcePage.class);
282+
279283
private final int expectedPageId = currentPageId;
280284
private final Block[] blocks = new Block[columnFields.size() + (appendRowNumberColumn ? 1 : 0)];
281285
private final int rowNumberColumnIndex = appendRowNumberColumn ? columnFields.size() : -1;
@@ -287,6 +291,7 @@ private class ParquetSourcePage
287291
public ParquetSourcePage(int positionCount)
288292
{
289293
selectedPositions = new SelectedPositions(positionCount, null);
294+
retainedSizeInBytes = shallowRetainedSizeInBytes();
290295
}
291296

292297
@Override
@@ -307,9 +312,19 @@ public long getRetainedSizeInBytes()
307312
return retainedSizeInBytes;
308313
}
309314

315+
private long shallowRetainedSizeInBytes()
316+
{
317+
return INSTANCE_SIZE +
318+
sizeOf(blocks) +
319+
selectedPositions.retainedSizeInBytes();
320+
}
321+
310322
@Override
311323
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
312324
{
325+
consumer.accept(this, INSTANCE_SIZE);
326+
consumer.accept(blocks, sizeOf(blocks));
327+
consumer.accept(selectedPositions, selectedPositions.retainedSizeInBytes());
313328
for (Block block : blocks) {
314329
if (block != null) {
315330
block.retainedBytesForEachPart(consumer);
@@ -363,7 +378,7 @@ public Page getPage()
363378
public void selectPositions(int[] positions, int offset, int size)
364379
{
365380
selectedPositions = selectedPositions.selectPositions(positions, offset, size);
366-
retainedSizeInBytes = 0;
381+
retainedSizeInBytes = shallowRetainedSizeInBytes();
367382
for (int i = 0; i < blocks.length; i++) {
368383
Block block = blocks[i];
369384
if (block != null) {
@@ -377,6 +392,13 @@ public void selectPositions(int[] positions, int offset, int size)
377392

378393
private record SelectedPositions(int positionCount, @Nullable int[] positions)
379394
{
395+
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
396+
397+
public long retainedSizeInBytes()
398+
{
399+
return INSTANCE_SIZE + sizeOf(positions);
400+
}
401+
380402
@CheckReturnValue
381403
public Block apply(Block block)
382404
{

Diff for: lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/MappedPageSource.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.function.ObjLongConsumer;
2525

26+
import static io.airlift.slice.SizeOf.instanceSize;
2627
import static java.util.Objects.requireNonNull;
2728

2829
public class MappedPageSource
@@ -81,6 +82,8 @@ public void close()
8182
private record MappedSourcePage(SourcePage sourcePage, int[] channels)
8283
implements SourcePage
8384
{
85+
private static final long INSTANCE_SIZE = instanceSize(MappedSourcePage.class);
86+
8487
private MappedSourcePage
8588
{
8689
requireNonNull(sourcePage, "sourcePage is null");
@@ -102,12 +105,17 @@ public long getSizeInBytes()
102105
@Override
103106
public long getRetainedSizeInBytes()
104107
{
105-
return sourcePage.getRetainedSizeInBytes();
108+
// channels array is not considered retained since it is shared by all instances created from
109+
// the same outer MappedPageSource instance
110+
return INSTANCE_SIZE + sourcePage.getRetainedSizeInBytes();
106111
}
107112

108113
@Override
109114
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
110115
{
116+
// channels array is not considered retained since it is shared by all instances created from
117+
// the same outer MappedPageSource instance
118+
consumer.accept(this, INSTANCE_SIZE);
111119
sourcePage.retainedBytesForEachPart(consumer);
112120
}
113121

0 commit comments

Comments
 (0)