diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 92803984cee1..7b6353664484 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -792,6 +792,7 @@ public long delete(ModEntry modEntry) { @Override public void addTVListRamCost(long cost) { this.tvListRamCost += cost; + System.out.println(tvListRamCost); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index f81238bef71e..17606e21a423 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; +import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo; import org.apache.iotdb.db.utils.datastructure.MemPointIterator; @@ -57,6 +58,8 @@ import java.util.concurrent.BlockingQueue; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; +import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; +import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; public class AlignedWritableMemChunk extends AbstractWritableMemChunk { @@ -884,6 +887,61 @@ public int getAvgPointSizeOfLargestColumn() { return avgPointSizeOfLargestColumn; } + public long getTvListArrayMemCostIncrement1( + List insertingMeasurements, List insertingTypes) { + long size = 0; + List> bitMaps = list.getBitMaps(); + // value & bitmap array mem size + for (int column = 0; column < dataTypes.size(); column++) { + if (bitMaps != null && bitMaps.get(column) != null) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; + } + } + int newMeasurementCount = 0; + for (int i = 0; i < insertingMeasurements.size(); i++) { + String measurementName = insertingMeasurements.get(i); + TSDataType type = insertingTypes.get(i); + size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); + if (!measurementIndexMap.containsKey(measurementName)) { + newMeasurementCount++; + } + } + // size is 0 when all types are null + if (size == 0) { + return size; + } + // time array mem size + size += PrimitiveArrayManager.ARRAY_SIZE * 8L; + // index array mem size + size += (list.getIndices() != null) ? PrimitiveArrayManager.ARRAY_SIZE * 4L : 0; + // array headers mem size + size += (long) NUM_BYTES_ARRAY_HEADER * (2 + insertingTypes.size()); + // Object references size in ArrayList + size += (long) NUM_BYTES_OBJECT_REF * (2 + dataTypes.size() + newMeasurementCount); + return size; + } + + public long getTvListArrayMemCostIncrement( + List insertingMeasurements, List insertingTypes) { + long memCostIncrement = 0; + for (int i = 0; i < insertingMeasurements.size(); i++) { + String measurementName = insertingMeasurements.get(i); + TSDataType dataType = insertingTypes.get(i); + Integer columIndex = measurementIndexMap.get(measurementName); + if (columIndex == null) { + memCostIncrement += + (long) PrimitiveArrayManager.ARRAY_SIZE * (long) dataType.getDataTypeSize(); + } else { + List columnArries = list.getValues().get(columIndex); + if (columnArries.get(columnArries.size() - 1) == null) { + memCostIncrement += + (long) PrimitiveArrayManager.ARRAY_SIZE * (long) dataType.getDataTypeSize(); + } + } + } + return memCostIncrement; + } + @Override public void setEncryptParameter(EncryptParameter encryptParameter) { this.encryptParameter = encryptParameter; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 68776c92c0b5..2122212fb845 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -778,7 +778,6 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( } else { // For existed device of this mem table AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; - List dataTypesInTVList = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements if (dataTypes[i] == null @@ -794,15 +793,13 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( + (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - memTableIncrement += currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); - dataTypesInTVList.add(dataTypes[i]); + memTableIncrement += currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); } } // this insertion will result in a new array - if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { - dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); - memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); - } + // if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + // memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); + // } } for (int i = 0; i < dataTypes.length; i++) { @@ -882,8 +879,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins > 0 ? 1 : 0); - memTableIncrement += - currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + memTableIncrement += currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); } } int addingPointNum = addingPointNumInfo.right; @@ -893,11 +889,11 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); } dataTypesInTVList.addAll(addingPointNumInfo.left.values()); - memTableIncrement += - alignedMemChunk != null - ? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost() - : AlignedTVList.alignedTvListArrayMemCost( - dataTypesInTVList.toArray(new TSDataType[0]), null); + // memTableIncrement += + // alignedMemChunk != null + // ? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost() + // : AlignedTVList.alignedTvListArrayMemCost( + // dataTypesInTVList.toArray(new TSDataType[0]), null); } addingPointNumInfo.setRight(addingPointNum + 1); } @@ -1072,9 +1068,18 @@ private void updateAlignedMemCost( numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories); } else { AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; - List dataTypesInTVList = new ArrayList<>(); int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; + // calculate how many new arrays will be added after this insertion + int currentArrayCnt = + currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + int newArrayCnt = + newPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + long acquireArray = newArrayCnt - currentArrayCnt; + List insertingMeasurements = new ArrayList<>(); + List insertingTypes = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; String measurement = measurementIds[i]; @@ -1085,30 +1090,24 @@ private void updateAlignedMemCost( || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { continue; } + insertingMeasurements.add(measurement); + insertingTypes.add(dataType); if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { // add a new column in the TVList, the new column should be as long as existing ones - memIncrements[0] += - (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1) - * AlignedTVList.valueListArrayMemCost(dataType); - dataTypesInTVList.add(dataType); + memIncrements[0] += newArrayCnt * AlignedTVList.emptyValueListArrayMemCost(); } } - // calculate how many new arrays will be added after this insertion - int currentArrayCnt = - currentPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - int newArrayCnt = - newPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - long acquireArray = newArrayCnt - currentArrayCnt; - if (acquireArray != 0) { // memory of extending the TVList - dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); memIncrements[0] += - acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); + acquireArray + * alignedMemChunk.getTvListArrayMemCostIncrement1( + insertingMeasurements, insertingTypes); + } else { + memIncrements[0] += + alignedMemChunk.getTvListArrayMemCostIncrement(insertingMeasurements, insertingTypes); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 4be8e9f437e7..c2fd27a15920 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -199,39 +199,64 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) { List columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); + if (dataTypes.get(i).isBinary()) { + memoryBinaryChunkSize[i] += getBinarySize(Binary.EMPTY_VALUE); + } + continue; } switch (dataTypes.get(i)) { case TEXT: case BLOB: case STRING: - ((Binary[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (Binary) columnValue : Binary.EMPTY_VALUE; - memoryBinaryChunkSize[i] += - columnValue != null - ? getBinarySize((Binary) columnValue) - : getBinarySize(Binary.EMPTY_VALUE); + Binary[] arrayT = (Binary[]) columnValues.get(arrayIndex); + if (arrayT == null) { + arrayT = (Binary[]) getPrimitiveArraysByType(TSDataType.TEXT); + columnValues.set(arrayIndex, arrayT); + } + arrayT[elementIndex] = (Binary) columnValue; + memoryBinaryChunkSize[i] += getBinarySize((Binary) columnValue); break; case FLOAT: - ((float[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (float) columnValue : Float.MIN_VALUE; + float[] arrayF = (float[]) columnValues.get(arrayIndex); + if (arrayF == null) { + arrayF = (float[]) getPrimitiveArraysByType(TSDataType.FLOAT); + columnValues.set(arrayIndex, arrayF); + } + arrayF[elementIndex] = (float) columnValue; break; case INT32: case DATE: - ((int[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (int) columnValue : Integer.MIN_VALUE; + float[] arrayI = (float[]) columnValues.get(arrayIndex); + if (arrayI == null) { + arrayI = (float[]) getPrimitiveArraysByType(TSDataType.INT32); + columnValues.set(arrayIndex, arrayI); + } + arrayI[elementIndex] = (int) columnValue; break; case INT64: case TIMESTAMP: - ((long[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (long) columnValue : Long.MIN_VALUE; + long[] arrayL = (long[]) columnValues.get(arrayIndex); + if (arrayL == null) { + arrayL = (long[]) getPrimitiveArraysByType(TSDataType.INT64); + columnValues.set(arrayIndex, arrayL); + } + arrayL[elementIndex] = (long) columnValue; break; case DOUBLE: - ((double[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (double) columnValue : Double.MIN_VALUE; + double[] arrayD = (double[]) columnValues.get(arrayIndex); + if (arrayD == null) { + arrayD = (double[]) getPrimitiveArraysByType(TSDataType.DOUBLE); + columnValues.set(arrayIndex, arrayD); + } + arrayD[elementIndex] = (double) columnValue; break; case BOOLEAN: - ((boolean[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null && (boolean) columnValue; + boolean[] arrayB = (boolean[]) columnValues.get(arrayIndex); + if (arrayB == null) { + arrayB = (boolean[]) getPrimitiveArraysByType(TSDataType.BOOLEAN); + columnValues.set(arrayIndex, arrayB); + } + arrayB[elementIndex] = (boolean) columnValue; break; default: break; @@ -361,32 +386,7 @@ public void extendColumn(TSDataType dataType) { List columnValue = new ArrayList<>(timestamps.size()); List columnBitMaps = new ArrayList<>(timestamps.size()); for (int i = 0; i < timestamps.size(); i++) { - switch (dataType) { - case TEXT: - case STRING: - case BLOB: - columnValue.add(getPrimitiveArraysByType(TSDataType.TEXT)); - break; - case FLOAT: - columnValue.add(getPrimitiveArraysByType(TSDataType.FLOAT)); - break; - case INT32: - case DATE: - columnValue.add(getPrimitiveArraysByType(TSDataType.INT32)); - break; - case INT64: - case TIMESTAMP: - columnValue.add(getPrimitiveArraysByType(TSDataType.INT64)); - break; - case DOUBLE: - columnValue.add(getPrimitiveArraysByType(TSDataType.DOUBLE)); - break; - case BOOLEAN: - columnValue.add(getPrimitiveArraysByType(TSDataType.BOOLEAN)); - break; - default: - break; - } + columnValue.add(null); BitMap bitMap = new BitMap(ARRAY_SIZE); // The following code is for these 2 kinds of scenarios. @@ -512,17 +512,20 @@ public boolean isNullValue(int unsortedRowIndex, int columnIndex) { if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(unsortedRowIndex)) { return true; } + int arrayIndex = unsortedRowIndex / ARRAY_SIZE; + int elementIndex = unsortedRowIndex % ARRAY_SIZE; - if (columnIndex < 0 || columnIndex >= values.size() || values.get(columnIndex) == null) { + if (columnIndex < 0 + || columnIndex >= values.size() + || values.get(columnIndex) == null + || values.get(columnIndex).get(arrayIndex) == null) { return true; } if (bitMaps == null || bitMaps.get(columnIndex) == null - || bitMaps.get(columnIndex).get(unsortedRowIndex / ARRAY_SIZE) == null) { + || bitMaps.get(columnIndex).get(arrayIndex) == null) { return false; } - int arrayIndex = unsortedRowIndex / ARRAY_SIZE; - int elementIndex = unsortedRowIndex % ARRAY_SIZE; List columnBitMaps = bitMaps.get(columnIndex); return columnBitMaps.get(arrayIndex).isMarked(elementIndex); } @@ -535,6 +538,10 @@ public List getTsDataTypes() { return dataTypes; } + public List getIndices() { + return indices; + } + @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = 0; @@ -638,6 +645,9 @@ public void deleteColumn(int columnIndex) { } protected Object cloneValue(TSDataType type, Object value) { + if (value == null) { + return null; + } switch (type) { case TEXT: case BLOB: @@ -684,7 +694,9 @@ protected void clearValue() { List columnValues = values.get(i); if (columnValues != null) { for (Object dataArray : columnValues) { - PrimitiveArrayManager.release(dataArray); + if (dataArray != null) { + PrimitiveArrayManager.release(dataArray); + } } columnValues.clear(); } @@ -710,7 +722,7 @@ protected void expandValues() { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } for (int i = 0; i < dataTypes.size(); i++) { - values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + values.get(i).add(null); if (bitMaps != null && bitMaps.get(i) != null) { bitMaps.get(i).add(null); } @@ -873,6 +885,10 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex case BLOB: case STRING: Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex)); + if (arrayT == null) { + arrayT = (Binary[]) getPrimitiveArraysByType(TSDataType.TEXT); + columnValues.set(arrayIndex, arrayT); + } System.arraycopy(value[i], idx, arrayT, elementIndex, remaining); // update raw size of Text chunk @@ -883,24 +899,44 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex break; case FLOAT: float[] arrayF = ((float[]) columnValues.get(arrayIndex)); + if (arrayF == null) { + arrayF = (float[]) getPrimitiveArraysByType(TSDataType.FLOAT); + columnValues.set(arrayIndex, arrayF); + } System.arraycopy(value[i], idx, arrayF, elementIndex, remaining); break; case INT32: case DATE: int[] arrayI = ((int[]) columnValues.get(arrayIndex)); + if (arrayI == null) { + arrayI = (int[]) getPrimitiveArraysByType(TSDataType.INT32); + columnValues.set(arrayIndex, arrayI); + } System.arraycopy(value[i], idx, arrayI, elementIndex, remaining); break; case INT64: case TIMESTAMP: long[] arrayL = ((long[]) columnValues.get(arrayIndex)); + if (arrayL == null) { + arrayL = (long[]) getPrimitiveArraysByType(TSDataType.INT64); + columnValues.set(arrayIndex, arrayL); + } System.arraycopy(value[i], idx, arrayL, elementIndex, remaining); break; case DOUBLE: double[] arrayD = ((double[]) columnValues.get(arrayIndex)); + if (arrayD == null) { + arrayD = (double[]) getPrimitiveArraysByType(TSDataType.DOUBLE); + columnValues.set(arrayIndex, arrayD); + } System.arraycopy(value[i], idx, arrayD, elementIndex, remaining); break; case BOOLEAN: boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex)); + if (arrayB == null) { + arrayB = (boolean[]) getPrimitiveArraysByType(TSDataType.BOOLEAN); + columnValues.set(arrayIndex, arrayB); + } System.arraycopy(value[i], idx, arrayB, elementIndex, remaining); break; default: @@ -957,7 +993,9 @@ public TSDataType getDataType() { @Override public long calculateRamSize() { - return timestamps.size() * alignedTvListArrayMemCost(); + // return timestamps.size() * alignedTvListArrayMemCost(); + // FIXME:(Haonan) + return 0; } /** @@ -998,18 +1036,20 @@ public static long alignedTvListArrayMemCost( * * @return AlignedTvListArrayMemSize */ - public long alignedTvListArrayMemCost() { + public long alignedTvListArrayMemCost(List insertingTypes) { long size = 0; // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { TSDataType type = dataTypes.get(column); if (type != null) { - size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); if (bitMaps != null && bitMaps.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; } } } + for (TSDataType type : insertingTypes) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); + } // size is 0 when all types are null if (size == 0) { return size; @@ -1019,26 +1059,21 @@ public long alignedTvListArrayMemCost() { // index array mem size size += (indices != null) ? PrimitiveArrayManager.ARRAY_SIZE * 4L : 0; // array headers mem size - size += (long) NUM_BYTES_ARRAY_HEADER * (2 + dataTypes.size()); + size += (long) NUM_BYTES_ARRAY_HEADER * (2 + insertingTypes.size()); // Object references size in ArrayList size += (long) NUM_BYTES_OBJECT_REF * (2 + dataTypes.size()); return size; } /** - * Get the single column array mem cost by give type. + * Get the single empty column array mem cost. * - * @param type the type of the value column - * @return valueListArrayMemCost + * @return emptyValueListArrayMemCost */ - public static long valueListArrayMemCost(TSDataType type) { + public static long emptyValueListArrayMemCost() { long size = 0; - // value array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); // bitmap array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; - // array headers mem size - size += NUM_BYTES_ARRAY_HEADER; + size += PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; // Object references size in ArrayList size += NUM_BYTES_OBJECT_REF; return size; @@ -1307,50 +1342,52 @@ public void serializeToWAL(IWALByteBufferView buffer) { int arrayIndex = rowIndex / ARRAY_SIZE; int elementIndex = rowIndex % ARRAY_SIZE; // value + boolean isNull = isNullValue(rowIndex, columnIndex); switch (dataTypes.get(columnIndex)) { case TEXT: case BLOB: case STRING: - Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex]; // In some scenario, the Binary in AlignedTVList will be null if this field is empty in // current row. We need to handle this scenario to get rid of NPE. See the similar issue // here: https://github.com/apache/iotdb/pull/9884 // Furthermore, we use an empty Binary as a placeholder here. It won't lead to data // error because whether this field is null or not is decided by the bitMap rather than // the object's value here. - if (valueT != null) { - WALWriteUtils.write(valueT, buffer); - } else { - WALWriteUtils.write(new Binary(new byte[0]), buffer); - } + WALWriteUtils.write( + isNull + ? Binary.EMPTY_VALUE + : ((Binary[]) columnValues.get(arrayIndex))[elementIndex], + buffer); break; case FLOAT: - float valueF = ((float[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putFloat(valueF); + buffer.putFloat( + isNull ? Float.MIN_VALUE : ((float[]) columnValues.get(arrayIndex))[elementIndex]); break; case INT32: case DATE: - int valueI = ((int[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putInt(valueI); + buffer.putInt( + isNull ? Integer.MIN_VALUE : ((int[]) columnValues.get(arrayIndex))[elementIndex]); break; case INT64: case TIMESTAMP: - long valueL = ((long[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putLong(valueL); + buffer.putLong( + isNull ? Long.MIN_VALUE : ((long[]) columnValues.get(arrayIndex))[elementIndex]); break; case DOUBLE: - double valueD = ((double[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putDouble(valueD); + buffer.putDouble( + isNull + ? Double.MIN_VALUE + : ((double[]) columnValues.get(arrayIndex))[elementIndex]); break; case BOOLEAN: - boolean valueB = ((boolean[]) columnValues.get(arrayIndex))[elementIndex]; - WALWriteUtils.write(valueB, buffer); + WALWriteUtils.write( + !isNull && ((boolean[]) columnValues.get(arrayIndex))[elementIndex], buffer); break; default: throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } // bitmap - WALWriteUtils.write(isNullValue(rowIndex, columnIndex), buffer); + WALWriteUtils.write(isNull, buffer); } }