Skip to content

Pipe: Optimize Batch and WAL memory allocation algorithms #15534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand All @@ -38,14 +40,19 @@
public abstract class PipeTabletEventBatch implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK =
PipeDataNodeResourceManager.memory()
.forceAllocateForModelFixedMemoryBlock(
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
PipeMemoryBlockType.BATCH);

protected final List<EnrichedEvent> events = new ArrayList<>();

private final int maxDelayInMs;
private long firstEventProcessingTime = Long.MIN_VALUE;

protected long totalBufferSize = 0;
private final PipeMemoryBlock allocatedMemoryBlock;
private final PipeDynamicMemoryBlock allocatedMemoryBlock;

protected volatile boolean isClosed = false;

Expand All @@ -54,19 +61,8 @@ protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatc

// limit in buffer size
this.allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
.setShrinkCallback(
(oldMemory, newMemory) ->
LOGGER.info(
"The batch size limit has shrunk from {} to {}.", oldMemory, newMemory))
.setExpandMethod(
oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes))
.setExpandCallback(
(oldMemory, newMemory) ->
LOGGER.info(
"The batch size limit has expanded from {} to {}.", oldMemory, newMemory));
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
allocatedMemoryBlock.setExpandable(false);

if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
Expand Down Expand Up @@ -131,8 +127,12 @@ protected abstract boolean constructBatch(final TabletInsertionEvent event)
throws WALPipeException, IOException;

public boolean shouldEmit() {
return totalBufferSize >= getMaxBatchSizeInBytes()
|| System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
return true;
}
return false;
}

private long getMaxBatchSizeInBytes() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.resource.memory;

import org.apache.tsfile.utils.Pair;

import javax.validation.constraints.NotNull;

import java.util.function.Consumer;
import java.util.stream.Stream;

public class PipeDynamicMemoryBlock {

private final PipeModelFixedMemoryBlock fixedMemoryBlock;

private boolean isExpandable = true;

private Consumer<PipeDynamicMemoryBlock> expand = null;

private volatile boolean released = false;

private volatile long memoryUsageInBytes;

private volatile double historyMemoryEfficiency;

private volatile double currentMemoryEfficiency;

PipeDynamicMemoryBlock(
final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long memoryUsageInBytes) {
this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
this.fixedMemoryBlock = fixedMemoryBlock;
}

public long getMemoryUsageInBytes() {
return memoryUsageInBytes;
}

public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
this.memoryUsageInBytes = memoryUsageInBytes;
}

public Pair<Double, Double> getMemoryEfficiency() {
synchronized (fixedMemoryBlock) {
return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency);
}
}

public void setExpandable(boolean expandable) {
isExpandable = expandable;
}

public void setExpand(Consumer<PipeDynamicMemoryBlock> expand) {
this.expand = expand;
}

public double getMemoryBlockUsageRatio() {
return (double) memoryUsageInBytes / fixedMemoryBlock.getMemoryUsageInBytes();
}

public double getFixedMemoryBlockUsageRatio() {
return (double) fixedMemoryBlock.getMemoryAllocatedInBytes()
/ fixedMemoryBlock.getMemoryUsageInBytes();
}

public long canAllocateMemorySize() {
return fixedMemoryBlock.getMemoryUsageInBytes() - fixedMemoryBlock.getMemoryAllocatedInBytes();
}

public synchronized long getExpectedAverageAllocatedMemorySize() {
return fixedMemoryBlock.getMemoryUsageInBytes() / fixedMemoryBlock.getMemoryBlocks().size();
}

public void updateCurrentMemoryEfficiencyAdjustMem(double currentMemoryEfficiency) {
synchronized (fixedMemoryBlock) {
this.historyMemoryEfficiency = this.currentMemoryEfficiency;
if (Double.isNaN(currentMemoryEfficiency)
|| Double.isInfinite(currentMemoryEfficiency)
|| currentMemoryEfficiency < 0.0) {
currentMemoryEfficiency = 0.0;
}
this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
fixedMemoryBlock.dynamicallyAdjustMemory(this);
}
}

public long getFixedMemoryCapacity() {
return fixedMemoryBlock.getMemoryUsageInBytes();
}

public void updateMemoryEfficiency(
double currentMemoryEfficiency, double historyMemoryEfficiency) {
synchronized (fixedMemoryBlock) {
if (Double.isNaN(currentMemoryEfficiency)
|| Double.isInfinite(currentMemoryEfficiency)
|| currentMemoryEfficiency < 0.0) {
currentMemoryEfficiency = 0.0;
}

if (Double.isNaN(historyMemoryEfficiency)
|| Double.isInfinite(historyMemoryEfficiency)
|| historyMemoryEfficiency < 0.0) {
currentMemoryEfficiency = 0.0;
}

this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
}
}

public Stream<PipeDynamicMemoryBlock> getMemoryBlocks() {
return fixedMemoryBlock.getMemoryBlocksStream();
}

public void applyForDynamicMemory(final long memoryUsageInBytes) {
fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes);
}

public boolean isReleased() {
return released;
}

public void close() {
if (released) {
return;
}
synchronized (fixedMemoryBlock) {
if (!released) {
fixedMemoryBlock.releaseMemory(this);
released = true;
}
}
}

void doExpand() {
if (isExpandable && expand != null) {
expand.accept(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public enum PipeMemoryBlockType {
NORMAL,
TABLET,
TS_FILE,
BATCH,
WAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,6 +101,18 @@ private double allowedMaxMemorySizeInBytesOfTsTiles() {
* getTotalNonFloatingMemorySizeInBytes();
}

public long getAllocatedMemorySizeInBytesOfWAL() {
return (long)
(PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()
* getTotalNonFloatingMemorySizeInBytes());
}

public long getAllocatedMemorySizeInBytesOfBatch() {
return (long)
(PIPE_CONFIG.getPipeDataStructureBatchMemoryProportion()
* getTotalNonFloatingMemorySizeInBytes());
}

public boolean isEnough4TabletParsing() {
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
Expand Down Expand Up @@ -225,6 +238,39 @@ public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBy
}
}

public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock(
long fixedSizeInBytes, PipeMemoryBlockType type)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new ThresholdAllocationStrategy());
}

if (fixedSizeInBytes == 0) {
return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type);
}

for (int i = 1, size = PIPE_CONFIG.getPipeMemoryAllocateMaxRetries(); i <= size; i++) {
if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) {
break;
}

try {
Thread.sleep(PIPE_CONFIG.getPipeMemoryAllocateRetryIntervalInMs());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex);
}
}

if (getFreeMemorySizeInBytes() < fixedSizeInBytes) {
return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(getFreeMemorySizeInBytes(), type);
}

synchronized (this) {
return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(fixedSizeInBytes, type);
}
}

private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlockType type)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
Expand All @@ -233,6 +279,9 @@ private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlock
return new PipeTabletMemoryBlock(sizeInBytes);
case TS_FILE:
return new PipeTsFileMemoryBlock(sizeInBytes);
case BATCH:
case WAL:
return new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy());
default:
return new PipeMemoryBlock(sizeInBytes);
}
Expand Down Expand Up @@ -466,6 +515,11 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp
case TS_FILE:
returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
break;
case BATCH:
case WAL:
returnedMemoryBlock =
new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy());
break;
default:
returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
break;
Expand Down
Loading
Loading