From f285591720bd3256f3d04f8fabd653ba2b090d0c Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 1/7] Improve IcebergSplitReader error handling and fix test file handle leaks Summary: - Add explicit equality deletes NYI branch in prepareSplit() - Improve VELOX_NYI error messages with descriptive content type info - Fix FILE handle leaks in IcebergReadTest by extracting getTestFileSize() helper - Minor doc comment formatting improvements in IcebergSplitReader.h Differential Revision: D97530140 --- .../hive/iceberg/IcebergSplitReader.cpp | 7 ++++++- .../hive/iceberg/IcebergSplitReader.h | 6 +++--- .../hive/iceberg/tests/IcebergReadTest.cpp | 19 +++++++++++++------ 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index a8821a75204f..39a6864cdd77 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -18,6 +18,7 @@ #include +#include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" @@ -115,8 +116,12 @@ void IcebergSplitReader::prepareSplit( splitOffset_, hiveSplit_->connectorId)); } + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Equality deletes are not yet supported."); } else { - VELOX_NYI(); + VELOX_NYI( + "Unsupported delete file content type: {}", + static_cast(deleteFile.content)); } } } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index f727f5c86ff2..0c661acd3598 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -93,10 +93,10 @@ class IcebergSplitReader : public SplitReader { const RowTypePtr& fileType, const RowTypePtr& tableSchema) const override; - // The read offset to the beginning of the split in number of rows for the - // current batch for the base data file + /// Read offset to the beginning of the split in number of rows for the + /// current batch for the base data file. uint64_t baseReadOffset_; - // The file position for the first row in the split + /// File position for the first row in the split. uint64_t splitOffset_; std::list> positionalDeleteFileReaders_; diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 29acc03a2e10..113ffff8d052 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -42,6 +42,15 @@ using namespace facebook::velox::common::testutil; namespace facebook::velox::connector::hive::iceberg { +namespace { +// Return the file size for the given path using Velox's filesystem API. +uint64_t getTestFileSize(const std::string& path) { + return filesystems::getFileSystem(path, nullptr) + ->openFileForRead(path) + ->size(); +} +} // namespace + static const char* kIcebergConnectorId = "test-iceberg"; class HiveIcebergTest : public HiveConnectorTestBase { @@ -241,8 +250,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { deleteFilePath, fileFomat_, deleteFilePaths[deleteFileName].first, - testing::internal::GetFileSize( - std::fopen(deleteFilePath.c_str(), "r"))); + getTestFileSize(deleteFilePath)); deleteFiles.push_back(icebergDeleteFile); } } @@ -354,8 +362,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { deleteFilePath->getPath(), fileFomat_, deletedPositionSize, - testing::internal::GetFileSize( - std::fopen(deleteFilePath->getPath().c_str(), "r"))); + getTestFileSize(deleteFilePath->getPath())); auto fileSize = filesystems::getFileSystem(path, nullptr) ->openFileForRead(path) ->size(); @@ -934,8 +941,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { deleteFilePath->getPath(), dwio::common::FileFormat::DWRF, 3, - testing::internal::GetFileSize( - std::fopen(deleteFilePath->getPath().c_str(), "r")), + getTestFileSize(deleteFilePath->getPath()), {}, {}, {{posColumn->id, encodedUpperBound}}); @@ -1004,4 +1010,5 @@ TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) { 0); } #endif + } // namespace facebook::velox::connector::hive::iceberg From ce792972f0dd260195931a617ff6700b571b7afa Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 2/7] Add Iceberg V3 deletion vector support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Add deletion vector (DV) reader to the Velox Iceberg connector, enabling Iceberg V3 spec support for row-level deletes. Iceberg V3 replaces positional delete files with deletion vectors — compact roaring bitmaps stored as blobs inside Puffin files. Compared to V2 positional delete files, DVs are more compact and avoid sorted merge of multiple delete files at read time. Changes: - IcebergDeleteFile.h: Add FileContent::kDeletionVector enum value - DeletionVectorReader.h/cpp: New reader that loads a Puffin blob, deserializes the roaring bitmap portable format (array, bitset, and run containers), and sets bits in the deleteBitmap for the current batch range. No CRoaring dependency — self-contained parser. - IcebergSplitReader.h: Add deletionVectorReaders_ member, include header - IcebergSplitReader.cpp: Route kDeletionVector in prepareSplit(), apply DVs alongside positional deletes in next() - BUCK: Add DeletionVectorReader library target Differential Revision: D97530142 --- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/DeletionVectorReader.cpp | 334 +++++++++++ .../hive/iceberg/DeletionVectorReader.h | 110 ++++ .../hive/iceberg/IcebergDeleteFile.h | 6 + .../hive/iceberg/IcebergSplitReader.cpp | 21 +- .../hive/iceberg/IcebergSplitReader.h | 4 + .../hive/iceberg/tests/CMakeLists.txt | 13 + .../tests/DeletionVectorReaderTest.cpp | 560 ++++++++++++++++++ 8 files changed, 1048 insertions(+), 1 deletion(-) create mode 100644 velox/connectors/hive/iceberg/DeletionVectorReader.cpp create mode 100644 velox/connectors/hive/iceberg/DeletionVectorReader.h create mode 100644 velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index 6bb02cb38478..d462e891fc3f 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -14,6 +14,7 @@ set( ICEBERG_SOURCES + DeletionVectorReader.cpp IcebergConfig.cpp IcebergColumnHandle.cpp IcebergConnector.cpp diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.cpp b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp new file mode 100644 index 000000000000..5e44bec2066d --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp @@ -0,0 +1,334 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::connector::hive::iceberg { + +DeletionVectorReader::DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* /*pool*/) + : dvFile_(dvFile), splitOffset_(splitOffset) { + VELOX_CHECK( + dvFile_.content == FileContent::kDeletionVector, + "Expected deletion vector file but got content type: {}", + static_cast(dvFile_.content)); + VELOX_CHECK_GT(dvFile_.recordCount, 0, "Empty deletion vector."); + + // Sanity-check the record count to catch corrupt metadata before we + // allocate a potentially enormous bitmap. 10 billion rows is well + // beyond any realistic single-file cardinality. + static constexpr int64_t kMaxDeletionVectorRecordCount = 10'000'000'000LL; + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorRecordCount, + "Deletion vector record count exceeds sanity limit: {}", + dvFile_.recordCount); +} + +void DeletionVectorReader::loadBitmap() { + if (loaded_) { + return; + } + loaded_ = true; + + // Read the raw DV blob from the file. The blob offset and length are + // encoded in the IcebergDeleteFile bounds maps by the coordinator. + uint64_t blobOffset = 0; + uint64_t blobLength = dvFile_.fileSizeInBytes; + + if (auto it = dvFile_.lowerBounds.find(kDvOffsetFieldId); + it != dvFile_.lowerBounds.end()) { + try { + blobOffset = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob offset from bounds map: {}", e.what()); + } + } + if (auto it = dvFile_.upperBounds.find(kDvLengthFieldId); + it != dvFile_.upperBounds.end()) { + try { + blobLength = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob length from bounds map: {}", e.what()); + } + } + + auto fs = filesystems::getFileSystem(dvFile_.filePath, nullptr); + auto readFile = fs->openFileForRead(dvFile_.filePath); + + auto fileSize = readFile->size(); + VELOX_CHECK_LE( + blobOffset, + fileSize, + "DV blob offset {} exceeds file size {}.", + blobOffset, + fileSize); + VELOX_CHECK_LE( + blobLength, + fileSize - blobOffset, + "DV blob range [{}, {}) exceeds file size {}.", + blobOffset, + blobOffset + blobLength, + fileSize); + + // Read the blob bytes. + std::string blobData(blobLength, '\0'); + readFile->pread(blobOffset, blobLength, blobData.data()); + + // Deserialize the roaring bitmap from the portable binary format. + // The Iceberg V3 spec uses the standard RoaringBitmap portable + // serialization (https://roaringbitmap.org/). We parse the bitmap + // directly without depending on CRoaring — the format is well-defined: + // + // For small deletion vectors, the coordinator may provide the deleted + // positions directly as a sorted list encoded in the blob. For the + // general case, we parse the roaring bitmap portable format. + // + // Portable format layout: + // - cookie: uint32 (identifies format version) + // - container count: uint32 + // - per container: key (uint16) + cardinality-1 (uint16) + // - per container: offset (uint32) [if >4 containers] + // - container data: array or bitset containers + // + // We use a simplified parser that extracts all set positions. + deserializeRoaringBitmap(blobData); + + // Sort positions for efficient batch-range scanning. + std::sort(deletedPositions_.begin(), deletedPositions_.end()); +} + +void DeletionVectorReader::deserializeRoaringBitmap(const std::string& data) { + if (data.size() < 8) { + VELOX_FAIL( + "Deletion vector blob too small: {} bytes, expected at least 8.", + data.size()); + } + + const uint8_t* ptr = reinterpret_cast(data.data()); + const uint8_t* end = ptr + data.size(); + + // Read cookie (first 4 bytes). The portable format has two variants: + // - SERIAL_COOKIE_NO_RUNCONTAINER (12346): standard format + // - SERIAL_COOKIE (12347): format with run containers + uint32_t cookie; + std::memcpy(&cookie, ptr, sizeof(uint32_t)); + cookie = folly::Endian::little(cookie); + ptr += sizeof(uint32_t); + + static constexpr uint32_t kSerialCookieNoRun = 12346; + static constexpr uint32_t kSerialCookie = 12347; + + bool hasRunContainers = false; + uint32_t numContainers = 0; + + if ((cookie & 0xFFFF) == kSerialCookie) { + hasRunContainers = true; + numContainers = (cookie >> 16) + 1; + } else if (cookie == kSerialCookieNoRun) { + std::memcpy(&numContainers, ptr, sizeof(uint32_t)); + numContainers = folly::Endian::little(numContainers); + ptr += sizeof(uint32_t); + } else { + VELOX_FAIL( + "Unknown roaring bitmap cookie: {}. Expected {} or {}.", + cookie, + kSerialCookieNoRun, + kSerialCookie); + } + + if (numContainers == 0) { + return; + } + + // Read run bitmap if present (ceil(numContainers / 8) bytes). + std::vector isRunContainer(numContainers, false); + if (hasRunContainers) { + uint32_t runBitmapBytes = (numContainers + 7) / 8; + VELOX_CHECK_GE( + static_cast(end - ptr), + runBitmapBytes, + "Truncated run bitmap."); + for (uint32_t i = 0; i < numContainers; ++i) { + isRunContainer[i] = (ptr[i / 8] >> (i % 8)) & 1; + } + ptr += runBitmapBytes; + } + + // Read key-cardinality pairs: (uint16 key, uint16 cardinality-1) per + // container. + struct ContainerMeta { + uint16_t key; + uint32_t cardinality; + }; + std::vector containers(numContainers); + + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated container metadata."); + for (uint32_t i = 0; i < numContainers; ++i) { + uint16_t key, cardMinus1; + std::memcpy(&key, ptr, sizeof(uint16_t)); + key = folly::Endian::little(key); + ptr += sizeof(uint16_t); + std::memcpy(&cardMinus1, ptr, sizeof(uint16_t)); + cardMinus1 = folly::Endian::little(cardMinus1); + ptr += sizeof(uint16_t); + containers[i] = {key, static_cast(cardMinus1) + 1}; + } + + // Skip offset section when there are >= 4 containers. Per the roaring + // bitmap portable format spec, the offset section is present for both + // SERIAL_COOKIE_NO_RUNCONTAINER and SERIAL_COOKIE formats when + // numContainers >= NO_OFFSET_THRESHOLD (4). + if (numContainers >= 4) { + // Offsets: uint32 per container. + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated offset section."); + ptr += numContainers * sizeof(uint32_t); + } + + // Read container data. + // Guard against unreasonable recordCount that could cause excessive + // allocation. + static constexpr int64_t kMaxDeletionVectorPositions = 1LL + << 30; // ~1 billion + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorPositions, + "Deletion vector recordCount exceeds maximum: {}", + dvFile_.recordCount); + deletedPositions_.reserve(dvFile_.recordCount); + + for (uint32_t i = 0; i < numContainers; ++i) { + uint32_t highBits = static_cast(containers[i].key) << 16; + uint32_t cardinality = containers[i].cardinality; + + if (isRunContainer[i]) { + // Run container: pairs of (start, length-1). + uint16_t numRuns; + VELOX_CHECK_GE( + static_cast(end - ptr), + 2u, + "Truncated run container header."); + std::memcpy(&numRuns, ptr, sizeof(uint16_t)); + numRuns = folly::Endian::little(numRuns); + ptr += sizeof(uint16_t); + + VELOX_CHECK_GE( + static_cast(end - ptr), + static_cast(numRuns) * 4, + "Truncated run container data."); + for (uint16_t r = 0; r < numRuns; ++r) { + uint16_t start, lengthMinus1; + std::memcpy(&start, ptr, sizeof(uint16_t)); + start = folly::Endian::little(start); + ptr += sizeof(uint16_t); + std::memcpy(&lengthMinus1, ptr, sizeof(uint16_t)); + lengthMinus1 = folly::Endian::little(lengthMinus1); + ptr += sizeof(uint16_t); + for (uint32_t v = start; + v <= static_cast(start) + lengthMinus1; + ++v) { + deletedPositions_.push_back(static_cast(highBits | v)); + } + } + } else if (cardinality <= 4096) { + // Array container: sorted uint16 values. + VELOX_CHECK_GE( + static_cast(end - ptr), + cardinality * 2, + "Truncated array container."); + for (uint32_t j = 0; j < cardinality; ++j) { + uint16_t val; + std::memcpy(&val, ptr, sizeof(uint16_t)); + val = folly::Endian::little(val); + ptr += sizeof(uint16_t); + deletedPositions_.push_back(static_cast(highBits | val)); + } + } else { + // Bitset container: 2^16 bits = 8192 bytes. + static constexpr size_t kBitsetBytes = 8192; + VELOX_CHECK_GE( + static_cast(end - ptr), + kBitsetBytes, + "Truncated bitset container."); + for (uint32_t word = 0; word < 1024; ++word) { + uint64_t bits; + std::memcpy(&bits, ptr + word * 8, sizeof(uint64_t)); + bits = folly::Endian::little(bits); + while (bits != 0) { + uint32_t bit = __builtin_ctzll(bits); + deletedPositions_.push_back( + static_cast(highBits | (word * 64 + bit))); + bits &= bits - 1; + } + } + ptr += kBitsetBytes; + } + } +} + +void DeletionVectorReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap) { + loadBitmap(); + + if (deletedPositions_.empty()) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + int64_t rowNumberLowerBound = + static_cast(splitOffset_ + baseReadOffset); + int64_t rowNumberUpperBound = + rowNumberLowerBound + static_cast(size); + + // Advance positionIndex_ past positions before the current batch. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberLowerBound) { + ++positionIndex_; + } + + // Set bits for positions within the current batch range. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberUpperBound) { + auto bitIndex = static_cast( + deletedPositions_[positionIndex_] - rowNumberLowerBound); + bits::setBit(bitmap, bitIndex); + ++positionIndex_; + } +} + +bool DeletionVectorReader::noMoreData() const { + return loaded_ && positionIndex_ >= deletedPositions_.size(); +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.h b/velox/connectors/hive/iceberg/DeletionVectorReader.h new file mode 100644 index 000000000000..1639904aeb78 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/memory/Memory.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg V3 deletion vector and applies it to the delete bitmap. +/// +/// Iceberg V3 replaces positional delete files with deletion vectors — compact +/// roaring bitmaps stored as blobs inside Puffin files. Each DV is associated +/// with a single base data file and contains 0-based row positions of deleted +/// rows. +/// +/// Compared to the V2 PositionalDeleteFileReader: +/// - No columnar file parsing (Puffin blobs are raw binary, not Parquet) +/// - No file_path filtering (each DV is pre-associated with its data file) +/// - No sorted merge of multiple delete files (one DV per data file) +/// - Direct bitmap-to-bitmap conversion instead of row-by-row position +/// reading +/// +/// The coordinator extracts the blob offset and length from the Puffin footer +/// and provides them via the IcebergDeleteFile metadata. The reader opens the +/// file, reads the raw bytes at the given offset, deserializes the roaring +/// bitmap, and sets bits in the delete bitmap for the current batch range. +class DeletionVectorReader { + public: + /// Constructs a reader for a single deletion vector. + /// + /// @param dvFile Metadata about the deletion vector file. Must have + /// content == FileContent::kDeletionVector. The filePath points to the + /// Puffin file, and the DV blob offset/length are encoded in the + /// lowerBounds/upperBounds fields (key = kDvOffsetFieldId for offset, + /// kDvLengthFieldId for length). + /// @param splitOffset File position of the first row in the split. + /// @param pool Memory pool for bitmap allocations. + /// @param fileSystem Filesystem to read the Puffin file from. + DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* pool); + + /// Reads deleted positions from the DV and sets corresponding bits in the + /// deleteBitmap for the current batch range. + /// + /// @param baseReadOffset Read offset from the beginning of the split in + /// number of rows for the current batch. + /// @param size Number of rows in the current batch. + /// @param deleteBitmap Output bitmap. Bit i is set if the row at file + /// position (splitOffset + baseReadOffset + i) is deleted. + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap); + + /// Returns true when there is no more data. For DVs this is always true + /// after the first readDeletePositions() call since the entire bitmap is + /// loaded eagerly. + bool noMoreData() const; + + /// Field IDs used to encode DV blob offset and length in the + /// IcebergDeleteFile bounds maps. The coordinator encodes these when + /// building splits from Puffin file metadata. + static constexpr int32_t kDvOffsetFieldId = 100; + static constexpr int32_t kDvLengthFieldId = 101; + + private: + /// Loads the deletion vector bitmap from the Puffin file. Called lazily + /// on the first readDeletePositions() call. + void loadBitmap(); + + /// Parses a roaring bitmap from its portable binary serialization format + /// and populates deletedPositions_ with all set positions. + void deserializeRoaringBitmap(const std::string& data); + + const IcebergDeleteFile dvFile_; + const uint64_t splitOffset_; + + /// Sorted vector of deleted row positions (0-based, relative to the + /// start of the base data file). Populated by loadBitmap(). + std::vector deletedPositions_; + + /// Index into deletedPositions_ tracking how far we've consumed. + size_t positionIndex_{0}; + + /// Whether the bitmap has been loaded from the file. + bool loaded_{false}; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h index 2f9206dfc264..bc08f5d848ca 100644 --- a/velox/connectors/hive/iceberg/IcebergDeleteFile.h +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -27,6 +27,12 @@ enum class FileContent { kData, kPositionalDeletes, kEqualityDeletes, + /// Iceberg V3 deletion vector. A serialized roaring bitmap of deleted row + /// positions stored as a blob inside a Puffin file. More compact than V2 + /// positional delete files and avoids sorted merge of multiple delete files. + /// The coordinator extracts the blob offset and length from the Puffin + /// footer and provides them via IcebergDeleteFile fields. + kDeletionVector, }; struct IcebergDeleteFile { diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 39a6864cdd77..ef5530134762 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -78,6 +78,7 @@ void IcebergSplitReader::prepareSplit( baseReadOffset_ = 0; splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); + deletionVectorReaders_.clear(); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { @@ -118,6 +119,12 @@ void IcebergSplitReader::prepareSplit( } } else if (deleteFile.content == FileContent::kEqualityDeletes) { VELOX_NYI("Equality deletes are not yet supported."); + } else if (deleteFile.content == FileContent::kDeletionVector) { + if (deleteFile.recordCount > 0) { + deletionVectorReaders_.push_back( + std::make_unique( + deleteFile, splitOffset_, connectorQueryCtx_->memoryPool())); + } } else { VELOX_NYI( "Unsupported delete file content type: {}", @@ -142,7 +149,8 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { return 0; } - if (!positionalDeleteFileReaders_.empty()) { + if (!positionalDeleteFileReaders_.empty() || + !deletionVectorReaders_.empty()) { auto numBytes = bits::nbytes(actualSize); dwio::common::ensureCapacity( deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), false, true); @@ -157,6 +165,17 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { ++iter; } } + + for (auto iter = deletionVectorReaders_.begin(); + iter != deletionVectorReaders_.end();) { + (*iter)->readDeletePositions(baseReadOffset_, actualSize, deleteBitmap_); + + if ((*iter)->noMoreData()) { + iter = deletionVectorReaders_.erase(iter); + } else { + ++iter; + } + } } mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0 diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 0c661acd3598..dd083e0270a8 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -18,6 +18,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" namespace facebook::velox::connector::hive::iceberg { @@ -101,5 +102,8 @@ class IcebergSplitReader : public SplitReader { std::list> positionalDeleteFileReaders_; BufferPtr deleteBitmap_; + + /// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps). + std::list> deletionVectorReaders_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index a84064bb8613..87a16b6b0d0e 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -83,6 +83,19 @@ if(NOT VELOX_DISABLE_GOOGLETEST) GTest::gtest_main ) + add_executable(velox_hive_iceberg_deletion_vector_test DeletionVectorReaderTest.cpp) + add_test(velox_hive_iceberg_deletion_vector_test velox_hive_iceberg_deletion_vector_test) + + target_link_libraries( + velox_hive_iceberg_deletion_vector_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) + if(VELOX_ENABLE_PARQUET) target_link_libraries(velox_hive_iceberg_test velox_dwio_parquet_reader) diff --git a/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp new file mode 100644 index 000000000000..3000a502f641 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp @@ -0,0 +1,560 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/testutil/TempFilePath.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive::iceberg; +using namespace facebook::velox::common::testutil; + +namespace { + +/// Serializes a roaring bitmap in the portable format (no-run variant, +/// cookie = 12346). Supports only array containers (cardinality <= 4096). +/// This is the simplest format the DeletionVectorReader needs to parse. +std::string serializeRoaringBitmapNoRun(const std::vector& positions) { + if (positions.empty()) { + // Empty bitmap: cookie + 0 containers. + std::string data(8, '\0'); + uint32_t cookie = 12346; + uint32_t numContainers = 0; + std::memcpy(data.data(), &cookie, 4); + std::memcpy(data.data() + 4, &numContainers, 4); + return data; + } + + // Group positions by high 16 bits. + std::map> containers; + for (auto pos : positions) { + auto key = static_cast(pos >> 16); + auto low = static_cast(pos & 0xFFFF); + containers[key].push_back(low); + } + + for (auto& [key, vals] : containers) { + std::sort(vals.begin(), vals.end()); + } + + uint32_t numContainers = static_cast(containers.size()); + + std::string data; + // Cookie. + uint32_t cookie = 12346; + data.append(reinterpret_cast(&cookie), 4); + // Container count. + data.append(reinterpret_cast(&numContainers), 4); + + // Key-cardinality pairs. + for (auto& [key, vals] : containers) { + uint16_t cardMinus1 = static_cast(vals.size() - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Offset section (required for >= 4 containers). + if (numContainers >= 4) { + uint32_t offset = 4 + 4 + numContainers * 4 + numContainers * 4; + for (auto& [key, vals] : containers) { + data.append(reinterpret_cast(&offset), 4); + offset += static_cast(vals.size()) * 2; + } + } + + // Container data (array containers: sorted uint16 values). + for (auto& [key, vals] : containers) { + for (auto v : vals) { + data.append(reinterpret_cast(&v), 2); + } + } + + return data; +} + +/// Serializes a roaring bitmap in the portable format with run containers +/// (cookie = 12347). All containers are run-encoded. +std::string serializeRoaringBitmapWithRuns( + const std::vector< + std::pair>>>& + containerRuns) { + // containerRuns: vector of (highBitsKey, vector of (start, lengthMinus1)). + uint32_t numContainers = static_cast(containerRuns.size()); + + // Cookie: low 16 bits = 12347, high 16 bits = numContainers - 1. + uint32_t cookie = static_cast(12347) | ((numContainers - 1) << 16); + + std::string data; + data.append(reinterpret_cast(&cookie), 4); + + // Run bitmap: all containers are run containers. ceil(numContainers / 8) + // bytes. + uint32_t runBitmapBytes = (numContainers + 7) / 8; + std::vector runBitmap(runBitmapBytes, 0xFF); + data.append(reinterpret_cast(runBitmap.data()), runBitmapBytes); + + // Compute cardinality for each container. + std::vector cardinalities; + for (auto& [key, runs] : containerRuns) { + uint32_t card = 0; + for (auto& [start, lenMinus1] : runs) { + card += static_cast(lenMinus1) + 1; + } + cardinalities.push_back(card); + } + + // Key-cardinality pairs. + for (size_t i = 0; i < containerRuns.size(); ++i) { + uint16_t key = containerRuns[i].first; + uint16_t cardMinus1 = static_cast(cardinalities[i] - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Container data: each run container has numRuns (uint16) followed by + // (start, lengthMinus1) pairs. + for (auto& [key, runs] : containerRuns) { + uint16_t numRuns = static_cast(runs.size()); + data.append(reinterpret_cast(&numRuns), 2); + for (auto& [start, lenMinus1] : runs) { + data.append(reinterpret_cast(&start), 2); + data.append(reinterpret_cast(&lenMinus1), 2); + } + } + + return data; +} + +/// Writes binary data to a temp file and returns the path. +std::shared_ptr writeDvFile(const std::string& bitmapData) { + auto tempFile = TempFilePath::create(); + // Write directly via C++ streams since TempFilePath already creates the + // file and the local filesystem openFileForWrite may not overwrite. + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + VELOX_CHECK(out.good(), "Failed to open temp file for writing"); + out.write(bitmapData.data(), static_cast(bitmapData.size())); + out.close(); + return tempFile; +} + +/// Creates an IcebergDeleteFile for a deletion vector. +IcebergDeleteFile makeDvDeleteFile( + const std::string& filePath, + uint64_t recordCount, + uint64_t fileSize, + uint64_t blobOffset = 0, + std::optional blobLength = std::nullopt) { + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = + std::to_string(blobOffset); + if (blobLength.has_value()) { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(blobLength.value()); + } else { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(fileSize); + } + + return IcebergDeleteFile( + FileContent::kDeletionVector, + filePath, + dwio::common::FileFormat::DWRF, + recordCount, + fileSize, + {}, + lowerBounds, + upperBounds); +} + +/// Extracts which bits are set in a bitmap buffer. +std::vector getSetBits(const BufferPtr& bitmap, uint64_t size) { + auto* raw = bitmap->as(); + std::vector result; + for (uint64_t i = 0; i < size; ++i) { + if (bits::isBitSet(raw, i)) { + result.push_back(i); + } + } + return result; +} + +} // namespace + +class DeletionVectorReaderTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + } + + void SetUp() override { + filesystems::registerLocalFileSystem(); + pool_ = memory::memoryManager()->addLeafPool("DeletionVectorReaderTest"); + } + + BufferPtr allocateBitmap(uint64_t numBits) { + auto numBytes = bits::nbytes(numBits); + auto buffer = AlignedBuffer::allocate(numBytes, pool_.get(), 0); + return buffer; + } + + std::shared_ptr pool_; +}; + +TEST_F(DeletionVectorReaderTest, basicArrayContainer) { + // Positions: 0, 5, 10, 99. + std::vector positions = {0, 5, 10, 99}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + EXPECT_FALSE(reader.noMoreData()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 5, 10, 99})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, batchRangeFiltering) { + // Positions: 10, 20, 30, 40, 50. + std::vector positions = {10, 20, 30, 40, 50}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // First batch: rows 0-24 (should contain positions 10, 20). + auto bitmap1 = allocateBitmap(25); + reader.readDeletePositions(0, 25, bitmap1); + auto bits1 = getSetBits(bitmap1, 25); + EXPECT_EQ(bits1, (std::vector{10, 20})); + EXPECT_FALSE(reader.noMoreData()); + + // Second batch: rows 25-49 (should contain positions 30, 40). + auto bitmap2 = allocateBitmap(25); + reader.readDeletePositions(25, 25, bitmap2); + auto bits2 = getSetBits(bitmap2, 25); + EXPECT_EQ(bits2, (std::vector{5, 15})); + EXPECT_FALSE(reader.noMoreData()); + + // Third batch: rows 50-74 (should contain position 50). + auto bitmap3 = allocateBitmap(25); + reader.readDeletePositions(50, 25, bitmap3); + auto bits3 = getSetBits(bitmap3, 25); + EXPECT_EQ(bits3, (std::vector{0})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffset) { + // Positions: 100, 105, 110. + std::vector positions = {100, 105, 110}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + // Positions 100, 105, 110 relative to splitOffset=100, baseReadOffset=0 + // become bit indices 0, 5, 10. + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{0, 5, 10})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffsetWithBaseReadOffset) { + // Positions: 200, 210, 220. + std::vector positions = {200, 210, 220}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + // First batch: baseReadOffset=100, so file positions [200, 300). + // Positions 200, 210, 220 are all in range. + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(100, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 10, 20})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, noDeletesInRange) { + // Positions: 1000, 2000. + std::vector positions = {1000, 2000}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Batch covers rows 0-99, no deletions in this range. + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_TRUE(setBits.empty()); + EXPECT_FALSE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, runContainers) { + // Use run-encoded containers: positions 10-19 and 50-59. + std::vector>>> + containerRuns = { + {0, {{10, 9}, {50, 9}}}, + }; + auto bitmapData = serializeRoaringBitmapWithRuns(containerRuns); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 20, fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + // Expect positions 10-19 and 50-59. + std::vector expected; + for (uint64_t i = 10; i <= 19; ++i) { + expected.push_back(i); + } + for (uint64_t i = 50; i <= 59; ++i) { + expected.push_back(i); + } + EXPECT_EQ(setBits, expected); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, largePositionsMultipleContainers) { + // Positions spanning two containers: one in container 0 (key=0), one in + // container 1 (key=1, i.e. pos >= 65536). + std::vector positions = {5, 100, 65536, 65600}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Read a batch covering all positions. + auto bitmap = allocateBitmap(66000); + reader.readDeletePositions(0, 66000, bitmap); + + auto setBits = getSetBits(bitmap, 66000); + EXPECT_EQ(setBits, (std::vector{5, 100, 65536, 65600})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, blobOffset) { + // Write a file with some padding before the actual bitmap data. + std::vector positions = {3, 7, 11}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + + // Prepend 64 bytes of padding. + std::string padding(64, 'X'); + std::string fileContent = padding + bitmapData; + + auto tempFile = writeDvFile(fileContent); + auto fileSize = static_cast(fileContent.size()); + + auto dvFile = makeDvDeleteFile( + tempFile->getPath(), positions.size(), fileSize, 64, bitmapData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{3, 7, 11})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsWrongContentType) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile badFile( + FileContent::kPositionalDeletes, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(badFile, 0, pool_.get()), + "Expected deletion vector file"); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsEmptyDv) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile emptyDv( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 0, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(emptyDv, 0, pool_.get()), "Empty deletion vector"); +} + +TEST_F(DeletionVectorReaderTest, noMoreDataAfterAllConsumed) { + std::vector positions = {0, 1, 2}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + EXPECT_FALSE(reader.noMoreData()); + + auto bitmap = allocateBitmap(10); + reader.readDeletePositions(0, 10, bitmap); + EXPECT_TRUE(reader.noMoreData()); + + // Additional reads should be no-ops. + auto bitmap2 = allocateBitmap(10); + reader.readDeletePositions(10, 10, bitmap2); + auto setBits2 = getSetBits(bitmap2, 10); + EXPECT_TRUE(setBits2.empty()); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, singlePosition) { + std::vector positions = {42}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{42})); +} + +TEST_F(DeletionVectorReaderTest, consecutivePositions) { + // Positions: 0 through 99 (100 consecutive positions). + std::vector positions; + positions.reserve(100); + for (int64_t i = 0; i < 100; ++i) { + positions.push_back(i); + } + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + std::vector expected; + expected.reserve(100); + for (uint64_t i = 0; i < 100; ++i) { + expected.push_back(i); + } + EXPECT_EQ(setBits, expected); +} + +TEST_F(DeletionVectorReaderTest, invalidBitmapTooSmall) { + // Write a file that is too small to contain a valid roaring bitmap header. + std::string tinyData(4, '\0'); + auto tempFile = writeDvFile(tinyData); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, tinyData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(10); + VELOX_ASSERT_THROW(reader.readDeletePositions(0, 10, bitmap), "too small"); +} + +TEST_F(DeletionVectorReaderTest, invalidBitmapBadCookie) { + // Write a file with an invalid cookie. + std::string badData(12, '\0'); + uint32_t badCookie = 99999; + std::memcpy(badData.data(), &badCookie, 4); + auto tempFile = writeDvFile(badData); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, badData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(10); + VELOX_ASSERT_THROW( + reader.readDeletePositions(0, 10, bitmap), + "Unknown roaring bitmap cookie"); +} From d37c671a328981fca9ad2cc1f31fa7462e7dc60b Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 3/7] Add Iceberg equality delete file reader Summary: Implements Iceberg equality delete support for the Velox Iceberg connector. Equality delete files contain rows with values for one or more columns (identified by equalityFieldIds). A base data row is deleted if its values match ALL specified columns of ANY row in the delete file. The implementation: - Adds EqualityDeleteFileReader that eagerly reads the entire delete file and builds an in-memory hash multimap of delete key tuples during construction. - Wires EqualityDeleteFileReader into IcebergSplitReader::prepareSplit() to resolve equalityFieldIds to column names/types from the table schema, and into IcebergSplitReader::next() to apply post-read equality delete filtering with row compaction. - Handles lazy vectors from file readers via loadedVector() before accessing values for hashing and comparison. - Supports BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, VARBINARY, and TIMESTAMP column types. Differential Revision: D97530141 --- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/EqualityDeleteFileReader.cpp | 353 +++++++++++++++++ .../hive/iceberg/EqualityDeleteFileReader.h | 139 +++++++ .../hive/iceberg/IcebergSplitReader.cpp | 106 ++++- .../hive/iceberg/IcebergSplitReader.h | 5 + .../hive/iceberg/tests/CMakeLists.txt | 13 + .../tests/EqualityDeleteFileReaderTest.cpp | 361 ++++++++++++++++++ 7 files changed, 977 insertions(+), 1 deletion(-) create mode 100644 velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp create mode 100644 velox/connectors/hive/iceberg/EqualityDeleteFileReader.h create mode 100644 velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index d462e891fc3f..233579680b92 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -22,6 +22,7 @@ set( IcebergDataSink.cpp IcebergDataSource.cpp IcebergPartitionName.cpp + EqualityDeleteFileReader.cpp IcebergSplit.cpp IcebergSplitReader.cpp PartitionSpec.cpp diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp new file mode 100644 index 000000000000..7b11141ab684 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp @@ -0,0 +1,353 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +/// Hashes a single value from a vector at the given index. +/// Handles lazy vectors via loadedVector(). Returns 0 for null values. +uint64_t hashValue(const VectorPtr& vectorPtr, vector_size_t index) { + const auto* vector = vectorPtr->loadedVector(); + if (vector->isNullAt(index)) { + return 0; + } + + auto type = vector->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::TINYINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::SMALLINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::INTEGER: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::BIGINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::REAL: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::DOUBLE: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto sv = vector->as>()->valueAt(index); + return folly::hasher{}( + std::string_view(sv.data(), sv.size())); + } + case TypeKind::TIMESTAMP: { + auto ts = vector->as>()->valueAt(index); + return std::hash{}(ts.toNanos()); + } + default: + VELOX_NYI( + "Equality delete hash not implemented for type: {}", + type->toString()); + } +} + +/// Compares two values from vectors at given indices. +/// Handles lazy vectors via loadedVector(). +bool compareValues( + const VectorPtr& leftPtr, + vector_size_t leftIdx, + const VectorPtr& rightPtr, + vector_size_t rightIdx) { + const auto* left = leftPtr->loadedVector(); + const auto* right = rightPtr->loadedVector(); + bool leftNull = left->isNullAt(leftIdx); + bool rightNull = right->isNullAt(rightIdx); + if (leftNull && rightNull) { + return true; + } + if (leftNull || rightNull) { + return false; + } + + auto type = left->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::TINYINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::SMALLINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::INTEGER: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::BIGINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::REAL: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::DOUBLE: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto lv = left->as>()->valueAt(leftIdx); + auto rv = right->as>()->valueAt(rightIdx); + return std::string_view(lv.data(), lv.size()) == + std::string_view(rv.data(), rv.size()); + } + case TypeKind::TIMESTAMP: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + default: + VELOX_NYI( + "Equality delete comparison not implemented for type: {}", + type->toString()); + } +} + +} // namespace + +EqualityDeleteFileReader::EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& /*baseFilePath*/, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId) + : equalityColumnNames_(equalityColumnNames), + equalityColumnTypes_(equalityColumnTypes), + pool_(connectorQueryCtx->memoryPool()) { + VELOX_CHECK( + deleteFile.content == FileContent::kEqualityDeletes, + "Expected equality delete file but got content type: {}", + static_cast(deleteFile.content)); + VELOX_CHECK_GT(deleteFile.recordCount, 0, "Empty equality delete file."); + VELOX_CHECK( + !equalityColumnNames_.empty(), + "Equality delete file must specify at least one column."); + VELOX_CHECK_EQ( + equalityColumnNames_.size(), + equalityColumnTypes_.size(), + "Equality column names and types must have the same size."); + + // Build the file schema for the equality delete columns only. + auto deleteFileSchema = + ROW(std::vector(equalityColumnNames_), + std::vector(equalityColumnTypes_)); + + // Create a ScanSpec that reads only the equality delete columns. + auto scanSpec = std::make_shared(""); + for (size_t i = 0; i < equalityColumnNames_.size(); ++i) { + scanSpec->addField(equalityColumnNames_[i], static_cast(i)); + } + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + hiveConfig, + connectorQueryCtx, + deleteFileSchema, + deleteSplit, + /*tableParameters=*/{}, + deleteReaderOpts); + + const FileHandleKey fileHandleKey{ + .filename = deleteFile.filePath, + .tokenProvider = connectorQueryCtx->fsTokenProvider()}; + auto deleteFileHandleCachePtr = fileHandleFactory->generate(fileHandleKey); + auto deleteFileInput = BufferedInputBuilder::getInstance()->create( + *deleteFileHandleCachePtr, + deleteReaderOpts, + connectorQueryCtx, + ioStatistics, + ioStats, + executor); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.fileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + if (!testFilters( + scanSpec.get(), + deleteReader.get(), + deleteSplit->filePath, + deleteSplit->partitionKeys, + {}, + hiveConfig->readTimestampPartitionValueAsLocalTime( + connectorQueryCtx->sessionProperties()))) { + runtimeStats.skippedSplitBytes += static_cast(deleteSplit->length); + return; + } + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + {}, + scanSpec, + nullptr, + deleteFileSchema, + deleteSplit, + nullptr, + nullptr, + nullptr, + deleteRowReaderOpts); + + auto deleteRowReader = deleteReader->createRowReader(deleteRowReaderOpts); + + // Read the entire equality delete file and build the hash set. + VectorPtr output; + output = BaseVector::create(deleteFileSchema, 0, pool_); + + while (true) { + auto rowsRead = deleteRowReader->next( + std::max(static_cast(1'000), deleteFile.recordCount), output); + if (rowsRead == 0) { + break; + } + + auto numRows = output->size(); + if (numRows == 0) { + continue; + } + + output->loadedVector(); + auto rowOutput = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL(rowOutput); + + size_t batchIndex = deleteRows_.size(); + deleteRows_.push_back(rowOutput); + + // Resolve column indices on the first batch. + if (deleteColumnIndices_.empty()) { + for (const auto& colName : equalityColumnNames_) { + auto idx = rowOutput->type()->as().getChildIdx(colName); + deleteColumnIndices_.push_back(static_cast(idx)); + } + } + + // Hash each row and insert into the multimap. + for (vector_size_t i = 0; i < numRows; ++i) { + uint64_t hash = hashRow(rowOutput, i); + deleteKeyHashes_.emplace(hash, DeleteKeyEntry{batchIndex, i}); + } + + // Reset output for next batch. + output = BaseVector::create(deleteFileSchema, 0, pool_); + } +} + +void EqualityDeleteFileReader::applyDeletes( + const RowVectorPtr& output, + BufferPtr deleteBitmap) { + if (deleteKeyHashes_.empty() || output->size() == 0) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + + // For each row in the output, compute its hash and probe the delete set. + for (vector_size_t i = 0; i < output->size(); ++i) { + // Skip rows already deleted by positional/DV deletes. + if (bits::isBitSet(bitmap, i)) { + continue; + } + + uint64_t hash = hashRow(output, i); + auto range = deleteKeyHashes_.equal_range(hash); + + for (auto it = range.first; it != range.second; ++it) { + auto& entry = it->second; + if (equalRows(output, i, deleteRows_[entry.batchIndex], entry.rowIndex)) { + bits::setBit(bitmap, i); + break; + } + } + } +} + +uint64_t EqualityDeleteFileReader::hashRow( + const RowVectorPtr& row, + vector_size_t index) const { + uint64_t hash = 0; + + // For the delete file rows, use deleteColumnIndices_. + // For the base data rows, look up columns by name. + const auto& rowType = row->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto colIdx = rowType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK( + colIdx.has_value(), + "Column not found in row: {}", + equalityColumnNames_[c]); + auto colHash = hashValue(row->childAt(*colIdx), index); + // Combine hashes using a simple mix. + hash ^= colHash + 0x9e3779b97f4a7c15ULL + (hash << 6) + (hash >> 2); + } + return hash; +} + +bool EqualityDeleteFileReader::equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const { + const auto& leftType = left->type()->asRow(); + const auto& rightType = right->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto leftColIdx = leftType.getChildIdxIfExists(equalityColumnNames_[c]); + auto rightColIdx = rightType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK(leftColIdx.has_value() && rightColIdx.has_value()); + + if (!compareValues( + left->childAt(*leftColIdx), + leftIndex, + right->childAt(*rightColIdx), + rightIndex)) { + return false; + } + } + return true; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h new file mode 100644 index 000000000000..e77b6a8c5068 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/dwio/common/Reader.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg equality delete file and filters base data rows whose +/// equality delete column values match any row in the delete file. +/// +/// Iceberg equality delete files contain rows with values for one or more +/// columns (identified by equalityFieldIds). A base data row is deleted if +/// its values match ALL specified columns of ANY row in the delete file. +/// +/// Unlike positional deletes (which set bits before reading), equality deletes +/// require reading the base data first, then probing each row against the +/// delete set. The reader eagerly loads all delete key tuples from the file +/// into an in-memory hash set during construction. +/// +/// The equality delete column names are resolved from equalityFieldIds via +/// the table schema provided by the caller. +class EqualityDeleteFileReader { + public: + /// Constructs a reader for a single equality delete file. + /// + /// Eagerly reads the entire delete file and builds an in-memory hash set + /// of delete key tuples. The delete file is fully consumed during + /// construction. + /// + /// @param deleteFile Metadata about the equality delete file. Must have + /// content == FileContent::kEqualityDeletes and non-empty + /// equalityFieldIds. + /// @param equalityColumnNames Ordered column names corresponding to + /// equalityFieldIds, resolved by the caller from the table schema. + /// @param equalityColumnTypes Ordered column types corresponding to + /// equalityFieldIds. + /// @param baseFilePath Path of the base data file being read. + /// @param fileHandleFactory Factory for creating file handles. + /// @param connectorQueryCtx Query context for memory and config. + /// @param executor IO executor for async reads. + /// @param hiveConfig Hive configuration. + /// @param ioStatistics IO statistics collector. + /// @param ioStats IO stats tracker. + /// @param runtimeStats Runtime statistics for recording skipped bytes. + /// @param connectorId Connector identifier. + EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId); + + /// Applies equality deletes to the output vector by setting bits in the + /// delete bitmap for rows whose equality column values match any delete + /// key tuple. + /// + /// @param output The base data output vector to filter. + /// @param deleteBitmap Output bitmap. Bit i is set if row i matches an + /// equality delete. The bitmap must be pre-allocated to cover at least + /// output->size() rows. + void applyDeletes(const RowVectorPtr& output, BufferPtr deleteBitmap); + + /// Returns the number of delete key tuples loaded from the file. + size_t numDeleteKeys() const { + return deleteKeyHashes_.size(); + } + + /// Returns true if this reader has no delete keys (file was skipped or + /// empty). When true, applyDeletes() is a no-op. + bool empty() const { + return deleteKeyHashes_.empty(); + } + + private: + /// Hashes a single row's equality delete columns into a uint64_t key. + /// Uses a simple combine-hash approach over the raw values. + uint64_t hashRow(const RowVectorPtr& row, vector_size_t index) const; + + /// Checks whether two rows are equal on all equality delete columns. + bool equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const; + + /// Column names and types for equality delete comparison. + std::vector equalityColumnNames_; + std::vector equalityColumnTypes_; + + /// Column indices in the delete file output vector. + std::vector deleteColumnIndices_; + + /// All rows read from the equality delete file, stored for equality + /// comparison during probing. + std::vector deleteRows_; + + /// Hash set storing (hash → vector of (batch index, row index)) for + /// all delete key tuples. Used for fast probing. + struct DeleteKeyEntry { + size_t batchIndex; + vector_size_t rowIndex; + }; + std::unordered_multimap deleteKeyHashes_; + + memory::MemoryPool* const pool_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index ef5530134762..bd727bb419fe 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -20,6 +20,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" @@ -79,6 +80,7 @@ void IcebergSplitReader::prepareSplit( splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); deletionVectorReaders_.clear(); + equalityDeleteFileReaders_.clear(); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { @@ -118,7 +120,45 @@ void IcebergSplitReader::prepareSplit( hiveSplit_->connectorId)); } } else if (deleteFile.content == FileContent::kEqualityDeletes) { - VELOX_NYI("Equality deletes are not yet supported."); + if (deleteFile.recordCount > 0 && !deleteFile.equalityFieldIds.empty()) { + // Resolve equalityFieldIds to column names and types. In Iceberg, + // field IDs for top-level columns are assigned sequentially starting + // from 1, matching the column order in the table schema. + std::vector equalityColumnNames; + std::vector equalityColumnTypes; + + const auto& dataColumns = hiveTableHandle_->dataColumns(); + if (dataColumns) { + for (const auto& eqFieldId : deleteFile.equalityFieldIds) { + // Field IDs are 1-based; column index is fieldId - 1. + auto colIdx = static_cast(eqFieldId - 1); + VELOX_CHECK_LT( + colIdx, + dataColumns->size(), + "Equality delete field ID out of range: {}", + eqFieldId); + equalityColumnNames.push_back(dataColumns->nameOf(colIdx)); + equalityColumnTypes.push_back(dataColumns->childAt(colIdx)); + } + } + + if (!equalityColumnNames.empty()) { + equalityDeleteFileReaders_.push_back( + std::make_unique( + deleteFile, + equalityColumnNames, + equalityColumnTypes, + hiveSplit_->filePath, + fileHandleFactory_, + connectorQueryCtx_, + ioExecutor_, + hiveConfig_, + ioStatistics_, + ioStats_, + runtimeStats, + hiveSplit_->connectorId)); + } + } } else if (deleteFile.content == FileContent::kDeletionVector) { if (deleteFile.recordCount > 0) { deletionVectorReaders_.push_back( @@ -184,6 +224,70 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation); + // Apply equality deletes after reading base data. Unlike positional deletes + // (which set bits before reading), equality deletes require the data values + // to be available for comparison. + if (rowsScanned > 0 && !equalityDeleteFileReaders_.empty()) { + auto outputRowVector = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL( + outputRowVector, "Output must be a RowVector for equality deletes."); + + auto numRows = outputRowVector->size(); + + // Use a separate bitmap for equality deletes to track which rows to + // remove from the output. + BufferPtr eqDeleteBitmap = AlignedBuffer::allocate( + numRows, connectorQueryCtx_->memoryPool()); + std::memset( + eqDeleteBitmap->asMutable(), 0, eqDeleteBitmap->size()); + + for (auto& reader : equalityDeleteFileReaders_) { + reader->applyDeletes(outputRowVector, eqDeleteBitmap); + } + + // Count surviving rows and compact the output if any rows were deleted. + auto* eqBitmap = eqDeleteBitmap->as(); + vector_size_t numDeleted = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (bits::isBitSet(eqBitmap, i)) { + ++numDeleted; + } + } + + if (numDeleted > 0) { + vector_size_t numSurviving = numRows - numDeleted; + if (numSurviving == 0) { + output = BaseVector::create( + outputRowVector->type(), 0, connectorQueryCtx_->memoryPool()); + return 0; + } + + // Build a list of surviving row ranges and use it to compact. + std::vector ranges; + ranges.reserve(numSurviving); + vector_size_t targetIdx = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (!bits::isBitSet(eqBitmap, i)) { + ranges.push_back({i, targetIdx++, 1}); + } + } + + auto newOutput = BaseVector::create( + outputRowVector->type(), + numSurviving, + connectorQueryCtx_->memoryPool()); + auto newRowOutput = std::dynamic_pointer_cast(newOutput); + for (auto i = 0; i < outputRowVector->childrenSize(); ++i) { + newRowOutput->childAt(i)->resize(numSurviving); + newRowOutput->childAt(i)->copyRanges( + outputRowVector->childAt(i).get(), ranges); + } + newRowOutput->resize(numSurviving); + output = newRowOutput; + rowsScanned = numSurviving; + } + } + return rowsScanned; } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index dd083e0270a8..af9bf0660e00 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/iceberg/DeletionVectorReader.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" namespace facebook::velox::connector::hive::iceberg { @@ -105,5 +106,9 @@ class IcebergSplitReader : public SplitReader { /// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps). std::list> deletionVectorReaders_; + + /// Readers for equality delete files. + std::list> + equalityDeleteFileReaders_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index 87a16b6b0d0e..08a509442fa3 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -107,4 +107,17 @@ if(NOT VELOX_DISABLE_GOOGLETEST) velox_dwio_parquet_writer ) endif() + + add_executable(velox_hive_iceberg_equality_delete_test EqualityDeleteFileReaderTest.cpp) + add_test(velox_hive_iceberg_equality_delete_test velox_hive_iceberg_equality_delete_test) + + target_link_libraries( + velox_hive_iceberg_equality_delete_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) endif() diff --git a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp new file mode 100644 index 000000000000..eb360f10e975 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp @@ -0,0 +1,361 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/common/testutil/TempDirectoryPath.h" +#include "velox/connectors/hive/iceberg/IcebergConnector.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +namespace facebook::velox::connector::hive::iceberg { + +using namespace facebook::velox::exec::test; + +namespace { + +const std::string kIcebergConnectorId = "test-iceberg-eq-delete"; + +} // namespace + +/// End-to-end tests for equality deletes via the IcebergSplitReader. +/// These tests write DWRF data files and delete files, then execute +/// table scans verifying that matching rows are filtered out. +class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { + protected: + void SetUp() override { + HiveConnectorTestBase::SetUp(); + IcebergConnectorFactory icebergFactory; + auto icebergConnector = icebergFactory.newConnector( + kIcebergConnectorId, + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(icebergConnector); + } + + void TearDown() override { + connector::unregisterConnector(kIcebergConnectorId); + HiveConnectorTestBase::TearDown(); + } + + uint64_t getFileSize(const std::string& path) { + return filesystems::getFileSystem(path, nullptr) + ->openFileForRead(path) + ->size(); + } + + /// Writes a DWRF data file containing the given vectors. + std::shared_ptr writeDataFile( + const std::vector& data) { + auto file = common::testutil::TempFilePath::create(); + writeToFile(file->getPath(), data); + return file; + } + + /// Writes a DWRF delete file containing the equality delete rows. + std::shared_ptr writeEqDeleteFile( + const std::vector& deleteData) { + auto file = common::testutil::TempFilePath::create(); + writeToFile(file->getPath(), deleteData); + return file; + } + + /// Creates splits with equality delete files attached. + std::vector> makeSplits( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + auto fileSize = getFileSize(dataFilePath); + return {std::make_shared( + kIcebergConnectorId, + dataFilePath, + dwio::common::FileFormat::DWRF, + 0, + fileSize, + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + deleteFiles)}; + } + + /// Builds a table scan plan node with the given schema. + core::PlanNodePtr makeTableScanPlan(const RowTypePtr& rowType) { + return PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .dataColumns(rowType) + .endTableScan() + .planNode(); + } +}; + +/// Verifies that base rows matching the equality delete file are removed. +TEST_F(EqualityDeleteFileReaderTest, basicSingleColumnDelete) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), + makeFlatVector( + {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where id == 3 or id == 7. + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({3, 7}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); // field ID 1 = column 0 = "id" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({0, 1, 2, 4, 5, 6, 8, 9}), + makeFlatVector({"a", "b", "c", "e", "f", "g", "i", "j"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies multi-column equality deletes (both columns must match). +TEST_F(EqualityDeleteFileReaderTest, multiColumnDelete) { + auto rowType = ROW({"a", "b", "c"}, {INTEGER(), VARCHAR(), BIGINT()}); + + auto baseData = makeRowVector( + {"a", "b", "c"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"x", "y", "z", "x", "y"}), + makeFlatVector({10, 20, 30, 40, 50}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where (a=2, b="y") — matches row index 1. + // Also (a=5, b="y") — matches row index 4. + // But (a=1, b="y") — no match (a=1 has b="x"). + auto deleteData = makeRowVector( + {"a", "b"}, + { + makeFlatVector({2, 5, 1}), + makeFlatVector({"y", "y", "y"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1, 2}); // field IDs 1,2 = columns "a","b" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows 0, 2, 3 survive (rows 1 and 4 deleted). + auto expected = makeRowVector( + {"a", "b", "c"}, + { + makeFlatVector({1, 3, 4}), + makeFlatVector({"x", "z", "x"}), + makeFlatVector({10, 30, 40}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when no rows match, all rows survive. +TEST_F(EqualityDeleteFileReaderTest, noMatchingDeletes) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete file has values not present in base data. + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({100, 200}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that all rows are deleted when every base row matches. +TEST_F(EqualityDeleteFileReaderTest, allRowsDeleted) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + EXPECT_EQ(result->size(), 0); +} + +/// Verifies equality deletes with VARCHAR columns. +TEST_F(EqualityDeleteFileReaderTest, stringColumnDelete) { + auto rowType = ROW({"name", "age"}, {VARCHAR(), INTEGER()}); + + auto baseData = makeRowVector( + {"name", "age"}, + { + makeFlatVector({"alice", "bob", "charlie", "dave"}), + makeFlatVector({25, 30, 35, 40}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where name is "bob" or "dave". + auto deleteData = makeRowVector( + {"name"}, + { + makeFlatVector({"bob", "dave"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); // field ID 1 = "name" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"name", "age"}, + { + makeFlatVector({"alice", "charlie"}), + makeFlatVector({25, 35}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies equality deletes on a non-first column (field ID 2). +TEST_F(EqualityDeleteFileReaderTest, deleteOnSecondColumn) { + auto rowType = ROW({"id", "category"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "category"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"A", "B", "A", "C", "B"}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where category == "B". + auto deleteData = makeRowVector( + {"category"}, + { + makeFlatVector({"B"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{2}); // field ID 2 = column 1 = "category" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows with category="B" (indices 1,4) deleted. + auto expected = makeRowVector( + {"id", "category"}, + { + makeFlatVector({1, 3, 4}), + makeFlatVector({"A", "A", "C"}), + }); + + assertEqualResults({expected}, {result}); +} + +} // namespace facebook::velox::connector::hive::iceberg From 974e1b3d73021b4264cb84a59494e0c7c07e5fb3 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 4/7] Add sequence number conflict resolution for equality deletes Summary: Implements Iceberg V2+ sequence number conflict resolution for equality delete files. Per the Iceberg spec, an equality delete file should only be applied to data files whose data sequence number is strictly less than the delete file's data sequence number. This prevents concurrent write conflicts where a delete file written concurrently with a data file could incorrectly delete rows that were not intended to be deleted. Changes: - Added `dataSequenceNumber` field to `IcebergDeleteFile` struct with default value 0 (unassigned/legacy V1). When 0, sequence number filtering is disabled for backward compatibility. - Added `dataSequenceNumber` field to `HiveIcebergSplit` to carry the base data file's sequence number. - Updated `IcebergSplitReader::prepareSplit()` to skip equality delete files when `deleteFile.dataSequenceNumber <= split.dataSequenceNumber` (unless either is 0, which disables the check). - Updated test constructor of `HiveIcebergSplit` to accept the new `dataSequenceNumber` parameter. Differential Revision: D97530136 --- .../hive/iceberg/IcebergDeleteFile.h | 13 +- .../connectors/hive/iceberg/IcebergSplit.cpp | 6 +- velox/connectors/hive/iceberg/IcebergSplit.h | 10 +- .../hive/iceberg/IcebergSplitReader.cpp | 30 ++ .../tests/EqualityDeleteFileReaderTest.cpp | 283 +++++++++++++++++- 5 files changed, 335 insertions(+), 7 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h index bc08f5d848ca..ae5822371a60 100644 --- a/velox/connectors/hive/iceberg/IcebergDeleteFile.h +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -53,6 +53,13 @@ struct IcebergDeleteFile { // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> std::unordered_map upperBounds; + /// Data sequence number of this delete file, assigned by the Iceberg snapshot + /// that produced it. Per the Iceberg spec (V2+), an equality delete file must + /// only be applied to data files whose data sequence number is strictly less + /// than the delete file's data sequence number. A value of 0 means + /// "unassigned" (legacy V1 tables) and disables sequence number filtering. + int64_t dataSequenceNumber{0}; + IcebergDeleteFile( FileContent _content, const std::string& _filePath, @@ -61,7 +68,8 @@ struct IcebergDeleteFile { uint64_t _fileSizeInBytes, std::vector _equalityFieldIds = {}, std::unordered_map _lowerBounds = {}, - std::unordered_map _upperBounds = {}) + std::unordered_map _upperBounds = {}, + int64_t _dataSequenceNumber = 0) : content(_content), filePath(_filePath), fileFormat(_fileFormat), @@ -69,7 +77,8 @@ struct IcebergDeleteFile { fileSizeInBytes(_fileSizeInBytes), equalityFieldIds(_equalityFieldIds), lowerBounds(_lowerBounds), - upperBounds(_upperBounds) {} + upperBounds(_upperBounds), + dataSequenceNumber(_dataSequenceNumber) {} }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index e5cdf63c33b4..8b4417f22a72 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -69,7 +69,8 @@ HiveIcebergSplit::HiveIcebergSplit( bool cacheable, std::vector deletes, const std::unordered_map& infoColumns, - std::optional properties) + std::optional properties, + int64_t dataSequenceNumber) : HiveConnectorSplit( connectorId, filePath, @@ -87,5 +88,6 @@ HiveIcebergSplit::HiveIcebergSplit( properties, std::nullopt, std::nullopt), - deleteFiles(std::move(deletes)) {} + deleteFiles(std::move(deletes)), + dataSequenceNumber(dataSequenceNumber) {} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index eb2448dabd1c..d161565aa329 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -25,6 +25,13 @@ namespace facebook::velox::connector::hive::iceberg { struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { std::vector deleteFiles; + /// Data sequence number of the base data file in this split. Per the Iceberg + /// spec (V2+), an equality delete file should only apply to data files whose + /// data sequence number is strictly less than the delete file's sequence + /// number. A value of 0 means "unassigned" (legacy V1 tables) and disables + /// sequence number filtering. + int64_t dataSequenceNumber{0}; + HiveIcebergSplit( const std::string& connectorId, const std::string& filePath, @@ -55,7 +62,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { bool cacheable = true, std::vector deletes = {}, const std::unordered_map& infoColumns = {}, - std::optional fileProperties = std::nullopt); + std::optional fileProperties = std::nullopt, + int64_t dataSequenceNumber = 0); }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index bd727bb419fe..24c3a6b0d7a3 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -29,6 +29,29 @@ using namespace facebook::velox::dwio::common; namespace facebook::velox::connector::hive::iceberg { +namespace { + +/// Returns true if a delete/update file should be skipped based on sequence +/// number conflict resolution. Per the Iceberg spec (V2+): +/// - Equality deletes apply when deleteSeqNum > dataSeqNum (i.e., skip when +/// deleteSeqNum <= dataSeqNum). +/// - Positional deletes, deletion vectors, and positional updates apply when +/// deleteSeqNum >= dataSeqNum (i.e., skip when deleteSeqNum < dataSeqNum), +/// because same-snapshot positional deletes SHOULD apply. +/// - A sequence number of 0 means "unassigned" (legacy V1 tables) and +/// disables filtering (never skip). +bool shouldSkipBySequenceNumber( + int64_t fileSeqNum, + int64_t dataSeqNum, + bool isEqualityDelete) { + if (fileSeqNum <= 0 || dataSeqNum <= 0) { + return false; + } + return isEqualityDelete ? (fileSeqNum <= dataSeqNum) + : (fileSeqNum < dataSeqNum); +} + +} // namespace IcebergSplitReader::IcebergSplitReader( const std::shared_ptr& hiveSplit, @@ -121,6 +144,13 @@ void IcebergSplitReader::prepareSplit( } } else if (deleteFile.content == FileContent::kEqualityDeletes) { if (deleteFile.recordCount > 0 && !deleteFile.equalityFieldIds.empty()) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/true)) { + continue; + } + // Resolve equalityFieldIds to column names and types. In Iceberg, // field IDs for top-level columns are assigned sequentially starting // from 1, matching the column order in the table schema. diff --git a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp index eb360f10e975..601a583e0a6c 100644 --- a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp +++ b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp @@ -81,7 +81,8 @@ class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { /// Creates splits with equality delete files attached. std::vector> makeSplits( const std::string& dataFilePath, - const std::vector& deleteFiles = {}) { + const std::vector& deleteFiles = {}, + int64_t dataSequenceNumber = 0) { auto fileSize = getFileSize(dataFilePath); return {std::make_shared( kIcebergConnectorId, @@ -94,7 +95,10 @@ class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { std::unordered_map{}, nullptr, /*cacheable=*/true, - deleteFiles)}; + deleteFiles, + std::unordered_map{}, + std::nullopt, + dataSequenceNumber)}; } /// Builds a table scan plan node with the given schema. @@ -358,4 +362,279 @@ TEST_F(EqualityDeleteFileReaderTest, deleteOnSecondColumn) { assertEqualResults({expected}, {result}); } +/// Verifies that equality deletes apply when the delete file has a higher +/// sequence number than the data file (per the Iceberg V2+ spec). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteApplies) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"a", "b", "c", "d", "e"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({2, 4}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 5, data file has sequence number 3. + // Since deleteSeq (5) > dataSeq (3), the delete should apply. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/5); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/3); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows with id=2 and id=4 are deleted. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 3, 5}), + makeFlatVector({"a", "c", "e"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that equality deletes are skipped when the delete file has a +/// lower or equal sequence number compared to the data file. +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteSkipped) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 2, data file has sequence number 5. + // Since deleteSeq (2) <= dataSeq (5), the delete should be skipped. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/2); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // All rows survive because the delete file is skipped. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that equality deletes are skipped when the delete file has the +/// same sequence number as the data file (edge case of the <= check). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberEqualSkipped) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file and data file have the same sequence number (5). + // Since deleteSeq (5) <= dataSeq (5), the delete should be skipped. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/5); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // All rows survive because the delete file is skipped (equal seq#). + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when either sequence number is 0 (unassigned/legacy V1), +/// the delete file is always applied (filtering is disabled). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberZeroAlwaysApplies) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({2}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 0 (legacy), data file has sequence 10. + // Since deleteSeq is 0, filtering is disabled and the delete applies. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/0); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/10); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Row id=2 is deleted because sequence number filtering is disabled. + auto expected = makeRowVector( + {"id"}, + { + makeFlatVector({1, 3}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when multiple delete files have different sequence numbers, +/// only those with higher sequence numbers than the data file are applied. +TEST_F(EqualityDeleteFileReaderTest, mixedSequenceNumbers) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"a", "b", "c", "d", "e"}), + }); + auto dataFile = writeDataFile({baseData}); + + // First delete file: seqNum=10 (higher than data seqNum=5) → applied. + auto deleteData1 = makeRowVector( + {"id"}, + { + makeFlatVector({2}), + }); + auto eqDeleteFile1 = writeEqDeleteFile({deleteData1}); + IcebergDeleteFile icebergDeleteFile1( + FileContent::kEqualityDeletes, + eqDeleteFile1->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile1->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/10); + + // Second delete file: seqNum=3 (lower than data seqNum=5) → skipped. + auto deleteData2 = makeRowVector( + {"id"}, + { + makeFlatVector({4}), + }); + auto eqDeleteFile2 = writeEqDeleteFile({deleteData2}); + IcebergDeleteFile icebergDeleteFile2( + FileContent::kEqualityDeletes, + eqDeleteFile2->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile2->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/3); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile1, icebergDeleteFile2}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Only id=2 is deleted (from delete file 1 with seqNum=10). + // id=4 survives because delete file 2 (seqNum=3) is skipped. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 3, 4, 5}), + makeFlatVector({"a", "c", "d", "e"}), + }); + + assertEqualResults({expected}, {result}); +} + +// TODO: Add a Parquet-format equality delete test. Currently all equality +// delete tests use DWRF because writeToFile() (from HiveConnectorTestBase) +// only supports DWRF. Adding a Parquet test requires adding Parquet writer +// dependencies to this test target's BUCK file and a Parquet write helper. + } // namespace facebook::velox::connector::hive::iceberg From 16ad36384f9d9dcde34509e2b3ced73dc8a58126 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 5/7] Add sequence number conflict resolution for positional deletes and deletion vectors Summary: Extend the sequence number conflict resolution logic to positional deletes and deletion vectors. Per the Iceberg spec: - Positional deletes and DVs skip when deleteFileSeqNum < dataFileSeqNum (strictly less than, unlike equality deletes which use <=) - Sequence number 0 (V1 legacy) never skips Changes: - IcebergSplitReader: Apply sequence number filtering for positional deletes and deletion vectors before passing to readers - IcebergReadFile: Store dataSequenceNumber for conflict resolution - Tests: Add sequence number conflict resolution tests for positional deletes and DVs Differential Revision: D97530139 --- .../hive/iceberg/IcebergSplitReader.cpp | 14 + .../hive/iceberg/tests/IcebergReadTest.cpp | 274 ++++++++++++++++++ 2 files changed, 288 insertions(+) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 24c3a6b0d7a3..e20ec8bae99f 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -109,6 +109,13 @@ void IcebergSplitReader::prepareSplit( for (const auto& deleteFile : deleteFiles) { if (deleteFile.content == FileContent::kPositionalDeletes) { if (deleteFile.recordCount > 0) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/false)) { + continue; + } + // Skip the delete file if all delete positions are before this split. // TODO: Skip delete files where all positions are after the split, if // split row count becomes available. @@ -191,6 +198,13 @@ void IcebergSplitReader::prepareSplit( } } else if (deleteFile.content == FileContent::kDeletionVector) { if (deleteFile.recordCount > 0) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/false)) { + continue; + } + deletionVectorReaders_.push_back( std::make_unique( deleteFile, splitOffset_, connectorQueryCtx_->memoryPool())); diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 113ffff8d052..e6416e3e0a91 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -1011,4 +1011,278 @@ TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) { } #endif +// Sequence number filtering tests for positional deletes (Diff 2). +// Per the Iceberg V2+ spec, a positional delete file should only apply to +// data files whose dataSequenceNumber is strictly less than the delete file's. + +TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberApplied) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); + auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); + auto rowType = ROW({"c0"}, {BIGINT()}); + + // Write base data file: c0 = [0, 1, 2, 3, 4]. + auto dataFilePath = TempFilePath::create(); + writeToFile( + dataFilePath->getPath(), + {makeRowVector({makeFlatVector({0, 1, 2, 3, 4})})}); + + // Write positional delete file targeting positions 1 and 3. + auto deleteFilePath = TempFilePath::create(); + auto baseFilePath = dataFilePath->getPath(); + writeToFile( + deleteFilePath->getPath(), + {makeRowVector( + {pathColumn->name, posColumn->name}, + { + makeFlatVector( + 2, [&](vector_size_t) { return baseFilePath; }), + makeFlatVector({1, 3}), + })}); + + // Delete file seqNum=10 > data seqNum=5 → delete should be applied. + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getTestFileSize(deleteFilePath->getPath()), + {}, + {}, + {}, + /*dataSequenceNumber=*/10); + + auto file = filesystems::getFileSystem(baseFilePath, nullptr) + ->openFileForRead(baseFilePath); + auto split = std::make_shared( + kIcebergConnectorId, + baseFilePath, + dwio::common::FileFormat::DWRF, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + true, + std::vector{deleteFile}, + std::unordered_map{}, + std::nullopt, + /*dataSequenceNumber=*/5); + + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + + // Positions 1 and 3 deleted → remaining: [0, 2, 4]. + auto expected = makeRowVector({makeFlatVector({0, 2, 4})}); + AssertQueryBuilder(plan).split(split).assertResults({expected}); +} + +TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberSkipped) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); + auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); + auto rowType = ROW({"c0"}, {BIGINT()}); + + auto dataFilePath = TempFilePath::create(); + writeToFile( + dataFilePath->getPath(), + {makeRowVector({makeFlatVector({0, 1, 2, 3, 4})})}); + + auto deleteFilePath = TempFilePath::create(); + auto baseFilePath = dataFilePath->getPath(); + writeToFile( + deleteFilePath->getPath(), + {makeRowVector( + {pathColumn->name, posColumn->name}, + { + makeFlatVector( + 2, [&](vector_size_t) { return baseFilePath; }), + makeFlatVector({1, 3}), + })}); + + // Delete file seqNum=5 < data seqNum=10 → delete should be skipped. + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getTestFileSize(deleteFilePath->getPath()), + {}, + {}, + {}, + /*dataSequenceNumber=*/5); + + auto file = filesystems::getFileSystem(baseFilePath, nullptr) + ->openFileForRead(baseFilePath); + auto split = std::make_shared( + kIcebergConnectorId, + baseFilePath, + dwio::common::FileFormat::DWRF, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + true, + std::vector{deleteFile}, + std::unordered_map{}, + std::nullopt, + /*dataSequenceNumber=*/10); + + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + + // Delete skipped → all rows survive: [0, 1, 2, 3, 4]. + auto expected = makeRowVector({makeFlatVector({0, 1, 2, 3, 4})}); + AssertQueryBuilder(plan).split(split).assertResults({expected}); +} + +// Verifies that same-snapshot positional deletes apply (deleteSeqNum == +// dataSeqNum). Per the Iceberg spec, positional deletes in the same snapshot +// SHOULD apply, so the skip condition uses strict < (not <=). +TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberEqualApplied) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); + auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); + auto rowType = ROW({"c0"}, {BIGINT()}); + + auto dataFilePath = TempFilePath::create(); + writeToFile( + dataFilePath->getPath(), + {makeRowVector({makeFlatVector({0, 1, 2, 3, 4})})}); + + auto deleteFilePath = TempFilePath::create(); + auto baseFilePath = dataFilePath->getPath(); + writeToFile( + deleteFilePath->getPath(), + {makeRowVector( + {pathColumn->name, posColumn->name}, + { + makeFlatVector( + 2, [&](vector_size_t) { return baseFilePath; }), + makeFlatVector({1, 3}), + })}); + + // Delete file seqNum=5 == data seqNum=5 → delete should be applied + // (same-snapshot positional deletes apply per Iceberg spec). + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getTestFileSize(deleteFilePath->getPath()), + {}, + {}, + {}, + /*dataSequenceNumber=*/5); + + auto file = filesystems::getFileSystem(baseFilePath, nullptr) + ->openFileForRead(baseFilePath); + auto split = std::make_shared( + kIcebergConnectorId, + baseFilePath, + dwio::common::FileFormat::DWRF, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + true, + std::vector{deleteFile}, + std::unordered_map{}, + std::nullopt, + /*dataSequenceNumber=*/5); + + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + + // Same-snapshot delete applied: positions 1 and 3 deleted → [0, 2, 4]. + auto expected = makeRowVector({makeFlatVector({0, 2, 4})}); + AssertQueryBuilder(plan).split(split).assertResults({expected}); +} + +TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberZeroDisablesFilter) { + folly::SingletonVault::singleton()->registrationComplete(); + + auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn(); + auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); + auto rowType = ROW({"c0"}, {BIGINT()}); + + auto dataFilePath = TempFilePath::create(); + writeToFile( + dataFilePath->getPath(), + {makeRowVector({makeFlatVector({0, 1, 2, 3, 4})})}); + + auto deleteFilePath = TempFilePath::create(); + auto baseFilePath = dataFilePath->getPath(); + writeToFile( + deleteFilePath->getPath(), + {makeRowVector( + {pathColumn->name, posColumn->name}, + { + makeFlatVector( + 2, [&](vector_size_t) { return baseFilePath; }), + makeFlatVector({1, 3}), + })}); + + // Delete file seqNum=0 (unassigned/V1 legacy) → always applied regardless. + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getTestFileSize(deleteFilePath->getPath()), + {}, + {}, + {}, + /*dataSequenceNumber=*/0); + + auto file = filesystems::getFileSystem(baseFilePath, nullptr) + ->openFileForRead(baseFilePath); + auto split = std::make_shared( + kIcebergConnectorId, + baseFilePath, + dwio::common::FileFormat::DWRF, + 0, + file->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + true, + std::vector{deleteFile}, + std::unordered_map{}, + std::nullopt, + /*dataSequenceNumber=*/100); + + auto plan = PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + + // SeqNum=0 disables filtering → delete applied: [0, 2, 4]. + auto expected = makeRowVector({makeFlatVector({0, 2, 4})}); + AssertQueryBuilder(plan).split(split).assertResults({expected}); +} + } // namespace facebook::velox::connector::hive::iceberg From f5ad209e165d36bcdc73f3d3cd69b0020177ed08 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 12:59:08 -0700 Subject: [PATCH 6/7] Add Iceberg V3 deletion vector writer Summary: - Add DeletionVectorWriter.cpp/h implementing DV file writing using RoaringBitmapArray - Support both position-based (positional deletes) and DV-based delete file writing - Add DeletionVectorWriterTest with comprehensive unit tests - Update BUCK and CMakeLists.txt build targets Differential Revision: D97530137 --- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/DeletionVectorWriter.cpp | 233 ++++++++++++++ .../hive/iceberg/DeletionVectorWriter.h | 89 ++++++ .../hive/iceberg/tests/CMakeLists.txt | 13 + .../tests/DeletionVectorWriterTest.cpp | 290 ++++++++++++++++++ 5 files changed, 626 insertions(+) create mode 100644 velox/connectors/hive/iceberg/DeletionVectorWriter.cpp create mode 100644 velox/connectors/hive/iceberg/DeletionVectorWriter.h create mode 100644 velox/connectors/hive/iceberg/tests/DeletionVectorWriterTest.cpp diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index 233579680b92..f7143f150f36 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -15,6 +15,7 @@ set( ICEBERG_SOURCES DeletionVectorReader.cpp + DeletionVectorWriter.cpp IcebergConfig.cpp IcebergColumnHandle.cpp IcebergConnector.cpp diff --git a/velox/connectors/hive/iceberg/DeletionVectorWriter.cpp b/velox/connectors/hive/iceberg/DeletionVectorWriter.cpp new file mode 100644 index 000000000000..f732f36ad85a --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorWriter.cpp @@ -0,0 +1,233 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorWriter.h" + +#include +#include + +#include +#include + +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +static constexpr uint32_t kSerialCookieNoRun = 12'346; +static constexpr uint32_t kNoOffsetThreshold = 4; +static constexpr uint32_t kMaxArrayContainerCardinality = 4'096; +static constexpr size_t kBitsetBytes = 8'192; + +void appendLittleEndian(std::string& out, uint16_t val) { + val = folly::Endian::little(val); + out.append(reinterpret_cast(&val), sizeof(val)); +} + +void appendLittleEndian(std::string& out, uint32_t val) { + val = folly::Endian::little(val); + out.append(reinterpret_cast(&val), sizeof(val)); +} + +void appendLittleEndian(std::string& out, uint64_t val) { + val = folly::Endian::little(val); + out.append(reinterpret_cast(&val), sizeof(val)); +} + +/// Puffin file magic: "PUF1" (4 bytes). +static constexpr char kPuffinMagic[] = {'\x50', '\x55', '\x46', '\x31'}; +static constexpr size_t kPuffinMagicSize = 4; + +/// Puffin footer flags: no compression, no encryption. +static constexpr uint32_t kPuffinFooterFlags = 0; + +} // namespace + +void DeletionVectorWriter::addDeletedPosition(int64_t position) { + VELOX_CHECK_GE(position, 0, "Deleted position must be non-negative."); + positions_.push_back(position); +} + +void DeletionVectorWriter::addDeletedPositions( + const std::vector& positions) { + for (auto pos : positions) { + addDeletedPosition(pos); + } +} + +std::string DeletionVectorWriter::serialize() const { + if (positions_.empty()) { + std::string data; + appendLittleEndian(data, kSerialCookieNoRun); + uint32_t zero = 0; + appendLittleEndian(data, zero); + return data; + } + + // Sort and deduplicate positions. + std::vector sorted = positions_; + std::sort(sorted.begin(), sorted.end()); + sorted.erase(std::unique(sorted.begin(), sorted.end()), sorted.end()); + + // Group positions by high 16 bits (container key). + struct Container { + uint16_t key; + std::vector values; + }; + std::map> containerMap; + for (auto pos : sorted) { + auto key = static_cast(pos >> 16); + auto low = static_cast(pos & 0xFFFF); + containerMap[key].push_back(low); + } + + std::vector containers; + containers.reserve(containerMap.size()); + for (auto& [key, vals] : containerMap) { + containers.push_back({key, std::move(vals)}); + } + + uint32_t numContainers = static_cast(containers.size()); + + std::string data; + + // Header: cookie + container count. + appendLittleEndian(data, kSerialCookieNoRun); + appendLittleEndian(data, numContainers); + + // Key-cardinality pairs. + for (auto& c : containers) { + appendLittleEndian(data, c.key); + auto cardMinus1 = static_cast(c.values.size() - 1); + appendLittleEndian(data, cardMinus1); + } + + // Offset section (required for >= 4 containers). + if (numContainers >= kNoOffsetThreshold) { + // Calculate offsets: base = header + key/card pairs + offset section. + uint32_t headerSize = 4 + 4 + numContainers * 4 + numContainers * 4; + uint32_t runningOffset = headerSize; + for (auto& c : containers) { + appendLittleEndian(data, runningOffset); + if (c.values.size() <= kMaxArrayContainerCardinality) { + runningOffset += static_cast(c.values.size()) * 2; + } else { + runningOffset += kBitsetBytes; + } + } + } + + // Container data. + for (auto& c : containers) { + if (c.values.size() <= kMaxArrayContainerCardinality) { + // Array container: sorted uint16 values. + for (auto v : c.values) { + appendLittleEndian(data, v); + } + } else { + // Bitmap container: 8192 bytes (65536 bits). + std::vector bitmap(1'024, 0); + for (auto v : c.values) { + bitmap[v / 64] |= (1ULL << (v % 64)); + } + for (auto word : bitmap) { + appendLittleEndian(data, word); + } + } + } + + return data; +} + +void DeletionVectorWriter::clear() { + positions_.clear(); +} + +std::pair writePuffinFile( + const std::string& filePath, + const std::string& blobData, + const std::string& referencedDataFile) { + // Build the Puffin file in memory, then write atomically. + // + // Puffin file layout: + // [Magic "PUF1" (4 bytes)] + // [Blob data (N bytes)] + // [Footer] + // [Footer payload size (4 bytes, little-endian)] + // [Flags (4 bytes)] + // [Magic "PUF1" (4 bytes)] + // + // Footer is a JSON object with blob metadata. + + uint64_t blobOffset = kPuffinMagicSize; + uint64_t blobLength = blobData.size(); + + // Build footer JSON. + folly::dynamic blobMeta = + folly::dynamic::object("type", "deletion-vector-v1")( + "fields", + folly::dynamic::array( + folly::dynamic::object("source-field-id", 2147483646))); + blobMeta["offset"] = blobOffset; + blobMeta["length"] = blobLength; + blobMeta["compression-codec"] = "none"; + + folly::dynamic properties = folly::dynamic::object; + properties["referenced-data-file"] = referencedDataFile; + blobMeta["properties"] = properties; + + folly::dynamic footer = folly::dynamic::object; + footer["blobs"] = folly::dynamic::array(blobMeta); + footer["properties"] = folly::dynamic::object; + + std::string footerJson = folly::toJson(footer); + uint32_t footerPayloadSize = static_cast(footerJson.size()); + + // Assemble the file. + std::string fileContent; + // Header magic. + fileContent.append(kPuffinMagic, kPuffinMagicSize); + // Blob data. + fileContent.append(blobData); + // Footer JSON. + fileContent.append(footerJson); + // Footer payload size. + uint32_t littleEndianSize = folly::Endian::little(footerPayloadSize); + fileContent.append( + reinterpret_cast(&littleEndianSize), + sizeof(littleEndianSize)); + // Flags. + uint32_t littleEndianFlags = folly::Endian::little(kPuffinFooterFlags); + fileContent.append( + reinterpret_cast(&littleEndianFlags), + sizeof(littleEndianFlags)); + // Footer magic. + fileContent.append(kPuffinMagic, kPuffinMagicSize); + + // Write to file. + std::ofstream out(filePath, std::ios::binary | std::ios::trunc); + VELOX_CHECK( + out.good(), "Failed to open Puffin file for writing: {}", filePath); + out.write( + fileContent.data(), static_cast(fileContent.size())); + out.close(); + VELOX_CHECK(!out.fail(), "Failed to write Puffin file: {}", filePath); + + return {blobOffset, blobLength}; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/DeletionVectorWriter.h b/velox/connectors/hive/iceberg/DeletionVectorWriter.h new file mode 100644 index 000000000000..9496a826182f --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorWriter.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +/// Writes Iceberg V3 deletion vectors as serialized roaring bitmaps. +/// +/// Iceberg V3 uses deletion vectors (DVs) — compact roaring bitmaps stored +/// as blobs inside Puffin files — to mark deleted rows in data files. This +/// writer collects deleted row positions and serializes them in the standard +/// roaring bitmap portable format for consumption by DeletionVectorReader. +/// +/// Usage: +/// DeletionVectorWriter writer; +/// writer.addDeletedPosition(5); +/// writer.addDeletedPosition(10); +/// std::string blob = writer.serialize(); +/// // Write blob to a Puffin file at the appropriate offset. +class DeletionVectorWriter { + public: + DeletionVectorWriter() = default; + + /// Adds a deleted row position (0-based file row offset). + void addDeletedPosition(int64_t position); + + /// Adds multiple deleted row positions. + void addDeletedPositions(const std::vector& positions); + + /// Returns the number of deleted positions collected so far. + size_t numPositions() const { + return positions_.size(); + } + + /// Serializes the collected positions into a roaring bitmap in the + /// portable binary format (SERIAL_COOKIE_NO_RUNCONTAINER, cookie = 12346). + /// + /// Uses array containers for cardinality <= 4096 and bitmap containers + /// for cardinality > 4096, matching the standard roaring bitmap spec. + /// + /// @return Binary string containing the serialized roaring bitmap. + std::string serialize() const; + + /// Clears all collected positions. + void clear(); + + private: + std::vector positions_; +}; + +/// Writes a Puffin file containing a single deletion vector blob. +/// +/// The Puffin file format consists of: +/// - 4-byte magic: "PUF1" +/// - Blob data (the serialized roaring bitmap) +/// - Footer: blob metadata + footer payload size + magic +/// +/// This is a simplified writer that produces files compatible with the +/// Iceberg Puffin spec for single-blob deletion vectors. +/// +/// @param filePath Path to write the Puffin file. +/// @param blobData Serialized roaring bitmap bytes. +/// @param referencedDataFile Path of the data file this DV applies to. +/// @return Pair of (blobOffset, blobLength) within the written file. +std::pair writePuffinFile( + const std::string& filePath, + const std::string& blobData, + const std::string& referencedDataFile); + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index 08a509442fa3..ef02db761cb2 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -120,4 +120,17 @@ if(NOT VELOX_DISABLE_GOOGLETEST) GTest::gtest GTest::gtest_main ) + + add_executable(velox_hive_iceberg_deletion_vector_writer_test DeletionVectorWriterTest.cpp) + add_test(velox_hive_iceberg_deletion_vector_writer_test velox_hive_iceberg_deletion_vector_writer_test) + + target_link_libraries( + velox_hive_iceberg_deletion_vector_writer_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) endif() diff --git a/velox/connectors/hive/iceberg/tests/DeletionVectorWriterTest.cpp b/velox/connectors/hive/iceberg/tests/DeletionVectorWriterTest.cpp new file mode 100644 index 000000000000..b1f8ca73830f --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/DeletionVectorWriterTest.cpp @@ -0,0 +1,290 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorWriter.h" + +#include + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/testutil/TempFilePath.h" +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive::iceberg; +using namespace facebook::velox::common::testutil; + +namespace { + +/// Extracts which bits are set in a bitmap buffer. +std::vector getSetBits(const BufferPtr& bitmap, uint64_t size) { + auto* raw = bitmap->as(); + std::vector result; + for (uint64_t i = 0; i < size; ++i) { + if (bits::isBitSet(raw, i)) { + result.push_back(i); + } + } + return result; +} + +} // namespace + +class DeletionVectorWriterTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + } + + void SetUp() override { + filesystems::registerLocalFileSystem(); + pool_ = memory::memoryManager()->addLeafPool("DeletionVectorWriterTest"); + } + + BufferPtr allocateBitmap(uint64_t numBits) { + auto numBytes = bits::nbytes(numBits); + return AlignedBuffer::allocate(numBytes, pool_.get(), 0); + } + + /// Writes serialized bitmap to a temp file, reads it back with + /// DeletionVectorReader, and verifies the positions match. + void verifyRoundTrip( + const std::vector& positions, + uint64_t batchSize) { + DeletionVectorWriter writer; + writer.addDeletedPositions(positions); + EXPECT_EQ(writer.numPositions(), positions.size()); + + auto blobData = writer.serialize(); + + auto tempFile = TempFilePath::create(); + { + std::ofstream out( + tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write(blobData.data(), static_cast(blobData.size())); + } + + auto fileSize = static_cast(blobData.size()); + + // Create IcebergDeleteFile with DV metadata. + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = "0"; + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(fileSize); + + IcebergDeleteFile dvFile( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + positions.size(), + fileSize, + {}, + lowerBounds, + upperBounds); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Collect all set bits across batches. + std::vector allSetBits; + int64_t maxPos = positions.empty() + ? 0 + : *std::max_element(positions.begin(), positions.end()); + uint64_t totalRows = static_cast(maxPos) + batchSize; + + for (uint64_t offset = 0; offset < totalRows; offset += batchSize) { + auto bitmap = allocateBitmap(batchSize); + reader.readDeletePositions(offset, batchSize, bitmap); + auto bits = getSetBits(bitmap, batchSize); + for (auto b : bits) { + allSetBits.push_back(offset + b); + } + } + + // Sort and deduplicate the expected positions. + std::vector expected = positions; + std::sort(expected.begin(), expected.end()); + expected.erase( + std::unique(expected.begin(), expected.end()), expected.end()); + + EXPECT_EQ(allSetBits.size(), expected.size()); + for (size_t i = 0; i < expected.size(); ++i) { + EXPECT_EQ(allSetBits[i], static_cast(expected[i])); + } + } + + std::shared_ptr pool_; +}; + +TEST_F(DeletionVectorWriterTest, emptyBitmap) { + DeletionVectorWriter writer; + EXPECT_EQ(writer.numPositions(), 0); + + auto data = writer.serialize(); + // Empty bitmap: cookie (4 bytes) + container count 0 (4 bytes). + EXPECT_EQ(data.size(), 8); +} + +TEST_F(DeletionVectorWriterTest, singlePosition) { + verifyRoundTrip({42}, 100); +} + +TEST_F(DeletionVectorWriterTest, multiplePositions) { + verifyRoundTrip({0, 5, 10, 99}, 100); +} + +TEST_F(DeletionVectorWriterTest, consecutivePositions) { + std::vector positions; + positions.reserve(100); + for (int64_t i = 0; i < 100; ++i) { + positions.push_back(i); + } + verifyRoundTrip(positions, 200); +} + +TEST_F(DeletionVectorWriterTest, multipleContainers) { + // Positions spanning two containers (key=0 and key=1). + verifyRoundTrip({5, 100, 65536, 65600}, 70000); +} + +TEST_F(DeletionVectorWriterTest, largeCardinalityBitmapContainer) { + // More than 4096 positions in a single container triggers bitmap container. + std::vector positions; + positions.reserve(5000); + for (int64_t i = 0; i < 5000; ++i) { + positions.push_back(i * 2); // Even numbers 0..9998. + } + verifyRoundTrip(positions, 10100); +} + +TEST_F(DeletionVectorWriterTest, duplicatePositions) { + // Writer should deduplicate. + DeletionVectorWriter writer; + writer.addDeletedPosition(5); + writer.addDeletedPosition(5); + writer.addDeletedPosition(10); + writer.addDeletedPosition(10); + writer.addDeletedPosition(10); + EXPECT_EQ(writer.numPositions(), 5); + + auto data = writer.serialize(); + + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write(data.data(), static_cast(data.size())); + } + + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = "0"; + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(data.size()); + + IcebergDeleteFile dvFile( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, // Only 2 unique positions. + data.size(), + {}, + lowerBounds, + upperBounds); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{5, 10})); +} + +TEST_F(DeletionVectorWriterTest, clearPositions) { + DeletionVectorWriter writer; + writer.addDeletedPosition(1); + writer.addDeletedPosition(2); + EXPECT_EQ(writer.numPositions(), 2); + + writer.clear(); + EXPECT_EQ(writer.numPositions(), 0); + + auto data = writer.serialize(); + EXPECT_EQ(data.size(), 8); // Empty bitmap. +} + +TEST_F(DeletionVectorWriterTest, negativePositionRejected) { + DeletionVectorWriter writer; + VELOX_ASSERT_THROW( + writer.addDeletedPosition(-1), "Deleted position must be non-negative"); +} + +TEST_F(DeletionVectorWriterTest, fourOrMoreContainersWithOffsets) { + // With >= 4 containers, the roaring format includes an offset section. + std::vector positions; + positions.reserve(5); + for (int i = 0; i < 5; ++i) { + positions.push_back(static_cast(i) * 65536 + 42); + } + verifyRoundTrip(positions, 5 * 65536 + 100); +} + +TEST_F(DeletionVectorWriterTest, puffinFileRoundTrip) { + DeletionVectorWriter writer; + writer.addDeletedPositions({3, 7, 42, 100}); + auto blobData = writer.serialize(); + + auto tempFile = TempFilePath::create(); + auto [blobOffset, blobLength] = writePuffinFile( + tempFile->getPath(), blobData, "/data/test-data-file.parquet"); + + EXPECT_EQ(blobOffset, 4); // After "PUF1" magic. + EXPECT_EQ(blobLength, blobData.size()); + + // Read the blob back from the Puffin file using DeletionVectorReader. + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = + std::to_string(blobOffset); + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(blobLength); + + // Get full file size. + std::ifstream in(tempFile->getPath(), std::ios::binary | std::ios::ate); + auto fileSize = static_cast(in.tellg()); + + IcebergDeleteFile dvFile( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 4, + fileSize, + {}, + lowerBounds, + upperBounds); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(200); + reader.readDeletePositions(0, 200, bitmap); + + auto setBits = getSetBits(bitmap, 200); + EXPECT_EQ(setBits, (std::vector{3, 7, 42, 100})); +} From fdfb9fee3743ce7db27aab7dd6ebf90510cc0e83 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 23:34:44 -0700 Subject: [PATCH 7/7] feat: [velox][iceberg] Add DWRF file format support for Iceberg data sink Summary: - Add DWRF file format support in IcebergDataSink for read and write paths - Update BUCK build targets for DWRF dependencies - Add IcebergDwrfInsertTest with comprehensive insert/read tests for DWRF format - Update CMakeLists.txt for new test targets Differential Revision: D97530138 --- .../hive/iceberg/IcebergDataSink.cpp | 106 +++++---- .../connectors/hive/iceberg/IcebergDataSink.h | 2 +- .../hive/iceberg/tests/CMakeLists.txt | 20 ++ .../iceberg/tests/IcebergDwrfInsertTest.cpp | 208 ++++++++++++++++++ 4 files changed, 296 insertions(+), 40 deletions(-) create mode 100644 velox/connectors/hive/iceberg/tests/IcebergDwrfInsertTest.cpp diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.cpp b/velox/connectors/hive/iceberg/IcebergDataSink.cpp index 9b8528185224..6655a1eb1560 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.cpp +++ b/velox/connectors/hive/iceberg/IcebergDataSink.cpp @@ -142,10 +142,11 @@ IcebergInsertTableHandle::IcebergInsertTableHandle( "Input columns cannot be empty for Iceberg tables."); VELOX_USER_CHECK_NOT_NULL( locationHandle_, "Location handle is required for Iceberg tables."); - VELOX_USER_CHECK_EQ( - tableStorageFormat, - dwio::common::FileFormat::PARQUET, - "Only Parquet file format is supported when writing Iceberg tables."); + VELOX_USER_CHECK( + tableStorageFormat == dwio::common::FileFormat::PARQUET || + tableStorageFormat == dwio::common::FileFormat::DWRF, + "Unsupported file format for writing Iceberg tables: {}", + dwio::common::toString(tableStorageFormat)); } namespace { @@ -316,14 +317,16 @@ IcebergDataSink::IcebergDataSink( commitPartitionValue_.resize(maxOpenWriters_); #ifdef VELOX_ENABLE_PARQUET - std::vector columnHandles; - columnHandles.reserve(insertTableHandle->inputColumns().size()); - for (auto& column : insertTableHandle->inputColumns()) { - columnHandles.emplace_back( - checkedPointerCast(column)); + if (insertTableHandle->storageFormat() == dwio::common::FileFormat::PARQUET) { + std::vector columnHandles; + columnHandles.reserve(insertTableHandle->inputColumns().size()); + for (auto& column : insertTableHandle->inputColumns()) { + columnHandles.emplace_back( + checkedPointerCast(column)); + } + parquetStatsCollector_ = std::make_shared( + std::move(columnHandles)); } - parquetStatsCollector_ = - std::make_shared(std::move(columnHandles)); #endif } @@ -352,7 +355,11 @@ std::vector IcebergDataSink::commitMessage() const { icebergInsertTableHandle_->partitionSpec()->specId : 0) // Sort order evolution is not supported. Set default id to 0 ( unsorted order). ("sortOrderId", 0) - ("fileFormat", "PARQUET") + ("fileFormat", + icebergInsertTableHandle_->storageFormat() == + dwio::common::FileFormat::PARQUET + ? "PARQUET" + : "DWRF") ("content", "DATA"); // clang-format on if (!commitPartitionValue_.empty() && @@ -405,26 +412,35 @@ uint32_t IcebergDataSink::ensureWriter(const HiveWriterId& id) { std::shared_ptr IcebergDataSink::createWriterOptions(size_t writerIndex) const { auto options = HiveDataSink::createWriterOptions(writerIndex); - // Per Iceberg specification (https://iceberg.apache.org/spec/#parquet): - // - Timestamps must be stored with microsecond precision. - // - Timestamps must NOT be adjusted to UTC timezone; they should be written - // as-is without timezone conversion (empty string disables conversion). - // - // These settings are passed via serdeParameters to avoid including - // parquet-specific headers. The keys must match kParquetSerdeTimestampUnit - // and kParquetSerdeTimestampTimezone defined in - // velox/dwio/parquet/writer/Writer.h. The value "6" represents microseconds - // (TimestampPrecision::kMicroseconds). - options->serdeParameters["parquet.writer.timestamp.unit"] = "6"; - options->serdeParameters["parquet.writer.timestamp.timezone"] = ""; + if (icebergInsertTableHandle_->storageFormat() == + dwio::common::FileFormat::PARQUET) { + // Per Iceberg specification (https://iceberg.apache.org/spec/#parquet): + // - Timestamps must be stored with microsecond precision. + // - Timestamps must NOT be adjusted to UTC timezone; they should be written + // as-is without timezone conversion (empty string disables conversion). + // + // These settings are Parquet-specific and must NOT be applied for DWRF: + // 1. The serde parameter keys are only recognized by the Parquet writer. + // 2. The checkedPointerCast below would crash if + // the options are actually dwrf::WriterOptions. + // 3. Parquet field IDs (used for Iceberg schema evolution) are a Parquet + // concept — DWRF uses column indices instead. + // + // The keys must match kParquetSerdeTimestampUnit and + // kParquetSerdeTimestampTimezone defined in + // velox/dwio/parquet/writer/Writer.h. The value "6" represents microseconds + // (TimestampPrecision::kMicroseconds). + options->serdeParameters["parquet.writer.timestamp.unit"] = "6"; + options->serdeParameters["parquet.writer.timestamp.timezone"] = ""; #ifdef VELOX_ENABLE_PARQUET - if (parquetStatsCollector_) { - auto parquetOptions = checkedPointerCast(options); - parquetOptions->parquetFieldIds = - parquetStatsCollector_->parquetFieldIds().children; - } + if (parquetStatsCollector_) { + auto parquetOptions = checkedPointerCast(options); + parquetOptions->parquetFieldIds = + parquetStatsCollector_->parquetFieldIds().children; + } #endif + } // Re-process configs to apply the serde parameters we just set. options->processConfigs( @@ -466,19 +482,25 @@ void IcebergDataSink::rotateWriter(size_t index) { memory::NonReclaimableSectionGuard nonReclaimableGuard( writerInfo_[index]->nonReclaimableSectionHolder.get()); auto metadata = writers_[index]->close(); -#ifdef VELOX_ENABLE_PARQUET bool fileAdded = getCurrentFileBytes(index) > 0; -#endif // Finalize file info (capture file size, add to writtenFiles). finalizeWriterFile(index); -#ifdef VELOX_ENABLE_PARQUET if (fileAdded) { +#ifdef VELOX_ENABLE_PARQUET + if (parquetStatsCollector_) { + dataFileStats_[index].emplace_back( + parquetStatsCollector_->aggregate(std::move(metadata))); + } else { + dataFileStats_[index].emplace_back( + std::make_shared()); + } +#else dataFileStats_[index].emplace_back( - parquetStatsCollector_->aggregate(std::move(metadata))); - } + std::make_shared()); #endif + } } // Release old writer. The new writer will be created lazily on the next @@ -506,18 +528,24 @@ void IcebergDataSink::closeInternal() { writerInfo_[i]->nonReclaimableSectionHolder.get()); auto metadata = writers_[i]->close(); -#ifdef VELOX_ENABLE_PARQUET bool fileAdded = getCurrentFileBytes(i) > 0; -#endif finalizeWriterFile(i); -#ifdef VELOX_ENABLE_PARQUET if (fileAdded) { +#ifdef VELOX_ENABLE_PARQUET + if (parquetStatsCollector_) { + dataFileStats_[i].emplace_back( + parquetStatsCollector_->aggregate(std::move(metadata))); + } else { + dataFileStats_[i].emplace_back( + std::make_shared()); + } +#else dataFileStats_[i].emplace_back( - parquetStatsCollector_->aggregate(std::move(metadata))); - } + std::make_shared()); #endif + } } } else { for (auto i = 0; i < writers_.size(); ++i) { diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.h b/velox/connectors/hive/iceberg/IcebergDataSink.h index 0d94ef420f8b..055e0d7bbf40 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.h +++ b/velox/connectors/hive/iceberg/IcebergDataSink.h @@ -97,7 +97,7 @@ class IcebergDataSink : public HiveDataSink { /// - fileSizeInBytes: raw bytes written to disk. /// - metrics: object with recordCount (number of rows written). /// - partitionSpecJson: partition specification. - /// - fileFormat: storage format (e.g., "PARQUET"). + /// - fileFormat: storage format (e.g., "PARQUET", "DWRF"). /// - content: file content type ("DATA" for data files). /// /// See diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index ef02db761cb2..0009de0d3914 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -133,4 +133,24 @@ if(NOT VELOX_DISABLE_GOOGLETEST) GTest::gtest GTest::gtest_main ) + + add_executable( + velox_hive_iceberg_dwrf_insert_test + IcebergDwrfInsertTest.cpp + IcebergTestBase.cpp + Main.cpp + ) + add_test(velox_hive_iceberg_dwrf_insert_test velox_hive_iceberg_dwrf_insert_test) + + target_link_libraries( + velox_hive_iceberg_dwrf_insert_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + GTest::gtest + GTest::gtest_main + ) endif() diff --git a/velox/connectors/hive/iceberg/tests/IcebergDwrfInsertTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergDwrfInsertTest.cpp new file mode 100644 index 000000000000..603110352b64 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergDwrfInsertTest.cpp @@ -0,0 +1,208 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" +#include "velox/connectors/hive/iceberg/DeletionVectorWriter.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" +#include "velox/dwio/dwrf/RegisterDwrfReader.h" +#include "velox/dwio/dwrf/RegisterDwrfWriter.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +using namespace facebook::velox::common::testutil; + +namespace facebook::velox::connector::hive::iceberg { +namespace { + +/// End-to-end tests for writing and reading Iceberg tables using the DWRF file +/// format. Exercises the full write path (IcebergDataSink -> DWRF writer) and +/// the full read path (IcebergSplitReader -> DWRF reader), verifying data +/// round-trip correctness. +class IcebergDwrfInsertTest : public test::IcebergTestBase { + protected: + void SetUp() override { + IcebergTestBase::SetUp(); + dwrf::registerDwrfReaderFactory(); + dwrf::registerDwrfWriterFactory(); + fileFormat_ = dwio::common::FileFormat::DWRF; + } + + /// Write test data using DWRF format, then read it back and verify results. + void test(const RowTypePtr& rowType, double nullRatio = 0.0) { + const auto outputDirectory = TempDirectoryPath::create(); + const auto dataPath = outputDirectory->getPath(); + constexpr int32_t numBatches = 10; + constexpr int32_t vectorSize = 5'000; + const auto vectors = + createTestData(rowType, numBatches, vectorSize, nullRatio); + const auto dataSink = createDataSinkAndAppendData(vectors, dataPath); + const auto commitTasks = dataSink->close(); + + auto splits = createSplitsForDirectory(dataPath); + ASSERT_EQ(splits.size(), commitTasks.size()); + auto plan = exec::test::PlanBuilder() + .startTableScan() + .connectorId(test::kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + exec::test::AssertQueryBuilder(plan).splits(splits).assertResults(vectors); + } +}; + +TEST_F(IcebergDwrfInsertTest, basic) { + auto rowType = + ROW({"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {BIGINT(), + INTEGER(), + SMALLINT(), + BOOLEAN(), + REAL(), + VARCHAR(), + VARBINARY(), + DOUBLE()}); + test(rowType, 0.2); +} + +TEST_F(IcebergDwrfInsertTest, mapAndArray) { + auto rowType = + ROW({"c1", "c2"}, {MAP(INTEGER(), VARCHAR()), ARRAY(VARCHAR())}); + test(rowType); +} + +/// Verify the commit message contains "DWRF" as the file format. +TEST_F(IcebergDwrfInsertTest, commitMessageFormat) { + const auto outputDirectory = TempDirectoryPath::create(); + const auto dataPath = outputDirectory->getPath(); + auto rowType = ROW({"c1", "c2"}, {BIGINT(), VARCHAR()}); + const auto vectors = createTestData(rowType, 2, 100); + const auto dataSink = createDataSinkAndAppendData(vectors, dataPath); + const auto commitTasks = dataSink->close(); + + ASSERT_GT(commitTasks.size(), 0); + for (const auto& task : commitTasks) { + auto taskJson = folly::parseJson(task); + ASSERT_TRUE(taskJson.count("fileFormat") > 0); + ASSERT_EQ(taskJson["fileFormat"].asString(), "DWRF"); + } +} + +/// Integration test: write a DWRF data file, create a deletion vector (DV) +/// that marks specific rows as deleted, attach the DV to the split, read back, +/// and verify only the non-deleted rows are returned. This exercises the full +/// DWRF write path -> DV creation -> DWRF read path with DV filtering pipeline. +TEST_F(IcebergDwrfInsertTest, deletionVectors) { + const auto outputDirectory = TempDirectoryPath::create(); + const auto dataPath = outputDirectory->getPath(); + auto rowType = ROW({"c0"}, {BIGINT()}); + + // Write 100 deterministic rows: {0, 1, 2, ..., 99}. + constexpr int32_t numRows = 100; + auto data = makeRowVector( + {"c0"}, {makeFlatVector(numRows, [](auto row) { return row; })}); + + auto dataSink = createDataSink(rowType, dataPath); + dataSink->appendData(data); + ASSERT_TRUE(dataSink->finish()); + auto commitTasks = dataSink->close(); + ASSERT_EQ(commitTasks.size(), 1); + + // Create a deletion vector deleting every 10th row: {0, 10, 20, ..., 90}. + std::vector deletePositions; + for (int64_t i = 0; i < numRows; i += 10) { + deletePositions.push_back(i); + } + + DeletionVectorWriter dvWriter; + dvWriter.addDeletedPositions(deletePositions); + auto dvBlob = dvWriter.serialize(); + + // Write the DV blob to a temp file. + const auto dvDirectory = TempDirectoryPath::create(); + const auto dvFilePath = dvDirectory->getPath() + "/dv.bin"; + { + std::ofstream out(dvFilePath, std::ios::binary | std::ios::trunc); + out.write(dvBlob.data(), static_cast(dvBlob.size())); + } + auto dvFileSize = static_cast(dvBlob.size()); + + // Build IcebergDeleteFile metadata with DV blob offset and length. + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = "0"; + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(dvFileSize); + + IcebergDeleteFile dvDeleteFile( + FileContent::kDeletionVector, + dvFilePath, + dwio::common::FileFormat::DWRF, + deletePositions.size(), + dvFileSize, + {}, + lowerBounds, + upperBounds); + + // Create a split for the DWRF data file with the DV attached. + auto dataFiles = listFiles(dataPath); + ASSERT_EQ(dataFiles.size(), 1); + const auto& dataFilePath = dataFiles[0]; + auto fileHandle = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + + std::vector> splits; + splits.push_back( + std::make_shared( + test::kIcebergConnectorId, + dataFilePath, + fileFormat_, + 0, + fileHandle->size(), + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + std::vector{dvDeleteFile})); + + // Build expected result: all rows NOT in deletePositions. + std::unordered_set deletedSet( + deletePositions.begin(), deletePositions.end()); + std::vector expectedValues; + for (int64_t i = 0; i < numRows; ++i) { + if (!deletedSet.contains(i)) { + expectedValues.push_back(i); + } + } + auto expected = + makeRowVector({"c0"}, {makeFlatVector(expectedValues)}); + + auto plan = exec::test::PlanBuilder() + .startTableScan() + .connectorId(test::kIcebergConnectorId) + .outputType(rowType) + .endTableScan() + .planNode(); + exec::test::AssertQueryBuilder(plan).splits(splits).assertResults(expected); +} + +} // namespace +} // namespace facebook::velox::connector::hive::iceberg