Skip to content

Improve PageProcessor retained bytes calculations #25602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.function.ObjLongConsumer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.util.Objects.requireNonNull;

public class InputChannels
Expand Down Expand Up @@ -81,7 +83,11 @@ public String toString()
private static final class InputChannelsSourcePage
implements SourcePage
{
private static final long INSTANCE_SIZE = instanceSize(InputChannelsSourcePage.class);

private final SourcePage sourcePage;
// channels is not counted as retained by this class because the array is shared by all instances created by
// the same outer InputChannels instance
private final int[] channels;
private final Block[] blocks;

Expand Down Expand Up @@ -118,17 +124,22 @@ public long getSizeInBytes()
@Override
public long getRetainedSizeInBytes()
{
return sourcePage.getRetainedSizeInBytes();
return INSTANCE_SIZE +
sizeOf(blocks) +
sourcePage.getRetainedSizeInBytes();
}

/**
 * Reports every retained part of this page to {@code consumer}, in this order: the instance itself,
 * the {@code blocks} array, each non-null entry of {@code blocks}, and finally the wrapped source page.
 * The {@code channels} array is not reported here.
 */
@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
    consumer.accept(this, INSTANCE_SIZE);
    consumer.accept(blocks, sizeOf(blocks));
    for (int index = 0; index < blocks.length; index++) {
        Block current = blocks[index];
        if (current == null) {
            // nothing stored at this index, so there is nothing to report
            continue;
        }
        current.retainedBytesForEachPart(consumer);
    }
    sourcePage.retainedBytesForEachPart(consumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.WorkProcessor.ProcessState.finished;
import static io.trino.operator.WorkProcessor.ProcessState.ofResult;
import static io.trino.operator.WorkProcessor.ProcessState.yielded;
Expand Down Expand Up @@ -143,6 +146,8 @@ public WorkProcessor<Page> createWorkProcessor(
private class ProjectSelectedPositions
implements WorkProcessor.Process<Page>
{
private static final long INSTANCE_SIZE = instanceSize(ProjectSelectedPositions.class);

private final ConnectorSession session;
private final DriverYieldSignal yieldSignal;
private final LocalMemoryContext memoryContext;
Expand Down Expand Up @@ -259,29 +264,45 @@ private void updateBatchSize(int positionCount, long pageSize)

private void updateRetainedSize()
{
// TODO: This is an estimate without knowing anything about the SourcePage implementation details. SourcePage
// should expose this information directly
retainedSizeInBytes = Page.getInstanceSizeInBytes(page.getChannelCount());
// increment the size only when it is the first reference
ReferenceCountMap referenceCountMap = new ReferenceCountMap();
page.retainedBytesForEachPart((object, size) -> {
if (referenceCountMap.incrementAndGet(object) == 1) {
retainedSizeInBytes += size;
}
});
RetainedBytesByPartVisitor visitor = new RetainedBytesByPartVisitor();

page.retainedBytesForEachPart(visitor);

for (Block previouslyComputedResult : previouslyComputedResults) {
if (previouslyComputedResult != null) {
previouslyComputedResult.retainedBytesForEachPart((object, size) -> {
if (referenceCountMap.incrementAndGet(object) == 1) {
retainedSizeInBytes += size;
}
});
previouslyComputedResult.retainedBytesForEachPart(visitor);
}
}

retainedSizeInBytes = INSTANCE_SIZE +
selectedPositions.getRetainedSizeInBytes() +
sizeOf(previouslyComputedResults) +
visitor.getRetainedSizeInBytes();

memoryContext.setBytes(retainedSizeInBytes);
}

/**
 * Consumer for {@code retainedBytesForEachPart} callbacks that sums the reported sizes, adding a part's
 * size only when the reference count map returns 1 for it (the reported size is passed as the extra
 * identity of the part).
 */
private static final class RetainedBytesByPartVisitor
        implements ObjLongConsumer<Object>
{
    private final ReferenceCountMap visitedParts = new ReferenceCountMap();
    private long totalRetainedBytes;

    @Override
    public void accept(Object object, long size)
    {
        int references = visitedParts.incrementAndGetWithExtraIdentity(object, size);
        if (references != 1) {
            // already counted on an earlier callback
            return;
        }
        totalRetainedBytes += size;
    }

    /**
     * Returns the sum of the sizes accepted so far
     */
    public long getRetainedSizeInBytes()
    {
        return totalRetainedBytes;
    }
}

private ProcessBatchResult processBatch(int batchSize)
{
Block[] blocks = new Block[projections.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.System.arraycopy;
import static java.util.Objects.checkFromIndexSize;
import static java.util.Objects.requireNonNull;

public class SelectedPositions
{
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);
private static final SelectedPositions EMPTY = positionsRange(0, 0);

private final boolean isList;
Expand Down Expand Up @@ -54,6 +57,11 @@ private SelectedPositions(boolean isList, int[] positions, int offset, int size)
}
}

/**
 * Returns the instance size of this object plus the size of the {@code positions} array as
 * computed by {@code sizeOf}
 */
public long getRetainedSizeInBytes()
{
    long positionsBytes = sizeOf(positions);
    return INSTANCE_SIZE + positionsBytes;
}

public boolean isList()
{
return isList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
import java.util.function.ObjLongConsumer;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.util.Objects.requireNonNull;

public class TestingSourcePage
implements SourcePage
{
private static final long INSTANCE_SIZE = instanceSize(TestingSourcePage.class);

private final int positionCount;
private final Block[] blocks;
private final boolean[] loaded;
Expand Down Expand Up @@ -59,7 +63,9 @@ public long getSizeInBytes()
@Override
public long getRetainedSizeInBytes()
{
long retainedSizeInBytes = 0;
long retainedSizeInBytes = INSTANCE_SIZE +
sizeOf(blocks) +
sizeOf(loaded);
for (Block block : blocks) {
if (block != null) {
retainedSizeInBytes += block.getRetainedSizeInBytes();
Expand All @@ -71,6 +77,9 @@ public long getRetainedSizeInBytes()
@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
consumer.accept(this, INSTANCE_SIZE);
consumer.accept(blocks, sizeOf(blocks));
consumer.accept(loaded, sizeOf(loaded));
for (Block block : blocks) {
if (block != null) {
block.retainedBytesForEachPart(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

import java.util.function.ObjLongConsumer;

import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

final class FixedSourcePage
implements SourcePage
{
private static final long INSTANCE_SIZE = instanceSize(FixedSourcePage.class);

private Page page;

FixedSourcePage(Page page)
Expand All @@ -46,12 +49,14 @@ public long getSizeInBytes()
@Override
public long getRetainedSizeInBytes()
{
return page.getRetainedSizeInBytes();
return INSTANCE_SIZE + page.getRetainedSizeInBytes();
}

@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
consumer.accept(this, INSTANCE_SIZE);
consumer.accept(page, Page.getInstanceSizeInBytes(page.getChannelCount()));
for (int i = 0; i < page.getChannelCount(); i++) {
page.getBlock(i).retainedBytesForEachPart(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import java.util.Objects;
import java.util.function.ObjLongConsumer;

import static io.airlift.slice.SizeOf.instanceSize;

final class PositionCountSourcePage
implements SourcePage
{
private static final long INSTANCE_SIZE = instanceSize(PositionCountSourcePage.class);

private int positionCount;

PositionCountSourcePage(int positionCount)
Expand All @@ -47,11 +51,14 @@ public long getSizeInBytes()
@Override
public long getRetainedSizeInBytes()
{
return 0;
return INSTANCE_SIZE;
}

@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer) {}
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
consumer.accept(this, INSTANCE_SIZE);
}

@Override
public int getChannelCount()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public int incrementAndGet(Object key)
return addTo(getHashCode(key), 1) + 1;
}

/**
 * Increments the reference count of an object by 1 and returns {@code addTo(...) + 1}, i.e. the count after
 * the increment assuming {@code addTo} returns the previous count. The hash code is built from the identity
 * hash code of {@code key} and {@code extraIdentity}; only the low 32 bits of {@code extraIdentity} are used.
 * Do not mix this calling convention with {@link ReferenceCountMap#incrementAndGet(Object)} for the same
 * object, since the two may produce different hash codes for it.
 */
public int incrementAndGetWithExtraIdentity(Object key, long extraIdentity)
{
    int truncatedIdentity = (int) extraIdentity;
    long hashCode = getHashCode(key, truncatedIdentity);
    return addTo(hashCode, 1) + 1;
}

/**
* Decrements the reference count of an object by 1 and returns the updated reference count
*/
Expand All @@ -68,9 +78,9 @@ public long sizeOf()
}

/**
* Get the 64-bit hash code for an object
* Get the additional argument to use in order to produce the 64-bit hash code for an object
*/
private static long getHashCode(Object key)
private static int getExtraIdentity(Object key)
{
// identityHashCode of two objects are not guaranteed to be different.
// Any additional identity information can reduce collisions.
Expand All @@ -97,6 +107,19 @@ else if (key instanceof MapHashTables mapHashTables) {
else {
throw new IllegalArgumentException(format("Unsupported type for %s", key));
}
return extraIdentity;
}

/**
 * Get the 64-bit hash code for {@code key}, with the extra identity argument resolved from the key
 * itself by {@code getExtraIdentity}
 */
private static long getHashCode(Object key)
{
    int extraIdentity = getExtraIdentity(key);
    return getHashCode(key, extraIdentity);
}

/**
 * Builds the 64-bit hash code: the identity hash code of {@code key} shifted left by 32 bits, plus
 * {@code extraIdentity} (widened to long before the addition)
 */
private static long getHashCode(Object key, int extraIdentity)
{
    long identityBits = ((long) System.identityHashCode(key)) << Integer.SIZE;
    return identityBits + extraIdentity;
}
}
23 changes: 22 additions & 1 deletion lib/trino-orc/src/main/java/io/trino/orc/OrcRecordReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
import static io.trino.orc.OrcReader.BATCH_SIZE_GROWTH_FACTOR;
import static io.trino.orc.OrcReader.MAX_BATCH_SIZE;
Expand Down Expand Up @@ -467,6 +468,8 @@ public SourcePage nextPage()
private class OrcSourcePage
implements SourcePage
{
private static final long INSTANCE_SIZE = instanceSize(OrcSourcePage.class);

private final int expectedPageId = currentPageId;
private final Block[] blocks = new Block[columnReaders.length + (appendRowNumberColumn ? 1 : 0)];
private final int rowNumberColumnIndex = appendRowNumberColumn ? columnReaders.length : -1;
Expand All @@ -478,6 +481,7 @@ private class OrcSourcePage
/**
 * Creates a page for {@code positionCount} positions. The initial SelectedPositions is built with a
 * null positions array, and the retained size starts at the shallow retained size.
 */
public OrcSourcePage(int positionCount)
{
selectedPositions = new SelectedPositions(positionCount, null);
// must run after selectedPositions is assigned: shallowRetainedSizeInBytes() dereferences it
retainedSizeInBytes = shallowRetainedSizeInBytes();
}

@Override
Expand All @@ -498,9 +502,19 @@ public long getRetainedSizeInBytes()
return retainedSizeInBytes;
}

/**
 * Returns the retained size of this page without the contents of the blocks: the instance size, the
 * {@code blocks} array itself, and the retained size reported by the current selected positions
 */
private long shallowRetainedSizeInBytes()
{
    long blockArrayBytes = sizeOf(blocks);
    long selectedPositionsBytes = selectedPositions.retainedSizeInBytes();
    return INSTANCE_SIZE + blockArrayBytes + selectedPositionsBytes;
}

@Override
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
{
consumer.accept(this, INSTANCE_SIZE);
consumer.accept(blocks, sizeOf(blocks));
consumer.accept(selectedPositions, selectedPositions.retainedSizeInBytes());
for (Block block : blocks) {
if (block != null) {
block.retainedBytesForEachPart(consumer);
Expand Down Expand Up @@ -557,7 +571,7 @@ public Page getPage()
public void selectPositions(int[] positions, int offset, int size)
{
selectedPositions = selectedPositions.selectPositions(positions, offset, size);
retainedSizeInBytes = 0;
retainedSizeInBytes = shallowRetainedSizeInBytes();
for (int i = 0; i < blocks.length; i++) {
Block block = blocks[i];
if (block != null) {
Expand All @@ -571,6 +585,13 @@ public void selectPositions(int[] positions, int offset, int size)

private record SelectedPositions(int positionCount, @Nullable int[] positions)
{
private static final long INSTANCE_SIZE = instanceSize(SelectedPositions.class);

/**
 * Returns the instance size of this record plus the size of the {@code positions} array as
 * computed by {@code sizeOf}
 */
public long retainedSizeInBytes()
{
    long positionsBytes = sizeOf(positions);
    return INSTANCE_SIZE + positionsBytes;
}

@CheckReturnValue
public Block apply(Block block)
{
Expand Down
Loading