[To dev/1.3] Memtable enhance fix (#15040)
* perf: improve seq inserting

* rename MergeSortTvListIterator to MergeSortTVListIterator

* fix: concurrent indices modification during query sort and flush sort

* revert: iterate rows in aligned memChunk

* fix: testMemAlignedChunkLoader test

* fix: no need to synchronize list since sort is already a synchronized method

* fix: optimize minimum time update in TVList

* perf: special case for merge sort iterator when no handover occurs
shizy818 authored Mar 7, 2025
1 parent 3bb1e17 commit 5968100
Showing 25 changed files with 435 additions and 798 deletions.
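
Several bullets above touch MergeSortTVListIterator, which merges a set of time-sorted TVLists into a single stream, with later lists shadowing earlier ones on duplicate timestamps. The committed iterator itself is not rendered below; purely as an illustration, a self-contained Java sketch of the underlying k-way merge idea (all names here are hypothetical):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Each point is a {timestamp, value} pair; every source list is sorted by time.
class MergeSortedPoints {
  // One cursor per source list: which list, and the position inside it.
  private record Cursor(int list, int pos) {}

  static List<long[]> merge(List<List<long[]>> lists) {
    // Poll by smallest timestamp; on ties the higher list index comes first,
    // so the most recently written value wins and shadowed points are dropped.
    PriorityQueue<Cursor> heap =
        new PriorityQueue<>(
            Comparator.<Cursor>comparingLong(c -> lists.get(c.list()).get(c.pos())[0])
                .thenComparing(Comparator.comparingInt(Cursor::list).reversed()));
    for (int i = 0; i < lists.size(); i++) {
      if (!lists.get(i).isEmpty()) {
        heap.add(new Cursor(i, 0));
      }
    }
    List<long[]> out = new ArrayList<>();
    Long lastTime = null;
    while (!heap.isEmpty()) {
      Cursor c = heap.poll();
      long[] point = lists.get(c.list()).get(c.pos());
      if (lastTime == null || point[0] != lastTime) { // skip duplicate timestamps
        out.add(point);
        lastTime = point[0];
      }
      if (c.pos() + 1 < lists.get(c.list()).size()) {
        heap.add(new Cursor(c.list(), c.pos() + 1)); // advance that list's cursor
      }
    }
    return out;
  }
}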
@@ -385,15 +385,6 @@ public boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement)
return !memChunkGroup.contains(measurement);
}

-  @Override
-  public long getMeasurementSize(IDeviceID deviceId, String measurement) {
-    IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
-    if (null == memChunkGroup) {
-      return 0;
-    }
-    return memChunkGroup.getMeasurementSize(measurement);
-  }

@Override
public IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement) {
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
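
With getMeasurementSize gone, callers fetch the chunk once and derive the point count from it (see the TsFileProcessor hunks near the end of this diff). A minimal sketch of that null-safe pattern, as a hypothetical helper over the IMemTable and IWritableMemChunk interfaces appearing in this diff:

// Hypothetical helper, not part of the commit: equivalent of the removed
// accessor, expressed through the surviving getWritableMemChunk() API.
static long measurementSize(IMemTable memTable, IDeviceID deviceId, String measurement) {
  IWritableMemChunk memChunk = memTable.getWritableMemChunk(deviceId, measurement);
  return memChunk == null ? 0 : memChunk.rowCount();
}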

Large diffs are not rendered by default.

@@ -28,11 +28,11 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.MergeSortAlignedTVListIterator;
-import org.apache.iotdb.db.utils.datastructure.PageColumnAccessInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
@@ -702,85 +702,6 @@ private void handleEncoding(
}
}

-  private void writePageValuesIntoWriter(
-      IChunkWriter chunkWriter,
-      long[] times,
-      PageColumnAccessInfo[] pageColumnAccessInfo,
-      MergeSortAlignedTVListIterator timeValuePairIterator) {
-    AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
-
-    // update value statistics
-    for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
-      ValueChunkWriter valueChunkWriter =
-          alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
-      PageColumnAccessInfo pageAccessInfo = pageColumnAccessInfo[columnIndex];
-      switch (dataTypes.get(columnIndex)) {
-        case BOOLEAN:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(
-                times[index], value != null && value.getBoolean(), value == null);
-          }
-          break;
-        case INT32:
-        case DATE:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(times[index], value == null ? 0 : value.getInt(), value == null);
-          }
-          break;
-        case INT64:
-        case TIMESTAMP:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(
-                times[index], value == null ? 0L : value.getLong(), value == null);
-          }
-          break;
-        case FLOAT:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(
-                times[index], value == null ? 0f : value.getFloat(), value == null);
-          }
-          break;
-        case DOUBLE:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(
-                times[index], value == null ? 0d : value.getDouble(), value == null);
-          }
-          break;
-        case TEXT:
-        case BLOB:
-        case STRING:
-          for (int index = 0; index < pageAccessInfo.count(); index++) {
-            int[] accessInfo = pageAccessInfo.get(index);
-            TsPrimitiveType value =
-                timeValuePairIterator.getPrimitiveObject(accessInfo, columnIndex);
-            valueChunkWriter.write(
-                times[index],
-                value == null ? Binary.EMPTY_VALUE : value.getBinary(),
-                value == null);
-          }
-          break;
-        default:
-          throw new UnSupportedDataTypeException(
-              String.format("Data type %s is not supported.", dataTypes.get(columnIndex)));
-      }
-    }
-  }

@Override
public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
if (TVLIST_SORT_THRESHOLD == 0) {
@@ -799,30 +720,49 @@ public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
int pointNumInChunk = 0;
long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];

-    PageColumnAccessInfo[] pageColumnAccessInfo = new PageColumnAccessInfo[dataTypes.size()];
-    for (int i = 0; i < pageColumnAccessInfo.length; i++) {
-      pageColumnAccessInfo[i] = new PageColumnAccessInfo(MAX_NUMBER_OF_POINTS_IN_PAGE);
-    }
-
while (timeValuePairIterator.hasNextTimeValuePair()) {
-      // prepare column access info for current page
-      int[][] accessInfo = timeValuePairIterator.getColumnAccessInfo();
-      times[pointNumInPage] = timeValuePairIterator.getTime();
-      for (int i = 0; i < dataTypes.size(); i++) {
-        pageColumnAccessInfo[i].add(accessInfo[i]);
+      TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
+      times[pointNumInPage] = tvPair.getTimestamp();
+      TsPrimitiveType[] values = tvPair.getValue().getVector();
+
+      for (int columnIndex = 0; columnIndex < values.length; columnIndex++) {
+        ValueChunkWriter valueChunkWriter =
+            alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
+        boolean isNull = values[columnIndex] == null;
+        switch (schemaList.get(columnIndex).getType()) {
+          case BOOLEAN:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getBoolean(), isNull);
+            break;
+          case INT32:
+          case DATE:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getInt(), isNull);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getLong(), isNull);
+            break;
+          case FLOAT:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getFloat(), isNull);
+            break;
+          case DOUBLE:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getDouble(), isNull);
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            valueChunkWriter.write(tvPair.getTimestamp(), values[columnIndex].getBinary(), isNull);
+            break;
+          default:
+            break;
+        }
}
timeValuePairIterator.step();

pointNumInPage++;
pointNumInChunk++;

if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
|| pointNumInChunk >= maxNumberOfPointsInChunk) {
-        writePageValuesIntoWriter(
-            alignedChunkWriter, times, pageColumnAccessInfo, timeValuePairIterator);
alignedChunkWriter.write(times, pointNumInPage, 0);
-        for (PageColumnAccessInfo columnAccessInfo : pageColumnAccessInfo) {
-          columnAccessInfo.reset();
-        }
pointNumInPage = 0;
}

Expand All @@ -842,8 +782,6 @@ public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
// last batch of points
if (pointNumInChunk > 0) {
if (pointNumInPage > 0) {
-        writePageValuesIntoWriter(
-            alignedChunkWriter, times, pageColumnAccessInfo, timeValuePairIterator);
alignedChunkWriter.write(times, pointNumInPage, 0);
alignedChunkWriter.sealCurrentPage();
alignedChunkWriter.clearPageWriter();
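
The rewritten encode() above writes each point's column values immediately and keeps only timestamps in a page buffer, flushing a page when it fills or when the chunk-level point budget is reached. A stripped-down, self-contained sketch of that cadence (Writer is a hypothetical stand-in, not the tsfile API):

// Sketch of the paging cadence: buffer timestamps, flush full pages, seal the
// chunk when its point budget is used up. Values are assumed already written.
interface Writer {
  void writePage(long[] times, int count); // flush one page of timestamps
  void sealChunk(); // finish the current chunk
}

final class PagingLoop {
  static void drain(
      java.util.Iterator<Long> points, Writer writer, int maxPointsInPage, long maxPointsInChunk) {
    long[] times = new long[maxPointsInPage];
    int pointNumInPage = 0;
    long pointNumInChunk = 0;
    while (points.hasNext()) {
      times[pointNumInPage++] = points.next();
      pointNumInChunk++;
      if (pointNumInPage == maxPointsInPage || pointNumInChunk >= maxPointsInChunk) {
        writer.writePage(times, pointNumInPage); // page full, or chunk budget hit
        pointNumInPage = 0;
      }
      if (pointNumInChunk >= maxPointsInChunk) {
        writer.sealChunk();
        pointNumInChunk = 0;
      }
    }
    if (pointNumInPage > 0) writer.writePage(times, pointNumInPage); // last partial page
    if (pointNumInChunk > 0) writer.sealChunk(); // last partial chunk
  }
}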
@@ -139,11 +139,6 @@ public int delete(
return deletedPointsNumber;
}

-  @Override
-  public long getMeasurementSize(String measurement) {
-    return memChunk.rowCount();
-  }
-
@Override
public IWritableMemChunk getWritableMemChunk(String measurement) {
return memChunk;
@@ -173,8 +173,6 @@ void queryForDeviceRegionScan(
boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement);

/** only used when mem control enabled */
-  long getMeasurementSize(IDeviceID deviceId, String measurement);
-
IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement);

/** only used when mem control enabled */
@@ -51,8 +51,6 @@ void writeTablet(
int delete(
PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp);

-  long getMeasurementSize(String measurement);
-
IWritableMemChunk getWritableMemChunk(String measurement);

long getMaxTime();
@@ -24,7 +24,7 @@
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
import org.apache.iotdb.db.utils.MathUtils;
-import org.apache.iotdb.db.utils.datastructure.MergeSortTvListIterator;
+import org.apache.iotdb.db.utils.datastructure.MergeSortTVListIterator;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -85,7 +85,7 @@ public class ReadOnlyMemChunk {
// TVList and its rowCount during query
private Map<TVList, Integer> tvListQueryMap;

-  private MergeSortTvListIterator timeValuePairIterator;
+  private MergeSortTVListIterator timeValuePairIterator;

protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
@@ -145,13 +145,13 @@ public void sortTvLists() {
public void initChunkMetaFromTvLists() {
// create chunk statistics
Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType);
-    int cnt = 0;
+    int pointsInChunk = 0;
int[] deleteCursor = {0};
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
-    timeValuePairIterator = new MergeSortTvListIterator(tvLists, floatPrecision, encoding);
+    timeValuePairIterator = new MergeSortTVListIterator(tvLists, floatPrecision, encoding);
int[] tvListOffsets = timeValuePairIterator.getTVListOffsets();
while (timeValuePairIterator.hasNextTimeValuePair()) {
-      if (cnt % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
+      if (pointsInChunk % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
Statistics<? extends Serializable> stats = Statistics.getStatsByType(dataType);
pageStatisticsList.add(stats);
pageOffsetsList.add(Arrays.copyOf(tvListOffsets, tvListOffsets.length));
@@ -194,7 +194,7 @@ public void initChunkMetaFromTvLists() {
String.format("Data type %s is not supported.", dataType));
}
}
-      cnt++;
+      pointsInChunk++;
}
pageOffsetsList.add(Arrays.copyOf(tvListOffsets, tvListOffsets.length));
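
The Arrays.copyOf snapshot above runs at every page boundary, recording where each backing TVList's cursor stands so a page can later be re-read from that position; the copy matters because the iterator mutates tvListOffsets in place. A self-contained sketch of that bookkeeping (names hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PageOffsetTracker {
  static List<int[]> trackPages(
      int[] liveOffsets, int totalPoints, int pageSize, Runnable advanceOnePoint) {
    List<int[]> pageOffsets = new ArrayList<>();
    for (int point = 0; point < totalPoints; point++) {
      if (point % pageSize == 0) {
        // Snapshot every backing list's cursor at the page start; copy because
        // the merge iterator updates the live array in place as it advances.
        pageOffsets.add(Arrays.copyOf(liveOffsets, liveOffsets.length));
      }
      advanceOnePoint.run(); // advances the merge iterator, mutating liveOffsets
    }
    pageOffsets.add(Arrays.copyOf(liveOffsets, liveOffsets.length)); // final boundary
    return pageOffsets;
  }
}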

@@ -245,7 +245,7 @@ private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOExcept
int[] deleteCursor = {0};
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
IPointReader timeValuePairIterator =
-        new MergeSortTvListIterator(tvLists, floatPrecision, encoding);
+        new MergeSortTVListIterator(tvLists, floatPrecision, encoding);

while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -333,7 +333,7 @@ public TsBlock getTsBlock() {
return null;
}

-  public MergeSortTvListIterator getMergeSortTVListIterator() {
+  public MergeSortTVListIterator getMergeSortTVListIterator() {
return timeValuePairIterator;
}

@@ -581,8 +581,8 @@ private long[] checkMemCostAndAddToTspInfoForRow(
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
} else {
// here currentChunkPointNum >= 1
-        long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
+        long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memTableIncrement +=
memChunk != null ? memChunk.getWorkingTVList().tvListArrayMemCost() : 0;
@@ -627,12 +627,12 @@ private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowN
.putIfAbsent(measurements[i], 1);
} else {
// here currentChunkPointNum >= 1
-        long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
int addingPointNum =
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new HashMap<>())
.computeIfAbsent(measurements[i], k -> 0);
+        long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
if ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memTableIncrement +=
memChunk != null
@@ -873,8 +873,8 @@ private void updateMemCost(
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemCost(dataType);
} else {
-      long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurement);
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurement);
+      long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -886,11 +886,7 @@
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
/ PrimitiveArrayManager.ARRAY_SIZE;
if (acquireArray != 0) {
-          memIncrements[0] +=
-              acquireArray
-                  * (memChunk != null
-                      ? memChunk.getWorkingTVList().tvListArrayMemCost()
-                      : TVList.tvListArrayMemCost(dataType));
+          memIncrements[0] += acquireArray * memChunk.getWorkingTVList().tvListArrayMemCost();
}
}
}
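
The acquireArray expression above computes how many additional primitive arrays an insert of (end - start) points needs, given how full the chunk's last array already is. A worked example with assumed numbers (ARRAY_SIZE is configuration-dependent; 32 is illustrative only):

// Worked example of the array-acquisition arithmetic; all numbers assumed.
public final class ArrayCostExample {
  public static void main(String[] args) {
    final int ARRAY_SIZE = 32; // assumed slots per primitive array
    long currentChunkPointNum = 40; // points already in the chunk
    int start = 0, end = 100; // inserting 100 new points

    // 40 % 32 = 8 slots of the last array are used, so 24 are still free.
    long usedInLastArray = currentChunkPointNum % ARRAY_SIZE;
    // Extra arrays needed: ceil((100 - 24) / 32) = 3, computed as in the diff.
    long acquireArray = (end - start - 1 + usedInLastArray) / ARRAY_SIZE;
    System.out.println("arrays to acquire = " + acquireArray); // prints 3
  }
}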