|
38 | 38 | import java.util.Optional;
|
39 | 39 | import java.util.OptionalInt;
|
40 | 40 | import java.util.function.Function;
|
| 41 | +import java.util.function.ObjLongConsumer; |
41 | 42 |
|
42 | 43 | import static com.google.common.base.Preconditions.checkArgument;
|
43 | 44 | import static com.google.common.base.Verify.verify;
|
44 | 45 | import static com.google.common.base.Verify.verifyNotNull;
|
45 | 46 | import static com.google.common.collect.ImmutableList.toImmutableList;
|
| 47 | +import static io.airlift.slice.SizeOf.instanceSize; |
| 48 | +import static io.airlift.slice.SizeOf.sizeOf; |
46 | 49 | import static io.trino.operator.WorkProcessor.ProcessState.finished;
|
47 | 50 | import static io.trino.operator.WorkProcessor.ProcessState.ofResult;
|
48 | 51 | import static io.trino.operator.WorkProcessor.ProcessState.yielded;
|
@@ -143,6 +146,8 @@ public WorkProcessor<Page> createWorkProcessor(
|
143 | 146 | private class ProjectSelectedPositions
|
144 | 147 | implements WorkProcessor.Process<Page>
|
145 | 148 | {
|
| 149 | + private static final long INSTANCE_SIZE = instanceSize(ProjectSelectedPositions.class); |
| 150 | + |
146 | 151 | private final ConnectorSession session;
|
147 | 152 | private final DriverYieldSignal yieldSignal;
|
148 | 153 | private final LocalMemoryContext memoryContext;
|
@@ -259,29 +264,45 @@ private void updateBatchSize(int positionCount, long pageSize)
|
259 | 264 |
|
260 | 265 | private void updateRetainedSize()
|
261 | 266 | {
|
262 |
| - // TODO: This is an estimate without knowing anything about the SourcePage implementation details. SourcePage |
263 |
| - // should expose this information directly |
264 |
| - retainedSizeInBytes = Page.getInstanceSizeInBytes(page.getChannelCount()); |
265 |
| - // increment the size only when it is the first reference |
266 |
| - ReferenceCountMap referenceCountMap = new ReferenceCountMap(); |
267 |
| - page.retainedBytesForEachPart((object, size) -> { |
268 |
| - if (referenceCountMap.incrementAndGet(object) == 1) { |
269 |
| - retainedSizeInBytes += size; |
270 |
| - } |
271 |
| - }); |
| 267 | + RetainedBytesByPartVisitor visitor = new RetainedBytesByPartVisitor(); |
| 268 | + |
| 269 | + page.retainedBytesForEachPart(visitor); |
| 270 | + |
272 | 271 | for (Block previouslyComputedResult : previouslyComputedResults) {
|
273 | 272 | if (previouslyComputedResult != null) {
|
274 |
| - previouslyComputedResult.retainedBytesForEachPart((object, size) -> { |
275 |
| - if (referenceCountMap.incrementAndGet(object) == 1) { |
276 |
| - retainedSizeInBytes += size; |
277 |
| - } |
278 |
| - }); |
| 273 | + previouslyComputedResult.retainedBytesForEachPart(visitor); |
279 | 274 | }
|
280 | 275 | }
|
281 | 276 |
|
| 277 | + retainedSizeInBytes = INSTANCE_SIZE + |
| 278 | + selectedPositions.getRetainedSizeInBytes() + |
| 279 | + sizeOf(previouslyComputedResults) + |
| 280 | + visitor.getRetainedSizeInBytes(); |
| 281 | + |
282 | 282 | memoryContext.setBytes(retainedSizeInBytes);
|
283 | 283 | }
|
284 | 284 |
|
| 285 | + private static final class RetainedBytesByPartVisitor |
| 286 | + implements ObjLongConsumer<Object> |
| 287 | + { |
| 288 | + private final ReferenceCountMap referenceCountMap = new ReferenceCountMap(); |
| 289 | + private long retainedSizeInBytes; |
| 290 | + |
| 291 | + public long getRetainedSizeInBytes() |
| 292 | + { |
| 293 | + return retainedSizeInBytes; |
| 294 | + } |
| 295 | + |
| 296 | + @Override |
| 297 | + public void accept(Object object, long size) |
| 298 | + { |
| 299 | + // increment the size only when it is the first reference |
| 300 | + if (referenceCountMap.incrementAndGetWithExtraIdentity(object, (int) size) == 1) { |
| 301 | + retainedSizeInBytes += size; |
| 302 | + } |
| 303 | + } |
| 304 | + } |
| 305 | + |
285 | 306 | private ProcessBatchResult processBatch(int batchSize)
|
286 | 307 | {
|
287 | 308 | Block[] blocks = new Block[projections.size()];
|
|
0 commit comments