diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java index d207ec9018d3..9bff91c50ad8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java @@ -22,7 +22,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType; +import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -38,6 +40,11 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class); + private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK = + PipeDataNodeResourceManager.memory() + .forceAllocateForModelFixedMemoryBlock( + PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(), + PipeMemoryBlockType.BATCH); protected final List events = new ArrayList<>(); @@ -45,7 +52,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { private long firstEventProcessingTime = Long.MIN_VALUE; protected long totalBufferSize = 0; - private final PipeMemoryBlock allocatedMemoryBlock; + private final PipeDynamicMemoryBlock allocatedMemoryBlock; protected volatile boolean isClosed = false; @@ -54,19 +61,8 @@ protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatc // limit in buffer size this.allocatedMemoryBlock = - PipeDataNodeResourceManager.memory() - .tryAllocate(requestMaxBatchSizeInBytes) - .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0)) - .setShrinkCallback( - (oldMemory, newMemory) -> - LOGGER.info( - "The batch size limit has shrunk from {} to {}.", oldMemory, newMemory)) - .setExpandMethod( - oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes)) - .setExpandCallback( - (oldMemory, newMemory) -> - LOGGER.info( - "The batch size limit has expanded from {} to {}.", oldMemory, newMemory)); + PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes); + allocatedMemoryBlock.setExpandable(false); if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) { LOGGER.info( @@ -131,8 +127,12 @@ protected abstract boolean constructBatch(final TabletInsertionEvent event) throws WALPipeException, IOException; public boolean shouldEmit() { - return totalBufferSize >= getMaxBatchSizeInBytes() - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + final long diff = System.currentTimeMillis() - firstEventProcessingTime; + if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) { + allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs); + return true; + } + return false; } private long getMaxBatchSizeInBytes() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java new file mode 100644 index 000000000000..4e33b8718282 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.memory; + +import org.apache.tsfile.utils.Pair; + +import javax.validation.constraints.NotNull; + +import java.util.function.Consumer; +import java.util.stream.Stream; + +public class PipeDynamicMemoryBlock { + + private final PipeModelFixedMemoryBlock fixedMemoryBlock; + + private boolean isExpandable = true; + + private Consumer expand = null; + + private volatile boolean released = false; + + private volatile long memoryUsageInBytes; + + private volatile double historyMemoryEfficiency; + + private volatile double currentMemoryEfficiency; + + PipeDynamicMemoryBlock( + final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long memoryUsageInBytes) { + this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0); + this.fixedMemoryBlock = fixedMemoryBlock; + } + + public long getMemoryUsageInBytes() { + return memoryUsageInBytes; + } + + public void setMemoryUsageInBytes(final long memoryUsageInBytes) { + this.memoryUsageInBytes = memoryUsageInBytes; + } + + public Pair getMemoryEfficiency() { + synchronized (fixedMemoryBlock) { + return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency); + } + } + + public void setExpandable(boolean expandable) { + isExpandable = expandable; + } + + public void setExpand(Consumer expand) { + this.expand = expand; + } + + public double getMemoryBlockUsageRatio() { + return (double) memoryUsageInBytes / fixedMemoryBlock.getMemoryUsageInBytes(); + } + + public double getFixedMemoryBlockUsageRatio() { + return (double) fixedMemoryBlock.getMemoryAllocatedInBytes() + / fixedMemoryBlock.getMemoryUsageInBytes(); + } + + public long canAllocateMemorySize() { + return fixedMemoryBlock.getMemoryUsageInBytes() - fixedMemoryBlock.getMemoryAllocatedInBytes(); + } + + public synchronized long getExpectedAverageAllocatedMemorySize() { + return fixedMemoryBlock.getMemoryUsageInBytes() / fixedMemoryBlock.getMemoryBlocks().size(); + } + + public void updateCurrentMemoryEfficiencyAdjustMem(double currentMemoryEfficiency) { + synchronized (fixedMemoryBlock) { + this.historyMemoryEfficiency = this.currentMemoryEfficiency; + if (Double.isNaN(currentMemoryEfficiency) + || Double.isInfinite(currentMemoryEfficiency) + || currentMemoryEfficiency < 0.0) { + currentMemoryEfficiency = 0.0; + } + this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0); + fixedMemoryBlock.dynamicallyAdjustMemory(this); + } + } + + public long getFixedMemoryCapacity() { + return fixedMemoryBlock.getMemoryUsageInBytes(); + } + + public void updateMemoryEfficiency( + double currentMemoryEfficiency, double historyMemoryEfficiency) { + synchronized (fixedMemoryBlock) { + if (Double.isNaN(currentMemoryEfficiency) + || Double.isInfinite(currentMemoryEfficiency) + || currentMemoryEfficiency < 0.0) { + currentMemoryEfficiency = 0.0; + } + + if (Double.isNaN(historyMemoryEfficiency) + || Double.isInfinite(historyMemoryEfficiency) + || historyMemoryEfficiency < 0.0) { + currentMemoryEfficiency = 0.0; + } + + this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0); + this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0); + } + } + + public Stream getMemoryBlocks() { + return fixedMemoryBlock.getMemoryBlocksStream(); + } + + public void applyForDynamicMemory(final long memoryUsageInBytes) { + fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes); + } + + public boolean isReleased() { + return released; + } + + public void close() { + if (released) { + return; + } + synchronized (fixedMemoryBlock) { + if (!released) { + fixedMemoryBlock.releaseMemory(this); + released = true; + } + } + } + + void doExpand() { + if (isExpandable && expand != null) { + expand.accept(this); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java index 5b626df04c3f..846fc7dd1ce1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java @@ -23,4 +23,6 @@ public enum PipeMemoryBlockType { NORMAL, TABLET, TS_FILE, + BATCH, + WAL } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index a98338dd417d..90e168a8f9b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +101,18 @@ private double allowedMaxMemorySizeInBytesOfTsTiles() { * getTotalNonFloatingMemorySizeInBytes(); } + public long getAllocatedMemorySizeInBytesOfWAL() { + return (long) + (PIPE_CONFIG.getPipeDataStructureWalMemoryProportion() + * getTotalNonFloatingMemorySizeInBytes()); + } + + public long getAllocatedMemorySizeInBytesOfBatch() { + return (long) + (PIPE_CONFIG.getPipeDataStructureBatchMemoryProportion() + * getTotalNonFloatingMemorySizeInBytes()); + } + public boolean isEnough4TabletParsing() { return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles < EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTabletsAndTsFiles() @@ -225,6 +238,39 @@ public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBy } } + public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock( + long fixedSizeInBytes, PipeMemoryBlockType type) + throws PipeRuntimeOutOfMemoryCriticalException { + if (!PIPE_MEMORY_MANAGEMENT_ENABLED) { + return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new ThresholdAllocationStrategy()); + } + + if (fixedSizeInBytes == 0) { + return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type); + } + + for (int i = 1, size = PIPE_CONFIG.getPipeMemoryAllocateMaxRetries(); i <= size; i++) { + if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) { + break; + } + + try { + Thread.sleep(PIPE_CONFIG.getPipeMemoryAllocateRetryIntervalInMs()); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex); + } + } + + if (getFreeMemorySizeInBytes() < fixedSizeInBytes) { + return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(getFreeMemorySizeInBytes(), type); + } + + synchronized (this) { + return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(fixedSizeInBytes, type); + } + } + private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlockType type) throws PipeRuntimeOutOfMemoryCriticalException { if (!PIPE_MEMORY_MANAGEMENT_ENABLED) { @@ -233,6 +279,9 @@ private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlock return new PipeTabletMemoryBlock(sizeInBytes); case TS_FILE: return new PipeTsFileMemoryBlock(sizeInBytes); + case BATCH: + case WAL: + return new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy()); default: return new PipeMemoryBlock(sizeInBytes); } @@ -466,6 +515,11 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp case TS_FILE: returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes); break; + case BATCH: + case WAL: + returnedMemoryBlock = + new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy()); + break; default: returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java new file mode 100644 index 000000000000..647fb81a4b91 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.memory; + +import org.apache.iotdb.db.pipe.resource.memory.strategy.DynamicMemoryAllocationStrategy; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; + +public class PipeModelFixedMemoryBlock extends PipeFixedMemoryBlock { + + private final Set memoryBlocks = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + + private final DynamicMemoryAllocationStrategy allocationStrategy; + + private volatile long memoryAllocatedInBytes; + + public PipeModelFixedMemoryBlock( + final long memoryUsageInBytes, final DynamicMemoryAllocationStrategy allocationStrategy) { + super(memoryUsageInBytes); + this.memoryAllocatedInBytes = 0; + this.allocationStrategy = allocationStrategy; + } + + public synchronized PipeDynamicMemoryBlock registerPipeBatchMemoryBlock( + final long memorySizeInBytes) { + final PipeDynamicMemoryBlock memoryBlock = new PipeDynamicMemoryBlock(this, 0); + memoryBlocks.add(memoryBlock); + if (memorySizeInBytes != 0) { + resetMemoryBlockSize(memoryBlock, memorySizeInBytes); + double e = (double) getMemoryUsageInBytes() / memorySizeInBytes; + memoryBlock.updateMemoryEfficiency(e, e); + return memoryBlock; + } + + memoryBlock.updateMemoryEfficiency(0.0, 0.0); + return memoryBlock; + } + + @Override + public synchronized boolean expand() { + // Ensure that the memory block that gets most of the memory is released first, which can reduce + // the jitter of memory allocationIf the memory block is not expanded, it will not be expanded + // again.This function not only completes the expansion but also the reduction. + memoryBlocks.stream() + .sorted((a, b) -> Long.compare(b.getMemoryUsageInBytes(), a.getMemoryUsageInBytes())) + .forEach(PipeDynamicMemoryBlock::doExpand); + return false; + } + + public long getMemoryAllocatedInBytes() { + return memoryAllocatedInBytes; + } + + public synchronized Set getMemoryBlocks() { + return memoryBlocks; + } + + synchronized void releaseMemory(final PipeDynamicMemoryBlock memoryBlock) { + resetMemoryBlockSize(memoryBlock, 0); + memoryBlocks.remove(memoryBlock); + } + + synchronized void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock block) { + if (this.isReleased() || block.isReleased() || !memoryBlocks.contains(block)) { + throw new IllegalStateException("The memory block has been released"); + } + allocationStrategy.dynamicallyAdjustMemory(block); + } + + synchronized void resetMemoryBlockSize( + final PipeDynamicMemoryBlock block, final long memorySizeInBytes) { + if (this.isReleased() || block.isReleased() || !memoryBlocks.contains(block)) { + throw new IllegalStateException("The memory block has been released"); + } + + final long diff = memorySizeInBytes - block.getMemoryUsageInBytes(); + + // If the capacity is expanded, determine whether it will exceed the maximum value of the fixed + // module + if (getMemoryUsageInBytes() - memoryAllocatedInBytes < diff) { + // Pay attention to the order of calls, otherwise it will cause resource leakage + block.setMemoryUsageInBytes( + block.getMemoryUsageInBytes() + getMemoryUsageInBytes() - memoryAllocatedInBytes); + memoryAllocatedInBytes = getMemoryUsageInBytes(); + return; + } + + memoryAllocatedInBytes = memoryAllocatedInBytes + diff; + block.setMemoryUsageInBytes(memorySizeInBytes); + } + + Stream getMemoryBlocksStream() { + if (isReleased()) { + throw new IllegalStateException("The memory block has been released"); + } + return memoryBlocks.stream(); + } + + @Override + public synchronized void close() { + memoryBlocks.forEach(PipeDynamicMemoryBlock::close); + super.close(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java new file mode 100644 index 000000000000..8e5ba9af0533 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.memory.strategy; + +import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock; + +// Now let's define the operation memory behavior: Producers produce memory, consumers consume +// memory, and in order to ensure that consumers do not encounter back pressure, the memory that +// consumers need to use is allocated in advance. Consumer instances obtain their expected memory +// through allocation strategies, and the total memory of all consumer instances must not be greater +// than the pre-allocated memory. The memory allocation algorithm is to adjust the memory of +// consumers so that the consumption rate can reach the optimal +public interface DynamicMemoryAllocationStrategy { + + void dynamicallyAdjustMemory(PipeDynamicMemoryBlock dynamicMemoryBlock); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java new file mode 100644 index 000000000000..72a390f799e1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.memory.strategy; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock; + +import org.apache.tsfile.utils.Pair; + +import java.util.concurrent.atomic.AtomicBoolean; + +// The algorithm is optimized for different scenarios: The following describes the behavior of the +// algorithm in different scenarios: +// 1. When the memory is large enough, it will try to allocate memory to the memory block +// 2. When the memory is insufficient, the algorithm will try to ensure that the memory with a +// relatively large memory share is released +public class ThresholdAllocationStrategy implements DynamicMemoryAllocationStrategy { + + private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); + + @Override + public void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock dynamicMemoryBlock) { + final double deficitRatio = calculateDeficitRatio(dynamicMemoryBlock); + final long oldMemoryUsageInBytes = dynamicMemoryBlock.getMemoryUsageInBytes(); + final long expectedMemory = (long) (oldMemoryUsageInBytes / deficitRatio); + final double memoryBlockUsageRatio = dynamicMemoryBlock.getMemoryBlockUsageRatio(); + final long maximumMemoryIncrease = + (long) + (dynamicMemoryBlock.getFixedMemoryCapacity() + * PIPE_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio()); + + // Avoid overflow and infinite values + if (deficitRatio <= 0.0 || oldMemoryUsageInBytes == 0 || expectedMemory == 0) { + dynamicMemoryBlock.applyForDynamicMemory(maximumMemoryIncrease); + final double efficiencyRatio = + (double) dynamicMemoryBlock.getMemoryUsageInBytes() / maximumMemoryIncrease; + dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, efficiencyRatio); + return; + } + + // No matter what, give priority to applying for memory use, and adjust the memory size when the + // memory is insufficient + final double lowUsageThreshold = + PIPE_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold(); + if (dynamicMemoryBlock.getFixedMemoryBlockUsageRatio() + < PIPE_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()) { + if (deficitRatio >= 1.0) { + return; + } + + final long maxAvailableMemory = + Math.min(expectedMemory, dynamicMemoryBlock.canAllocateMemorySize()); + long newMemoryRequest; + + // Need to ensure that you get memory in smaller chunks and get more memory faster + if (memoryBlockUsageRatio > lowUsageThreshold) { + newMemoryRequest = + Math.min(oldMemoryUsageInBytes + oldMemoryUsageInBytes / 2, maxAvailableMemory); + } else { + newMemoryRequest = Math.min(oldMemoryUsageInBytes * 2, maxAvailableMemory); + } + + dynamicMemoryBlock.applyForDynamicMemory(newMemoryRequest); + final double efficiencyRatio = + dynamicMemoryBlock.getMemoryUsageInBytes() / (double) expectedMemory; + dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, efficiencyRatio); + return; + } + + // Entering this logic means that the memory is insufficient and the memory allocation needs to + // be adjusted + final AtomicBoolean isMemoryNotEnough = new AtomicBoolean(false); + final double averageDeficitRatio = + dynamicMemoryBlock + .getMemoryBlocks() + .mapToDouble( + block -> { + double ratio = calculateDeficitRatio(block); + if (block.getMemoryUsageInBytes() == 0 || ratio == 0.0) { + isMemoryNotEnough.set(true); + } + return ratio; + }) + .average() + .orElse(1.0); + + final double adjustmentThreshold = PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold(); + // When memory is insufficient, try to ensure that smaller memory blocks apply for less memory, + // and larger memory blocks release more memory. + final double diff = + isMemoryNotEnough.get() && averageDeficitRatio > 2 * adjustmentThreshold + ? averageDeficitRatio - deficitRatio - adjustmentThreshold + : averageDeficitRatio - deficitRatio; + + if (Math.abs(diff) > PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold()) { + final long mem = (long) ((dynamicMemoryBlock.getMemoryUsageInBytes() / deficitRatio) * diff); + dynamicMemoryBlock.applyForDynamicMemory(dynamicMemoryBlock.getMemoryUsageInBytes() + mem); + if (oldMemoryUsageInBytes != dynamicMemoryBlock.getMemoryUsageInBytes()) { + final double efficiencyRatio = + dynamicMemoryBlock.getMemoryUsageInBytes() / (double) expectedMemory; + dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, efficiencyRatio); + } + } else if (memoryBlockUsageRatio > lowUsageThreshold + && memoryBlockUsageRatio > dynamicMemoryBlock.getExpectedAverageAllocatedMemorySize()) { + // If there is insufficient memory, some memory must be released + dynamicMemoryBlock.applyForDynamicMemory(oldMemoryUsageInBytes / 2); + dynamicMemoryBlock.updateMemoryEfficiency(deficitRatio / 2, deficitRatio / 2); + } + } + + private double calculateDeficitRatio(final PipeDynamicMemoryBlock block) { + final Pair memoryEfficiency = block.getMemoryEfficiency(); + double pipeDynamicMemoryHistoryWeight = PIPE_CONFIG.getPipeDynamicMemoryHistoryWeight(); + return (1 - pipeDynamicMemoryHistoryWeight) * memoryEfficiency.getRight() + + pipeDynamicMemoryHistoryWeight * memoryEfficiency.getLeft(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java index 0823c7e7b6e1..20469f2b7982 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java @@ -27,7 +27,9 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; -import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType; +import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; @@ -64,7 +66,14 @@ public class WALInsertNodeCache { IoTDBDescriptor.getInstance().getMemoryConfig(); private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); - private final PipeMemoryBlock allocatedMemoryBlock; + private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY = + PipeDataNodeResourceManager.memory() + .forceAllocateForModelFixedMemoryBlock( + PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(), + PipeMemoryBlockType.WAL); + + private final PipeDynamicMemoryBlock memoryBlock; + // Used to adjust the memory usage of the cache private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1); private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true); @@ -87,28 +96,12 @@ private WALInsertNodeCache(final Integer dataRegionId) { 0.5 * MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes() / CONFIG.getDataRegionNum()); - allocatedMemoryBlock = - PipeDataNodeResourceManager.memory() - .tryAllocate(requestedAllocateSize) - .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1)) - .setExpandMethod( - oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestedAllocateSize)) - .setExpandCallback( - (oldMemory, newMemory) -> { - memoryUsageCheatFactor.updateAndGet( - factor -> factor / ((double) newMemory / oldMemory)); - isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte()); - LOGGER.info( - "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.", - dataRegionId, - oldMemory, - newMemory); - }); + memoryBlock = WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize); isBatchLoadEnabled.set( - allocatedMemoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte()); + memoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte()); lruCache = Caffeine.newBuilder() - .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes()) + .maximumWeight(requestedAllocateSize) .weigher( (Weigher>) (position, pair) -> { @@ -129,30 +122,51 @@ private WALInsertNodeCache(final Integer dataRegionId) { }) .recordStats() .build(new WALInsertNodeCacheLoader()); - allocatedMemoryBlock.setShrinkCallback( - (oldMemory, newMemory) -> { - memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory / newMemory)); - isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte()); - LOGGER.info( - "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.", - dataRegionId, - oldMemory, - newMemory); - if (CONFIG.getWALCacheShrinkClearEnabled()) { - try { - lruCache.cleanUp(); - } catch (Exception e) { - LOGGER.warn( - "Failed to clear WALInsertNodeCache for dataRegion ID: {}.", dataRegionId, e); - return; - } - LOGGER.info( - "Successfully cleared WALInsertNodeCache for dataRegion ID: {}.", dataRegionId); + + memoryBlock.setExpandable(true); + memoryBlock.setExpand( + memoryBlock -> { + final long oldMemory = memoryBlock.getMemoryUsageInBytes(); + memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate()); + final long newMemory = memoryBlock.getMemoryUsageInBytes(); + if (newMemory > oldMemory) { + setExpandCallback(oldMemory, newMemory, dataRegionId); + } else if (newMemory < oldMemory) { + shrinkCallback(oldMemory, newMemory, dataRegionId); } }); PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId); } + private void setExpandCallback(long oldMemory, long newMemory, Integer dataRegionId) { + memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory / oldMemory)); + isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte()); + LOGGER.info( + "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.", + dataRegionId, + oldMemory, + newMemory); + } + + private void shrinkCallback(long oldMemory, long newMemory, Integer dataRegionId) { + memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory / newMemory)); + isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte()); + LOGGER.info( + "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.", + dataRegionId, + oldMemory, + newMemory); + if (CONFIG.getWALCacheShrinkClearEnabled()) { + try { + lruCache.cleanUp(); + } catch (Exception e) { + LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID: {}.", dataRegionId, e); + return; + } + LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID: {}.", dataRegionId); + } + } + /////////////////////////// Getter & Setter /////////////////////////// public InsertNode getInsertNode(final WALEntryPosition position) { @@ -378,7 +392,7 @@ boolean contains(WALEntryPosition position) { @TestOnly public void clear() { lruCache.invalidateAll(); - allocatedMemoryBlock.close(); + memoryBlock.close(); memTablesNeedSearch.clear(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index a813cf216e1e..675d6af7bdc3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -216,8 +216,10 @@ public class CommonConfig { private int pipeDataStructureTabletRowSize = 2048; private int pipeDataStructureTabletSizeInBytes = 2097152; - private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.4; - private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.4; + private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.2; + private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.2; + private double pipeDataStructureWalMemoryProportion = 0.2; + private double PipeDataStructureBatchMemoryProportion = 0.2; private double pipeTotalFloatingMemoryProportion = 0.2; private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000; @@ -299,6 +301,11 @@ public class CommonConfig { private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime = PipeRemainingTimeRateAverageTime.MEAN; private double pipeTsFileScanParsingThreshold = 0.05; + private double pipeDynamicMemoryHistoryWeight = 0.5; + private double pipeDynamicMemoryAdjustmentThreshold = 0.05; + private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 0.1d; + private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d; + private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d; private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes @@ -838,6 +845,34 @@ public void setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold( pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold); } + public double getPipeDataStructureWalMemoryProportion() { + return pipeDataStructureWalMemoryProportion; + } + + public void setPipeDataStructureWalMemoryProportion(double pipeDataStructureWalMemoryProportion) { + if (this.pipeDataStructureWalMemoryProportion == pipeDataStructureWalMemoryProportion) { + return; + } + this.pipeDataStructureWalMemoryProportion = pipeDataStructureWalMemoryProportion; + logger.info( + "pipeDataStructureWalMemoryProportion is set to {}.", pipeDataStructureWalMemoryProportion); + } + + public double getPipeDataStructureBatchMemoryProportion() { + return PipeDataStructureBatchMemoryProportion; + } + + public void setPipeDataStructureBatchMemoryProportion( + double PipeDataStructureBatchMemoryProportion) { + if (this.PipeDataStructureBatchMemoryProportion == PipeDataStructureBatchMemoryProportion) { + return; + } + this.PipeDataStructureBatchMemoryProportion = PipeDataStructureBatchMemoryProportion; + logger.info( + "PipeDataStructureBatchMemoryProportion is set to {}.", + PipeDataStructureBatchMemoryProportion); + } + public double getPipeTotalFloatingMemoryProportion() { return pipeTotalFloatingMemoryProportion; } @@ -1770,6 +1805,82 @@ public void setPipeTsFileScanParsingThreshold(double pipeTsFileScanParsingThresh logger.info("pipeTsFileScanParsingThreshold is set to {}", pipeTsFileScanParsingThreshold); } + public double getPipeDynamicMemoryHistoryWeight() { + return pipeDynamicMemoryHistoryWeight; + } + + public void setPipeDynamicMemoryHistoryWeight(double pipeDynamicMemoryHistoryWeight) { + if (this.pipeDynamicMemoryHistoryWeight == pipeDynamicMemoryHistoryWeight) { + return; + } + this.pipeDynamicMemoryHistoryWeight = pipeDynamicMemoryHistoryWeight; + logger.info("PipeDynamicMemoryHistoryWeight is set to {}", pipeDynamicMemoryHistoryWeight); + } + + public double getPipeDynamicMemoryAdjustmentThreshold() { + return pipeDynamicMemoryAdjustmentThreshold; + } + + public void setPipeDynamicMemoryAdjustmentThreshold(double pipeDynamicMemoryAdjustmentThreshold) { + if (this.pipeDynamicMemoryAdjustmentThreshold == pipeDynamicMemoryAdjustmentThreshold) { + return; + } + this.pipeDynamicMemoryAdjustmentThreshold = pipeDynamicMemoryAdjustmentThreshold; + logger.info( + "pipeDynamicMemoryAdjustmentThreshold is set to {}", pipeDynamicMemoryAdjustmentThreshold); + } + + public double getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() { + return pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio; + } + + public void setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio( + double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) { + if (this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio + == pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) { + return; + } + this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = + pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio; + logger.info( + "pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio is set to {}", + pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio); + } + + public double getPipeThresholdAllocationStrategyLowUsageThreshold() { + return pipeThresholdAllocationStrategyLowUsageThreshold; + } + + public void setPipeThresholdAllocationStrategyLowUsageThreshold( + double pipeThresholdAllocationStrategyLowUsageThreshold) { + if (this.pipeThresholdAllocationStrategyLowUsageThreshold + == pipeThresholdAllocationStrategyLowUsageThreshold) { + return; + } + this.pipeThresholdAllocationStrategyLowUsageThreshold = + pipeThresholdAllocationStrategyLowUsageThreshold; + logger.info( + "pipeMemoryBlockLowUsageThreshold is set to {}", + pipeThresholdAllocationStrategyLowUsageThreshold); + } + + public double getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() { + return pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold; + } + + public void setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold( + double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) { + if (this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold + == pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) { + return; + } + this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = + pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold; + logger.info( + "pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold is set to {}", + pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold); + } + public double getPipeAllSinksRateLimitBytesPerSecond() { return pipeAllSinksRateLimitBytesPerSecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 9b63b46626c0..f57da8a4c62f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -79,6 +79,14 @@ public double getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold() { return COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(); } + public double getPipeDataStructureWalMemoryProportion() { + return COMMON_CONFIG.getPipeDataStructureWalMemoryProportion(); + } + + public double getPipeDataStructureBatchMemoryProportion() { + return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion(); + } + public double getPipeTotalFloatingMemoryProportion() { return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion(); } @@ -223,6 +231,26 @@ public double getPipeTsFileScanParsingThreshold() { return COMMON_CONFIG.getPipeTsFileScanParsingThreshold(); } + public double getPipeDynamicMemoryHistoryWeight() { + return COMMON_CONFIG.getPipeDynamicMemoryHistoryWeight(); + } + + public double getPipeDynamicMemoryAdjustmentThreshold() { + return COMMON_CONFIG.getPipeDynamicMemoryAdjustmentThreshold(); + } + + public double getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() { + return COMMON_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(); + } + + public double getPipeThresholdAllocationStrategyLowUsageThreshold() { + return COMMON_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold(); + } + + public double getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() { + return COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(); + } + /////////////////////////////// Meta Consistency /////////////////////////////// public boolean isSeperatedPipeHeartbeatEnabled() { @@ -475,6 +503,19 @@ public void printAllConfigs() { "PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime()); LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold()); + LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight()); + LOGGER.info( + "PipeDynamicMemoryAdjustmentThreshold: {}", getPipeDynamicMemoryAdjustmentThreshold()); + LOGGER.info( + "PipeThresholdAllocationStrategyMaximumMemoryIncrementRatio: {}", + getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio()); + LOGGER.info( + "PipeThresholdAllocationStrategyLowUsageThreshold: {}", + getPipeThresholdAllocationStrategyLowUsageThreshold()); + LOGGER.info( + "PipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold: {}", + getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()); + LOGGER.info( "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}", getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 919df9261899..e0d36cd64c8f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -228,6 +228,16 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr "pipe_data_structure_ts_file_memory_block_allocation_reject_threshold", String.valueOf( config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold())))); + config.setPipeDataStructureWalMemoryProportion( + Double.parseDouble( + properties.getProperty( + "pipe_data_structure_wal_memory_proportion", + String.valueOf(config.getPipeDataStructureWalMemoryProportion())))); + config.setPipeDataStructureBatchMemoryProportion( + Double.parseDouble( + properties.getProperty( + "pipe_data_structure_batch_memory_proportion", + String.valueOf(config.getPipeDataStructureBatchMemoryProportion())))); config.setPipeTotalFloatingMemoryProportion( Double.parseDouble( properties.getProperty( @@ -504,6 +514,38 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr properties.getProperty( "pipe_tsfile_scan_parsing_threshold", String.valueOf(config.getPipeTsFileScanParsingThreshold())))); + + config.setPipeDynamicMemoryHistoryWeight( + Double.parseDouble( + properties.getProperty( + "pipe_dynamic_memory_history_weight", + String.valueOf(config.getPipeDynamicMemoryHistoryWeight())))); + + config.setPipeDynamicMemoryAdjustmentThreshold( + Double.parseDouble( + properties.getProperty( + "pipe_dynamic_memory_adjustment_threshold", + String.valueOf(config.getPipeDynamicMemoryAdjustmentThreshold())))); + + config.setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio( + Double.parseDouble( + properties.getProperty( + "pipe_threshold_allocation_strategy_maximum_memory_increment_ratio", + String.valueOf( + config.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio())))); + + config.setPipeThresholdAllocationStrategyLowUsageThreshold( + Double.parseDouble( + properties.getProperty( + "pipe_threshold_allocation_strategy_low_usage_threshold", + String.valueOf(config.getPipeThresholdAllocationStrategyLowUsageThreshold())))); + + config.setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold( + Double.parseDouble( + properties.getProperty( + "pipe_threshold_allocation_strategy_high_usage_threshold", + String.valueOf( + config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold())))); } public static void loadPipeExternalConfig(