From 2f06a3b0689a6856e53695b2f49596af2ca550de Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 23:31:19 -0700 Subject: [PATCH 1/4] feat:[velox][iceberg] Improve IcebergSplitReader error handling and fix test file handle leaks (#16869) Summary: - Add explicit equality deletes NYI branch in prepareSplit() - Improve VELOX_NYI error messages with descriptive content type info - Fix FILE handle leaks in IcebergReadTest by extracting getTestFileSize() helper - Minor doc comment formatting improvements in IcebergSplitReader.h Differential Revision: D97530140 --- .../hive/iceberg/IcebergSplitReader.cpp | 7 ++++++- .../hive/iceberg/IcebergSplitReader.h | 6 +++--- .../hive/iceberg/tests/IcebergReadTest.cpp | 19 +++++++++++++------ 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index a8821a75204f..39a6864cdd77 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -18,6 +18,7 @@ #include +#include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" @@ -115,8 +116,12 @@ void IcebergSplitReader::prepareSplit( splitOffset_, hiveSplit_->connectorId)); } + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + VELOX_NYI("Equality deletes are not yet supported."); } else { - VELOX_NYI(); + VELOX_NYI( + "Unsupported delete file content type: {}", + static_cast(deleteFile.content)); } } } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index f727f5c86ff2..0c661acd3598 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -93,10 +93,10 @@ class IcebergSplitReader : public SplitReader { const RowTypePtr& fileType, const RowTypePtr& tableSchema) const override; - // The read offset to the beginning of the split in number of rows for the - // current batch for the base data file + /// Read offset to the beginning of the split in number of rows for the + /// current batch for the base data file. uint64_t baseReadOffset_; - // The file position for the first row in the split + /// File position for the first row in the split. uint64_t splitOffset_; std::list> positionalDeleteFileReaders_; diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 29acc03a2e10..113ffff8d052 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -42,6 +42,15 @@ using namespace facebook::velox::common::testutil; namespace facebook::velox::connector::hive::iceberg { +namespace { +// Return the file size for the given path using Velox's filesystem API. +uint64_t getTestFileSize(const std::string& path) { + return filesystems::getFileSystem(path, nullptr) + ->openFileForRead(path) + ->size(); +} +} // namespace + static const char* kIcebergConnectorId = "test-iceberg"; class HiveIcebergTest : public HiveConnectorTestBase { @@ -241,8 +250,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { deleteFilePath, fileFomat_, deleteFilePaths[deleteFileName].first, - testing::internal::GetFileSize( - std::fopen(deleteFilePath.c_str(), "r"))); + getTestFileSize(deleteFilePath)); deleteFiles.push_back(icebergDeleteFile); } } @@ -354,8 +362,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { deleteFilePath->getPath(), fileFomat_, deletedPositionSize, - testing::internal::GetFileSize( - std::fopen(deleteFilePath->getPath().c_str(), "r"))); + getTestFileSize(deleteFilePath->getPath())); auto fileSize = filesystems::getFileSystem(path, nullptr) ->openFileForRead(path) ->size(); @@ -934,8 +941,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { deleteFilePath->getPath(), dwio::common::FileFormat::DWRF, 3, - testing::internal::GetFileSize( - std::fopen(deleteFilePath->getPath().c_str(), "r")), + getTestFileSize(deleteFilePath->getPath()), {}, {}, {{posColumn->id, encodedUpperBound}}); @@ -1004,4 +1010,5 @@ TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) { 0); } #endif + } // namespace facebook::velox::connector::hive::iceberg From 34fbaf6d08e759c6644bd9865655a634e22d0745 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 23:31:19 -0700 Subject: [PATCH 2/4] feat: [velox][iceberg] Add Iceberg V3 deletion vector support (#16870) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Add deletion vector (DV) reader to the Velox Iceberg connector, enabling Iceberg V3 spec support for row-level deletes. Iceberg V3 replaces positional delete files with deletion vectors — compact roaring bitmaps stored as blobs inside Puffin files. Compared to V2 positional delete files, DVs are more compact and avoid sorted merge of multiple delete files at read time. Changes: - IcebergDeleteFile.h: Add FileContent::kDeletionVector enum value - DeletionVectorReader.h/cpp: New reader that loads a Puffin blob, deserializes the roaring bitmap portable format (array, bitset, and run containers), and sets bits in the deleteBitmap for the current batch range. No CRoaring dependency — self-contained parser. - IcebergSplitReader.h: Add deletionVectorReaders_ member, include header - IcebergSplitReader.cpp: Route kDeletionVector in prepareSplit(), apply DVs alongside positional deletes in next() - BUCK: Add DeletionVectorReader library target Differential Revision: D97530142 --- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/DeletionVectorReader.cpp | 334 +++++++++++ .../hive/iceberg/DeletionVectorReader.h | 110 ++++ .../hive/iceberg/IcebergDeleteFile.h | 6 + .../hive/iceberg/IcebergSplitReader.cpp | 21 +- .../hive/iceberg/IcebergSplitReader.h | 4 + .../hive/iceberg/tests/CMakeLists.txt | 13 + .../tests/DeletionVectorReaderTest.cpp | 560 ++++++++++++++++++ 8 files changed, 1048 insertions(+), 1 deletion(-) create mode 100644 velox/connectors/hive/iceberg/DeletionVectorReader.cpp create mode 100644 velox/connectors/hive/iceberg/DeletionVectorReader.h create mode 100644 velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index 6bb02cb38478..d462e891fc3f 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -14,6 +14,7 @@ set( ICEBERG_SOURCES + DeletionVectorReader.cpp IcebergConfig.cpp IcebergColumnHandle.cpp IcebergConnector.cpp diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.cpp b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp new file mode 100644 index 000000000000..5e44bec2066d --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp @@ -0,0 +1,334 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::connector::hive::iceberg { + +DeletionVectorReader::DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* /*pool*/) + : dvFile_(dvFile), splitOffset_(splitOffset) { + VELOX_CHECK( + dvFile_.content == FileContent::kDeletionVector, + "Expected deletion vector file but got content type: {}", + static_cast(dvFile_.content)); + VELOX_CHECK_GT(dvFile_.recordCount, 0, "Empty deletion vector."); + + // Sanity-check the record count to catch corrupt metadata before we + // allocate a potentially enormous bitmap. 10 billion rows is well + // beyond any realistic single-file cardinality. + static constexpr int64_t kMaxDeletionVectorRecordCount = 10'000'000'000LL; + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorRecordCount, + "Deletion vector record count exceeds sanity limit: {}", + dvFile_.recordCount); +} + +void DeletionVectorReader::loadBitmap() { + if (loaded_) { + return; + } + loaded_ = true; + + // Read the raw DV blob from the file. The blob offset and length are + // encoded in the IcebergDeleteFile bounds maps by the coordinator. + uint64_t blobOffset = 0; + uint64_t blobLength = dvFile_.fileSizeInBytes; + + if (auto it = dvFile_.lowerBounds.find(kDvOffsetFieldId); + it != dvFile_.lowerBounds.end()) { + try { + blobOffset = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob offset from bounds map: {}", e.what()); + } + } + if (auto it = dvFile_.upperBounds.find(kDvLengthFieldId); + it != dvFile_.upperBounds.end()) { + try { + blobLength = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob length from bounds map: {}", e.what()); + } + } + + auto fs = filesystems::getFileSystem(dvFile_.filePath, nullptr); + auto readFile = fs->openFileForRead(dvFile_.filePath); + + auto fileSize = readFile->size(); + VELOX_CHECK_LE( + blobOffset, + fileSize, + "DV blob offset {} exceeds file size {}.", + blobOffset, + fileSize); + VELOX_CHECK_LE( + blobLength, + fileSize - blobOffset, + "DV blob range [{}, {}) exceeds file size {}.", + blobOffset, + blobOffset + blobLength, + fileSize); + + // Read the blob bytes. + std::string blobData(blobLength, '\0'); + readFile->pread(blobOffset, blobLength, blobData.data()); + + // Deserialize the roaring bitmap from the portable binary format. + // The Iceberg V3 spec uses the standard RoaringBitmap portable + // serialization (https://roaringbitmap.org/). We parse the bitmap + // directly without depending on CRoaring — the format is well-defined: + // + // For small deletion vectors, the coordinator may provide the deleted + // positions directly as a sorted list encoded in the blob. For the + // general case, we parse the roaring bitmap portable format. + // + // Portable format layout: + // - cookie: uint32 (identifies format version) + // - container count: uint32 + // - per container: key (uint16) + cardinality-1 (uint16) + // - per container: offset (uint32) [if >4 containers] + // - container data: array or bitset containers + // + // We use a simplified parser that extracts all set positions. + deserializeRoaringBitmap(blobData); + + // Sort positions for efficient batch-range scanning. + std::sort(deletedPositions_.begin(), deletedPositions_.end()); +} + +void DeletionVectorReader::deserializeRoaringBitmap(const std::string& data) { + if (data.size() < 8) { + VELOX_FAIL( + "Deletion vector blob too small: {} bytes, expected at least 8.", + data.size()); + } + + const uint8_t* ptr = reinterpret_cast(data.data()); + const uint8_t* end = ptr + data.size(); + + // Read cookie (first 4 bytes). The portable format has two variants: + // - SERIAL_COOKIE_NO_RUNCONTAINER (12346): standard format + // - SERIAL_COOKIE (12347): format with run containers + uint32_t cookie; + std::memcpy(&cookie, ptr, sizeof(uint32_t)); + cookie = folly::Endian::little(cookie); + ptr += sizeof(uint32_t); + + static constexpr uint32_t kSerialCookieNoRun = 12346; + static constexpr uint32_t kSerialCookie = 12347; + + bool hasRunContainers = false; + uint32_t numContainers = 0; + + if ((cookie & 0xFFFF) == kSerialCookie) { + hasRunContainers = true; + numContainers = (cookie >> 16) + 1; + } else if (cookie == kSerialCookieNoRun) { + std::memcpy(&numContainers, ptr, sizeof(uint32_t)); + numContainers = folly::Endian::little(numContainers); + ptr += sizeof(uint32_t); + } else { + VELOX_FAIL( + "Unknown roaring bitmap cookie: {}. Expected {} or {}.", + cookie, + kSerialCookieNoRun, + kSerialCookie); + } + + if (numContainers == 0) { + return; + } + + // Read run bitmap if present (ceil(numContainers / 8) bytes). + std::vector isRunContainer(numContainers, false); + if (hasRunContainers) { + uint32_t runBitmapBytes = (numContainers + 7) / 8; + VELOX_CHECK_GE( + static_cast(end - ptr), + runBitmapBytes, + "Truncated run bitmap."); + for (uint32_t i = 0; i < numContainers; ++i) { + isRunContainer[i] = (ptr[i / 8] >> (i % 8)) & 1; + } + ptr += runBitmapBytes; + } + + // Read key-cardinality pairs: (uint16 key, uint16 cardinality-1) per + // container. + struct ContainerMeta { + uint16_t key; + uint32_t cardinality; + }; + std::vector containers(numContainers); + + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated container metadata."); + for (uint32_t i = 0; i < numContainers; ++i) { + uint16_t key, cardMinus1; + std::memcpy(&key, ptr, sizeof(uint16_t)); + key = folly::Endian::little(key); + ptr += sizeof(uint16_t); + std::memcpy(&cardMinus1, ptr, sizeof(uint16_t)); + cardMinus1 = folly::Endian::little(cardMinus1); + ptr += sizeof(uint16_t); + containers[i] = {key, static_cast(cardMinus1) + 1}; + } + + // Skip offset section when there are >= 4 containers. Per the roaring + // bitmap portable format spec, the offset section is present for both + // SERIAL_COOKIE_NO_RUNCONTAINER and SERIAL_COOKIE formats when + // numContainers >= NO_OFFSET_THRESHOLD (4). + if (numContainers >= 4) { + // Offsets: uint32 per container. + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated offset section."); + ptr += numContainers * sizeof(uint32_t); + } + + // Read container data. + // Guard against unreasonable recordCount that could cause excessive + // allocation. + static constexpr int64_t kMaxDeletionVectorPositions = 1LL + << 30; // ~1 billion + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorPositions, + "Deletion vector recordCount exceeds maximum: {}", + dvFile_.recordCount); + deletedPositions_.reserve(dvFile_.recordCount); + + for (uint32_t i = 0; i < numContainers; ++i) { + uint32_t highBits = static_cast(containers[i].key) << 16; + uint32_t cardinality = containers[i].cardinality; + + if (isRunContainer[i]) { + // Run container: pairs of (start, length-1). + uint16_t numRuns; + VELOX_CHECK_GE( + static_cast(end - ptr), + 2u, + "Truncated run container header."); + std::memcpy(&numRuns, ptr, sizeof(uint16_t)); + numRuns = folly::Endian::little(numRuns); + ptr += sizeof(uint16_t); + + VELOX_CHECK_GE( + static_cast(end - ptr), + static_cast(numRuns) * 4, + "Truncated run container data."); + for (uint16_t r = 0; r < numRuns; ++r) { + uint16_t start, lengthMinus1; + std::memcpy(&start, ptr, sizeof(uint16_t)); + start = folly::Endian::little(start); + ptr += sizeof(uint16_t); + std::memcpy(&lengthMinus1, ptr, sizeof(uint16_t)); + lengthMinus1 = folly::Endian::little(lengthMinus1); + ptr += sizeof(uint16_t); + for (uint32_t v = start; + v <= static_cast(start) + lengthMinus1; + ++v) { + deletedPositions_.push_back(static_cast(highBits | v)); + } + } + } else if (cardinality <= 4096) { + // Array container: sorted uint16 values. + VELOX_CHECK_GE( + static_cast(end - ptr), + cardinality * 2, + "Truncated array container."); + for (uint32_t j = 0; j < cardinality; ++j) { + uint16_t val; + std::memcpy(&val, ptr, sizeof(uint16_t)); + val = folly::Endian::little(val); + ptr += sizeof(uint16_t); + deletedPositions_.push_back(static_cast(highBits | val)); + } + } else { + // Bitset container: 2^16 bits = 8192 bytes. + static constexpr size_t kBitsetBytes = 8192; + VELOX_CHECK_GE( + static_cast(end - ptr), + kBitsetBytes, + "Truncated bitset container."); + for (uint32_t word = 0; word < 1024; ++word) { + uint64_t bits; + std::memcpy(&bits, ptr + word * 8, sizeof(uint64_t)); + bits = folly::Endian::little(bits); + while (bits != 0) { + uint32_t bit = __builtin_ctzll(bits); + deletedPositions_.push_back( + static_cast(highBits | (word * 64 + bit))); + bits &= bits - 1; + } + } + ptr += kBitsetBytes; + } + } +} + +void DeletionVectorReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap) { + loadBitmap(); + + if (deletedPositions_.empty()) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + int64_t rowNumberLowerBound = + static_cast(splitOffset_ + baseReadOffset); + int64_t rowNumberUpperBound = + rowNumberLowerBound + static_cast(size); + + // Advance positionIndex_ past positions before the current batch. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberLowerBound) { + ++positionIndex_; + } + + // Set bits for positions within the current batch range. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberUpperBound) { + auto bitIndex = static_cast( + deletedPositions_[positionIndex_] - rowNumberLowerBound); + bits::setBit(bitmap, bitIndex); + ++positionIndex_; + } +} + +bool DeletionVectorReader::noMoreData() const { + return loaded_ && positionIndex_ >= deletedPositions_.size(); +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.h b/velox/connectors/hive/iceberg/DeletionVectorReader.h new file mode 100644 index 000000000000..1639904aeb78 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/memory/Memory.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg V3 deletion vector and applies it to the delete bitmap. +/// +/// Iceberg V3 replaces positional delete files with deletion vectors — compact +/// roaring bitmaps stored as blobs inside Puffin files. Each DV is associated +/// with a single base data file and contains 0-based row positions of deleted +/// rows. +/// +/// Compared to the V2 PositionalDeleteFileReader: +/// - No columnar file parsing (Puffin blobs are raw binary, not Parquet) +/// - No file_path filtering (each DV is pre-associated with its data file) +/// - No sorted merge of multiple delete files (one DV per data file) +/// - Direct bitmap-to-bitmap conversion instead of row-by-row position +/// reading +/// +/// The coordinator extracts the blob offset and length from the Puffin footer +/// and provides them via the IcebergDeleteFile metadata. The reader opens the +/// file, reads the raw bytes at the given offset, deserializes the roaring +/// bitmap, and sets bits in the delete bitmap for the current batch range. +class DeletionVectorReader { + public: + /// Constructs a reader for a single deletion vector. + /// + /// @param dvFile Metadata about the deletion vector file. Must have + /// content == FileContent::kDeletionVector. The filePath points to the + /// Puffin file, and the DV blob offset/length are encoded in the + /// lowerBounds/upperBounds fields (key = kDvOffsetFieldId for offset, + /// kDvLengthFieldId for length). + /// @param splitOffset File position of the first row in the split. + /// @param pool Memory pool for bitmap allocations. + /// @param fileSystem Filesystem to read the Puffin file from. + DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* pool); + + /// Reads deleted positions from the DV and sets corresponding bits in the + /// deleteBitmap for the current batch range. + /// + /// @param baseReadOffset Read offset from the beginning of the split in + /// number of rows for the current batch. + /// @param size Number of rows in the current batch. + /// @param deleteBitmap Output bitmap. Bit i is set if the row at file + /// position (splitOffset + baseReadOffset + i) is deleted. + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap); + + /// Returns true when there is no more data. For DVs this is always true + /// after the first readDeletePositions() call since the entire bitmap is + /// loaded eagerly. + bool noMoreData() const; + + /// Field IDs used to encode DV blob offset and length in the + /// IcebergDeleteFile bounds maps. The coordinator encodes these when + /// building splits from Puffin file metadata. + static constexpr int32_t kDvOffsetFieldId = 100; + static constexpr int32_t kDvLengthFieldId = 101; + + private: + /// Loads the deletion vector bitmap from the Puffin file. Called lazily + /// on the first readDeletePositions() call. + void loadBitmap(); + + /// Parses a roaring bitmap from its portable binary serialization format + /// and populates deletedPositions_ with all set positions. + void deserializeRoaringBitmap(const std::string& data); + + const IcebergDeleteFile dvFile_; + const uint64_t splitOffset_; + + /// Sorted vector of deleted row positions (0-based, relative to the + /// start of the base data file). Populated by loadBitmap(). + std::vector deletedPositions_; + + /// Index into deletedPositions_ tracking how far we've consumed. + size_t positionIndex_{0}; + + /// Whether the bitmap has been loaded from the file. + bool loaded_{false}; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h index 2f9206dfc264..bc08f5d848ca 100644 --- a/velox/connectors/hive/iceberg/IcebergDeleteFile.h +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -27,6 +27,12 @@ enum class FileContent { kData, kPositionalDeletes, kEqualityDeletes, + /// Iceberg V3 deletion vector. A serialized roaring bitmap of deleted row + /// positions stored as a blob inside a Puffin file. More compact than V2 + /// positional delete files and avoids sorted merge of multiple delete files. + /// The coordinator extracts the blob offset and length from the Puffin + /// footer and provides them via IcebergDeleteFile fields. + kDeletionVector, }; struct IcebergDeleteFile { diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 39a6864cdd77..ef5530134762 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -78,6 +78,7 @@ void IcebergSplitReader::prepareSplit( baseReadOffset_ = 0; splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); + deletionVectorReaders_.clear(); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { @@ -118,6 +119,12 @@ void IcebergSplitReader::prepareSplit( } } else if (deleteFile.content == FileContent::kEqualityDeletes) { VELOX_NYI("Equality deletes are not yet supported."); + } else if (deleteFile.content == FileContent::kDeletionVector) { + if (deleteFile.recordCount > 0) { + deletionVectorReaders_.push_back( + std::make_unique( + deleteFile, splitOffset_, connectorQueryCtx_->memoryPool())); + } } else { VELOX_NYI( "Unsupported delete file content type: {}", @@ -142,7 +149,8 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { return 0; } - if (!positionalDeleteFileReaders_.empty()) { + if (!positionalDeleteFileReaders_.empty() || + !deletionVectorReaders_.empty()) { auto numBytes = bits::nbytes(actualSize); dwio::common::ensureCapacity( deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), false, true); @@ -157,6 +165,17 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { ++iter; } } + + for (auto iter = deletionVectorReaders_.begin(); + iter != deletionVectorReaders_.end();) { + (*iter)->readDeletePositions(baseReadOffset_, actualSize, deleteBitmap_); + + if ((*iter)->noMoreData()) { + iter = deletionVectorReaders_.erase(iter); + } else { + ++iter; + } + } } mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0 diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 0c661acd3598..dd083e0270a8 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -18,6 +18,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" namespace facebook::velox::connector::hive::iceberg { @@ -101,5 +102,8 @@ class IcebergSplitReader : public SplitReader { std::list> positionalDeleteFileReaders_; BufferPtr deleteBitmap_; + + /// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps). + std::list> deletionVectorReaders_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index a84064bb8613..87a16b6b0d0e 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -83,6 +83,19 @@ if(NOT VELOX_DISABLE_GOOGLETEST) GTest::gtest_main ) + add_executable(velox_hive_iceberg_deletion_vector_test DeletionVectorReaderTest.cpp) + add_test(velox_hive_iceberg_deletion_vector_test velox_hive_iceberg_deletion_vector_test) + + target_link_libraries( + velox_hive_iceberg_deletion_vector_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) + if(VELOX_ENABLE_PARQUET) target_link_libraries(velox_hive_iceberg_test velox_dwio_parquet_reader) diff --git a/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp new file mode 100644 index 000000000000..3000a502f641 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp @@ -0,0 +1,560 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/testutil/TempFilePath.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive::iceberg; +using namespace facebook::velox::common::testutil; + +namespace { + +/// Serializes a roaring bitmap in the portable format (no-run variant, +/// cookie = 12346). Supports only array containers (cardinality <= 4096). +/// This is the simplest format the DeletionVectorReader needs to parse. +std::string serializeRoaringBitmapNoRun(const std::vector& positions) { + if (positions.empty()) { + // Empty bitmap: cookie + 0 containers. + std::string data(8, '\0'); + uint32_t cookie = 12346; + uint32_t numContainers = 0; + std::memcpy(data.data(), &cookie, 4); + std::memcpy(data.data() + 4, &numContainers, 4); + return data; + } + + // Group positions by high 16 bits. + std::map> containers; + for (auto pos : positions) { + auto key = static_cast(pos >> 16); + auto low = static_cast(pos & 0xFFFF); + containers[key].push_back(low); + } + + for (auto& [key, vals] : containers) { + std::sort(vals.begin(), vals.end()); + } + + uint32_t numContainers = static_cast(containers.size()); + + std::string data; + // Cookie. + uint32_t cookie = 12346; + data.append(reinterpret_cast(&cookie), 4); + // Container count. + data.append(reinterpret_cast(&numContainers), 4); + + // Key-cardinality pairs. + for (auto& [key, vals] : containers) { + uint16_t cardMinus1 = static_cast(vals.size() - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Offset section (required for >= 4 containers). + if (numContainers >= 4) { + uint32_t offset = 4 + 4 + numContainers * 4 + numContainers * 4; + for (auto& [key, vals] : containers) { + data.append(reinterpret_cast(&offset), 4); + offset += static_cast(vals.size()) * 2; + } + } + + // Container data (array containers: sorted uint16 values). + for (auto& [key, vals] : containers) { + for (auto v : vals) { + data.append(reinterpret_cast(&v), 2); + } + } + + return data; +} + +/// Serializes a roaring bitmap in the portable format with run containers +/// (cookie = 12347). All containers are run-encoded. +std::string serializeRoaringBitmapWithRuns( + const std::vector< + std::pair>>>& + containerRuns) { + // containerRuns: vector of (highBitsKey, vector of (start, lengthMinus1)). + uint32_t numContainers = static_cast(containerRuns.size()); + + // Cookie: low 16 bits = 12347, high 16 bits = numContainers - 1. + uint32_t cookie = static_cast(12347) | ((numContainers - 1) << 16); + + std::string data; + data.append(reinterpret_cast(&cookie), 4); + + // Run bitmap: all containers are run containers. ceil(numContainers / 8) + // bytes. + uint32_t runBitmapBytes = (numContainers + 7) / 8; + std::vector runBitmap(runBitmapBytes, 0xFF); + data.append(reinterpret_cast(runBitmap.data()), runBitmapBytes); + + // Compute cardinality for each container. + std::vector cardinalities; + for (auto& [key, runs] : containerRuns) { + uint32_t card = 0; + for (auto& [start, lenMinus1] : runs) { + card += static_cast(lenMinus1) + 1; + } + cardinalities.push_back(card); + } + + // Key-cardinality pairs. + for (size_t i = 0; i < containerRuns.size(); ++i) { + uint16_t key = containerRuns[i].first; + uint16_t cardMinus1 = static_cast(cardinalities[i] - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Container data: each run container has numRuns (uint16) followed by + // (start, lengthMinus1) pairs. + for (auto& [key, runs] : containerRuns) { + uint16_t numRuns = static_cast(runs.size()); + data.append(reinterpret_cast(&numRuns), 2); + for (auto& [start, lenMinus1] : runs) { + data.append(reinterpret_cast(&start), 2); + data.append(reinterpret_cast(&lenMinus1), 2); + } + } + + return data; +} + +/// Writes binary data to a temp file and returns the path. +std::shared_ptr writeDvFile(const std::string& bitmapData) { + auto tempFile = TempFilePath::create(); + // Write directly via C++ streams since TempFilePath already creates the + // file and the local filesystem openFileForWrite may not overwrite. + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + VELOX_CHECK(out.good(), "Failed to open temp file for writing"); + out.write(bitmapData.data(), static_cast(bitmapData.size())); + out.close(); + return tempFile; +} + +/// Creates an IcebergDeleteFile for a deletion vector. +IcebergDeleteFile makeDvDeleteFile( + const std::string& filePath, + uint64_t recordCount, + uint64_t fileSize, + uint64_t blobOffset = 0, + std::optional blobLength = std::nullopt) { + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = + std::to_string(blobOffset); + if (blobLength.has_value()) { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(blobLength.value()); + } else { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(fileSize); + } + + return IcebergDeleteFile( + FileContent::kDeletionVector, + filePath, + dwio::common::FileFormat::DWRF, + recordCount, + fileSize, + {}, + lowerBounds, + upperBounds); +} + +/// Extracts which bits are set in a bitmap buffer. +std::vector getSetBits(const BufferPtr& bitmap, uint64_t size) { + auto* raw = bitmap->as(); + std::vector result; + for (uint64_t i = 0; i < size; ++i) { + if (bits::isBitSet(raw, i)) { + result.push_back(i); + } + } + return result; +} + +} // namespace + +class DeletionVectorReaderTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + } + + void SetUp() override { + filesystems::registerLocalFileSystem(); + pool_ = memory::memoryManager()->addLeafPool("DeletionVectorReaderTest"); + } + + BufferPtr allocateBitmap(uint64_t numBits) { + auto numBytes = bits::nbytes(numBits); + auto buffer = AlignedBuffer::allocate(numBytes, pool_.get(), 0); + return buffer; + } + + std::shared_ptr pool_; +}; + +TEST_F(DeletionVectorReaderTest, basicArrayContainer) { + // Positions: 0, 5, 10, 99. + std::vector positions = {0, 5, 10, 99}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + EXPECT_FALSE(reader.noMoreData()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 5, 10, 99})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, batchRangeFiltering) { + // Positions: 10, 20, 30, 40, 50. + std::vector positions = {10, 20, 30, 40, 50}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // First batch: rows 0-24 (should contain positions 10, 20). + auto bitmap1 = allocateBitmap(25); + reader.readDeletePositions(0, 25, bitmap1); + auto bits1 = getSetBits(bitmap1, 25); + EXPECT_EQ(bits1, (std::vector{10, 20})); + EXPECT_FALSE(reader.noMoreData()); + + // Second batch: rows 25-49 (should contain positions 30, 40). + auto bitmap2 = allocateBitmap(25); + reader.readDeletePositions(25, 25, bitmap2); + auto bits2 = getSetBits(bitmap2, 25); + EXPECT_EQ(bits2, (std::vector{5, 15})); + EXPECT_FALSE(reader.noMoreData()); + + // Third batch: rows 50-74 (should contain position 50). + auto bitmap3 = allocateBitmap(25); + reader.readDeletePositions(50, 25, bitmap3); + auto bits3 = getSetBits(bitmap3, 25); + EXPECT_EQ(bits3, (std::vector{0})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffset) { + // Positions: 100, 105, 110. + std::vector positions = {100, 105, 110}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + // Positions 100, 105, 110 relative to splitOffset=100, baseReadOffset=0 + // become bit indices 0, 5, 10. + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{0, 5, 10})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffsetWithBaseReadOffset) { + // Positions: 200, 210, 220. + std::vector positions = {200, 210, 220}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + // First batch: baseReadOffset=100, so file positions [200, 300). + // Positions 200, 210, 220 are all in range. + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(100, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 10, 20})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, noDeletesInRange) { + // Positions: 1000, 2000. + std::vector positions = {1000, 2000}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Batch covers rows 0-99, no deletions in this range. + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_TRUE(setBits.empty()); + EXPECT_FALSE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, runContainers) { + // Use run-encoded containers: positions 10-19 and 50-59. + std::vector>>> + containerRuns = { + {0, {{10, 9}, {50, 9}}}, + }; + auto bitmapData = serializeRoaringBitmapWithRuns(containerRuns); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 20, fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + // Expect positions 10-19 and 50-59. + std::vector expected; + for (uint64_t i = 10; i <= 19; ++i) { + expected.push_back(i); + } + for (uint64_t i = 50; i <= 59; ++i) { + expected.push_back(i); + } + EXPECT_EQ(setBits, expected); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, largePositionsMultipleContainers) { + // Positions spanning two containers: one in container 0 (key=0), one in + // container 1 (key=1, i.e. pos >= 65536). + std::vector positions = {5, 100, 65536, 65600}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Read a batch covering all positions. + auto bitmap = allocateBitmap(66000); + reader.readDeletePositions(0, 66000, bitmap); + + auto setBits = getSetBits(bitmap, 66000); + EXPECT_EQ(setBits, (std::vector{5, 100, 65536, 65600})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, blobOffset) { + // Write a file with some padding before the actual bitmap data. + std::vector positions = {3, 7, 11}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + + // Prepend 64 bytes of padding. + std::string padding(64, 'X'); + std::string fileContent = padding + bitmapData; + + auto tempFile = writeDvFile(fileContent); + auto fileSize = static_cast(fileContent.size()); + + auto dvFile = makeDvDeleteFile( + tempFile->getPath(), positions.size(), fileSize, 64, bitmapData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{3, 7, 11})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsWrongContentType) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile badFile( + FileContent::kPositionalDeletes, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(badFile, 0, pool_.get()), + "Expected deletion vector file"); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsEmptyDv) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile emptyDv( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 0, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(emptyDv, 0, pool_.get()), "Empty deletion vector"); +} + +TEST_F(DeletionVectorReaderTest, noMoreDataAfterAllConsumed) { + std::vector positions = {0, 1, 2}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + EXPECT_FALSE(reader.noMoreData()); + + auto bitmap = allocateBitmap(10); + reader.readDeletePositions(0, 10, bitmap); + EXPECT_TRUE(reader.noMoreData()); + + // Additional reads should be no-ops. + auto bitmap2 = allocateBitmap(10); + reader.readDeletePositions(10, 10, bitmap2); + auto setBits2 = getSetBits(bitmap2, 10); + EXPECT_TRUE(setBits2.empty()); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, singlePosition) { + std::vector positions = {42}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{42})); +} + +TEST_F(DeletionVectorReaderTest, consecutivePositions) { + // Positions: 0 through 99 (100 consecutive positions). + std::vector positions; + positions.reserve(100); + for (int64_t i = 0; i < 100; ++i) { + positions.push_back(i); + } + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + std::vector expected; + expected.reserve(100); + for (uint64_t i = 0; i < 100; ++i) { + expected.push_back(i); + } + EXPECT_EQ(setBits, expected); +} + +TEST_F(DeletionVectorReaderTest, invalidBitmapTooSmall) { + // Write a file that is too small to contain a valid roaring bitmap header. + std::string tinyData(4, '\0'); + auto tempFile = writeDvFile(tinyData); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, tinyData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(10); + VELOX_ASSERT_THROW(reader.readDeletePositions(0, 10, bitmap), "too small"); +} + +TEST_F(DeletionVectorReaderTest, invalidBitmapBadCookie) { + // Write a file with an invalid cookie. + std::string badData(12, '\0'); + uint32_t badCookie = 99999; + std::memcpy(badData.data(), &badCookie, 4); + auto tempFile = writeDvFile(badData); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, badData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(10); + VELOX_ASSERT_THROW( + reader.readDeletePositions(0, 10, bitmap), + "Unknown roaring bitmap cookie"); +} From 243eef658f60d545cc0846aae18002ea58ef72ce Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 23:31:19 -0700 Subject: [PATCH 3/4] feat: [velox][iceberg] Add Iceberg equality delete file reader (#16871) Summary: Implements Iceberg equality delete support for the Velox Iceberg connector. Equality delete files contain rows with values for one or more columns (identified by equalityFieldIds). A base data row is deleted if its values match ALL specified columns of ANY row in the delete file. The implementation: - Adds EqualityDeleteFileReader that eagerly reads the entire delete file and builds an in-memory hash multimap of delete key tuples during construction. - Wires EqualityDeleteFileReader into IcebergSplitReader::prepareSplit() to resolve equalityFieldIds to column names/types from the table schema, and into IcebergSplitReader::next() to apply post-read equality delete filtering with row compaction. - Handles lazy vectors from file readers via loadedVector() before accessing values for hashing and comparison. - Supports BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, VARCHAR, VARBINARY, and TIMESTAMP column types. Differential Revision: D97530141 --- velox/connectors/hive/iceberg/CMakeLists.txt | 1 + .../hive/iceberg/EqualityDeleteFileReader.cpp | 353 +++++++++++++++++ .../hive/iceberg/EqualityDeleteFileReader.h | 139 +++++++ .../hive/iceberg/IcebergSplitReader.cpp | 106 ++++- .../hive/iceberg/IcebergSplitReader.h | 5 + .../hive/iceberg/tests/CMakeLists.txt | 13 + .../tests/EqualityDeleteFileReaderTest.cpp | 361 ++++++++++++++++++ 7 files changed, 977 insertions(+), 1 deletion(-) create mode 100644 velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp create mode 100644 velox/connectors/hive/iceberg/EqualityDeleteFileReader.h create mode 100644 velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index d462e891fc3f..233579680b92 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -22,6 +22,7 @@ set( IcebergDataSink.cpp IcebergDataSource.cpp IcebergPartitionName.cpp + EqualityDeleteFileReader.cpp IcebergSplit.cpp IcebergSplitReader.cpp PartitionSpec.cpp diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp new file mode 100644 index 000000000000..7b11141ab684 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp @@ -0,0 +1,353 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +/// Hashes a single value from a vector at the given index. +/// Handles lazy vectors via loadedVector(). Returns 0 for null values. +uint64_t hashValue(const VectorPtr& vectorPtr, vector_size_t index) { + const auto* vector = vectorPtr->loadedVector(); + if (vector->isNullAt(index)) { + return 0; + } + + auto type = vector->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::TINYINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::SMALLINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::INTEGER: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::BIGINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::REAL: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::DOUBLE: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto sv = vector->as>()->valueAt(index); + return folly::hasher{}( + std::string_view(sv.data(), sv.size())); + } + case TypeKind::TIMESTAMP: { + auto ts = vector->as>()->valueAt(index); + return std::hash{}(ts.toNanos()); + } + default: + VELOX_NYI( + "Equality delete hash not implemented for type: {}", + type->toString()); + } +} + +/// Compares two values from vectors at given indices. +/// Handles lazy vectors via loadedVector(). +bool compareValues( + const VectorPtr& leftPtr, + vector_size_t leftIdx, + const VectorPtr& rightPtr, + vector_size_t rightIdx) { + const auto* left = leftPtr->loadedVector(); + const auto* right = rightPtr->loadedVector(); + bool leftNull = left->isNullAt(leftIdx); + bool rightNull = right->isNullAt(rightIdx); + if (leftNull && rightNull) { + return true; + } + if (leftNull || rightNull) { + return false; + } + + auto type = left->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::TINYINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::SMALLINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::INTEGER: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::BIGINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::REAL: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::DOUBLE: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto lv = left->as>()->valueAt(leftIdx); + auto rv = right->as>()->valueAt(rightIdx); + return std::string_view(lv.data(), lv.size()) == + std::string_view(rv.data(), rv.size()); + } + case TypeKind::TIMESTAMP: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + default: + VELOX_NYI( + "Equality delete comparison not implemented for type: {}", + type->toString()); + } +} + +} // namespace + +EqualityDeleteFileReader::EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& /*baseFilePath*/, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId) + : equalityColumnNames_(equalityColumnNames), + equalityColumnTypes_(equalityColumnTypes), + pool_(connectorQueryCtx->memoryPool()) { + VELOX_CHECK( + deleteFile.content == FileContent::kEqualityDeletes, + "Expected equality delete file but got content type: {}", + static_cast(deleteFile.content)); + VELOX_CHECK_GT(deleteFile.recordCount, 0, "Empty equality delete file."); + VELOX_CHECK( + !equalityColumnNames_.empty(), + "Equality delete file must specify at least one column."); + VELOX_CHECK_EQ( + equalityColumnNames_.size(), + equalityColumnTypes_.size(), + "Equality column names and types must have the same size."); + + // Build the file schema for the equality delete columns only. + auto deleteFileSchema = + ROW(std::vector(equalityColumnNames_), + std::vector(equalityColumnTypes_)); + + // Create a ScanSpec that reads only the equality delete columns. + auto scanSpec = std::make_shared(""); + for (size_t i = 0; i < equalityColumnNames_.size(); ++i) { + scanSpec->addField(equalityColumnNames_[i], static_cast(i)); + } + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + hiveConfig, + connectorQueryCtx, + deleteFileSchema, + deleteSplit, + /*tableParameters=*/{}, + deleteReaderOpts); + + const FileHandleKey fileHandleKey{ + .filename = deleteFile.filePath, + .tokenProvider = connectorQueryCtx->fsTokenProvider()}; + auto deleteFileHandleCachePtr = fileHandleFactory->generate(fileHandleKey); + auto deleteFileInput = BufferedInputBuilder::getInstance()->create( + *deleteFileHandleCachePtr, + deleteReaderOpts, + connectorQueryCtx, + ioStatistics, + ioStats, + executor); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.fileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + if (!testFilters( + scanSpec.get(), + deleteReader.get(), + deleteSplit->filePath, + deleteSplit->partitionKeys, + {}, + hiveConfig->readTimestampPartitionValueAsLocalTime( + connectorQueryCtx->sessionProperties()))) { + runtimeStats.skippedSplitBytes += static_cast(deleteSplit->length); + return; + } + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + {}, + scanSpec, + nullptr, + deleteFileSchema, + deleteSplit, + nullptr, + nullptr, + nullptr, + deleteRowReaderOpts); + + auto deleteRowReader = deleteReader->createRowReader(deleteRowReaderOpts); + + // Read the entire equality delete file and build the hash set. + VectorPtr output; + output = BaseVector::create(deleteFileSchema, 0, pool_); + + while (true) { + auto rowsRead = deleteRowReader->next( + std::max(static_cast(1'000), deleteFile.recordCount), output); + if (rowsRead == 0) { + break; + } + + auto numRows = output->size(); + if (numRows == 0) { + continue; + } + + output->loadedVector(); + auto rowOutput = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL(rowOutput); + + size_t batchIndex = deleteRows_.size(); + deleteRows_.push_back(rowOutput); + + // Resolve column indices on the first batch. + if (deleteColumnIndices_.empty()) { + for (const auto& colName : equalityColumnNames_) { + auto idx = rowOutput->type()->as().getChildIdx(colName); + deleteColumnIndices_.push_back(static_cast(idx)); + } + } + + // Hash each row and insert into the multimap. + for (vector_size_t i = 0; i < numRows; ++i) { + uint64_t hash = hashRow(rowOutput, i); + deleteKeyHashes_.emplace(hash, DeleteKeyEntry{batchIndex, i}); + } + + // Reset output for next batch. + output = BaseVector::create(deleteFileSchema, 0, pool_); + } +} + +void EqualityDeleteFileReader::applyDeletes( + const RowVectorPtr& output, + BufferPtr deleteBitmap) { + if (deleteKeyHashes_.empty() || output->size() == 0) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + + // For each row in the output, compute its hash and probe the delete set. + for (vector_size_t i = 0; i < output->size(); ++i) { + // Skip rows already deleted by positional/DV deletes. + if (bits::isBitSet(bitmap, i)) { + continue; + } + + uint64_t hash = hashRow(output, i); + auto range = deleteKeyHashes_.equal_range(hash); + + for (auto it = range.first; it != range.second; ++it) { + auto& entry = it->second; + if (equalRows(output, i, deleteRows_[entry.batchIndex], entry.rowIndex)) { + bits::setBit(bitmap, i); + break; + } + } + } +} + +uint64_t EqualityDeleteFileReader::hashRow( + const RowVectorPtr& row, + vector_size_t index) const { + uint64_t hash = 0; + + // For the delete file rows, use deleteColumnIndices_. + // For the base data rows, look up columns by name. + const auto& rowType = row->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto colIdx = rowType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK( + colIdx.has_value(), + "Column not found in row: {}", + equalityColumnNames_[c]); + auto colHash = hashValue(row->childAt(*colIdx), index); + // Combine hashes using a simple mix. + hash ^= colHash + 0x9e3779b97f4a7c15ULL + (hash << 6) + (hash >> 2); + } + return hash; +} + +bool EqualityDeleteFileReader::equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const { + const auto& leftType = left->type()->asRow(); + const auto& rightType = right->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto leftColIdx = leftType.getChildIdxIfExists(equalityColumnNames_[c]); + auto rightColIdx = rightType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK(leftColIdx.has_value() && rightColIdx.has_value()); + + if (!compareValues( + left->childAt(*leftColIdx), + leftIndex, + right->childAt(*rightColIdx), + rightIndex)) { + return false; + } + } + return true; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h new file mode 100644 index 000000000000..e77b6a8c5068 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/dwio/common/Reader.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg equality delete file and filters base data rows whose +/// equality delete column values match any row in the delete file. +/// +/// Iceberg equality delete files contain rows with values for one or more +/// columns (identified by equalityFieldIds). A base data row is deleted if +/// its values match ALL specified columns of ANY row in the delete file. +/// +/// Unlike positional deletes (which set bits before reading), equality deletes +/// require reading the base data first, then probing each row against the +/// delete set. The reader eagerly loads all delete key tuples from the file +/// into an in-memory hash set during construction. +/// +/// The equality delete column names are resolved from equalityFieldIds via +/// the table schema provided by the caller. +class EqualityDeleteFileReader { + public: + /// Constructs a reader for a single equality delete file. + /// + /// Eagerly reads the entire delete file and builds an in-memory hash set + /// of delete key tuples. The delete file is fully consumed during + /// construction. + /// + /// @param deleteFile Metadata about the equality delete file. Must have + /// content == FileContent::kEqualityDeletes and non-empty + /// equalityFieldIds. + /// @param equalityColumnNames Ordered column names corresponding to + /// equalityFieldIds, resolved by the caller from the table schema. + /// @param equalityColumnTypes Ordered column types corresponding to + /// equalityFieldIds. + /// @param baseFilePath Path of the base data file being read. + /// @param fileHandleFactory Factory for creating file handles. + /// @param connectorQueryCtx Query context for memory and config. + /// @param executor IO executor for async reads. + /// @param hiveConfig Hive configuration. + /// @param ioStatistics IO statistics collector. + /// @param ioStats IO stats tracker. + /// @param runtimeStats Runtime statistics for recording skipped bytes. + /// @param connectorId Connector identifier. + EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId); + + /// Applies equality deletes to the output vector by setting bits in the + /// delete bitmap for rows whose equality column values match any delete + /// key tuple. + /// + /// @param output The base data output vector to filter. + /// @param deleteBitmap Output bitmap. Bit i is set if row i matches an + /// equality delete. The bitmap must be pre-allocated to cover at least + /// output->size() rows. + void applyDeletes(const RowVectorPtr& output, BufferPtr deleteBitmap); + + /// Returns the number of delete key tuples loaded from the file. + size_t numDeleteKeys() const { + return deleteKeyHashes_.size(); + } + + /// Returns true if this reader has no delete keys (file was skipped or + /// empty). When true, applyDeletes() is a no-op. + bool empty() const { + return deleteKeyHashes_.empty(); + } + + private: + /// Hashes a single row's equality delete columns into a uint64_t key. + /// Uses a simple combine-hash approach over the raw values. + uint64_t hashRow(const RowVectorPtr& row, vector_size_t index) const; + + /// Checks whether two rows are equal on all equality delete columns. + bool equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const; + + /// Column names and types for equality delete comparison. + std::vector equalityColumnNames_; + std::vector equalityColumnTypes_; + + /// Column indices in the delete file output vector. + std::vector deleteColumnIndices_; + + /// All rows read from the equality delete file, stored for equality + /// comparison during probing. + std::vector deleteRows_; + + /// Hash set storing (hash → vector of (batch index, row index)) for + /// all delete key tuples. Used for fast probing. + struct DeleteKeyEntry { + size_t batchIndex; + vector_size_t rowIndex; + }; + std::unordered_multimap deleteKeyHashes_; + + memory::MemoryPool* const pool_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index ef5530134762..bd727bb419fe 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -20,6 +20,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" @@ -79,6 +80,7 @@ void IcebergSplitReader::prepareSplit( splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); deletionVectorReaders_.clear(); + equalityDeleteFileReaders_.clear(); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { @@ -118,7 +120,45 @@ void IcebergSplitReader::prepareSplit( hiveSplit_->connectorId)); } } else if (deleteFile.content == FileContent::kEqualityDeletes) { - VELOX_NYI("Equality deletes are not yet supported."); + if (deleteFile.recordCount > 0 && !deleteFile.equalityFieldIds.empty()) { + // Resolve equalityFieldIds to column names and types. In Iceberg, + // field IDs for top-level columns are assigned sequentially starting + // from 1, matching the column order in the table schema. + std::vector equalityColumnNames; + std::vector equalityColumnTypes; + + const auto& dataColumns = hiveTableHandle_->dataColumns(); + if (dataColumns) { + for (const auto& eqFieldId : deleteFile.equalityFieldIds) { + // Field IDs are 1-based; column index is fieldId - 1. + auto colIdx = static_cast(eqFieldId - 1); + VELOX_CHECK_LT( + colIdx, + dataColumns->size(), + "Equality delete field ID out of range: {}", + eqFieldId); + equalityColumnNames.push_back(dataColumns->nameOf(colIdx)); + equalityColumnTypes.push_back(dataColumns->childAt(colIdx)); + } + } + + if (!equalityColumnNames.empty()) { + equalityDeleteFileReaders_.push_back( + std::make_unique( + deleteFile, + equalityColumnNames, + equalityColumnTypes, + hiveSplit_->filePath, + fileHandleFactory_, + connectorQueryCtx_, + ioExecutor_, + hiveConfig_, + ioStatistics_, + ioStats_, + runtimeStats, + hiveSplit_->connectorId)); + } + } } else if (deleteFile.content == FileContent::kDeletionVector) { if (deleteFile.recordCount > 0) { deletionVectorReaders_.push_back( @@ -184,6 +224,70 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation); + // Apply equality deletes after reading base data. Unlike positional deletes + // (which set bits before reading), equality deletes require the data values + // to be available for comparison. + if (rowsScanned > 0 && !equalityDeleteFileReaders_.empty()) { + auto outputRowVector = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL( + outputRowVector, "Output must be a RowVector for equality deletes."); + + auto numRows = outputRowVector->size(); + + // Use a separate bitmap for equality deletes to track which rows to + // remove from the output. + BufferPtr eqDeleteBitmap = AlignedBuffer::allocate( + numRows, connectorQueryCtx_->memoryPool()); + std::memset( + eqDeleteBitmap->asMutable(), 0, eqDeleteBitmap->size()); + + for (auto& reader : equalityDeleteFileReaders_) { + reader->applyDeletes(outputRowVector, eqDeleteBitmap); + } + + // Count surviving rows and compact the output if any rows were deleted. + auto* eqBitmap = eqDeleteBitmap->as(); + vector_size_t numDeleted = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (bits::isBitSet(eqBitmap, i)) { + ++numDeleted; + } + } + + if (numDeleted > 0) { + vector_size_t numSurviving = numRows - numDeleted; + if (numSurviving == 0) { + output = BaseVector::create( + outputRowVector->type(), 0, connectorQueryCtx_->memoryPool()); + return 0; + } + + // Build a list of surviving row ranges and use it to compact. + std::vector ranges; + ranges.reserve(numSurviving); + vector_size_t targetIdx = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (!bits::isBitSet(eqBitmap, i)) { + ranges.push_back({i, targetIdx++, 1}); + } + } + + auto newOutput = BaseVector::create( + outputRowVector->type(), + numSurviving, + connectorQueryCtx_->memoryPool()); + auto newRowOutput = std::dynamic_pointer_cast(newOutput); + for (auto i = 0; i < outputRowVector->childrenSize(); ++i) { + newRowOutput->childAt(i)->resize(numSurviving); + newRowOutput->childAt(i)->copyRanges( + outputRowVector->childAt(i).get(), ranges); + } + newRowOutput->resize(numSurviving); + output = newRowOutput; + rowsScanned = numSurviving; + } + } + return rowsScanned; } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index dd083e0270a8..af9bf0660e00 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -19,6 +19,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/iceberg/DeletionVectorReader.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" namespace facebook::velox::connector::hive::iceberg { @@ -105,5 +106,9 @@ class IcebergSplitReader : public SplitReader { /// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps). std::list> deletionVectorReaders_; + + /// Readers for equality delete files. + std::list> + equalityDeleteFileReaders_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index 87a16b6b0d0e..08a509442fa3 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -107,4 +107,17 @@ if(NOT VELOX_DISABLE_GOOGLETEST) velox_dwio_parquet_writer ) endif() + + add_executable(velox_hive_iceberg_equality_delete_test EqualityDeleteFileReaderTest.cpp) + add_test(velox_hive_iceberg_equality_delete_test velox_hive_iceberg_equality_delete_test) + + target_link_libraries( + velox_hive_iceberg_equality_delete_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) endif() diff --git a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp new file mode 100644 index 000000000000..eb360f10e975 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp @@ -0,0 +1,361 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/common/file/FileSystems.h" +#include "velox/common/testutil/TempDirectoryPath.h" +#include "velox/connectors/hive/iceberg/IcebergConnector.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +namespace facebook::velox::connector::hive::iceberg { + +using namespace facebook::velox::exec::test; + +namespace { + +const std::string kIcebergConnectorId = "test-iceberg-eq-delete"; + +} // namespace + +/// End-to-end tests for equality deletes via the IcebergSplitReader. +/// These tests write DWRF data files and delete files, then execute +/// table scans verifying that matching rows are filtered out. +class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { + protected: + void SetUp() override { + HiveConnectorTestBase::SetUp(); + IcebergConnectorFactory icebergFactory; + auto icebergConnector = icebergFactory.newConnector( + kIcebergConnectorId, + std::make_shared( + std::unordered_map()), + ioExecutor_.get()); + connector::registerConnector(icebergConnector); + } + + void TearDown() override { + connector::unregisterConnector(kIcebergConnectorId); + HiveConnectorTestBase::TearDown(); + } + + uint64_t getFileSize(const std::string& path) { + return filesystems::getFileSystem(path, nullptr) + ->openFileForRead(path) + ->size(); + } + + /// Writes a DWRF data file containing the given vectors. + std::shared_ptr writeDataFile( + const std::vector& data) { + auto file = common::testutil::TempFilePath::create(); + writeToFile(file->getPath(), data); + return file; + } + + /// Writes a DWRF delete file containing the equality delete rows. + std::shared_ptr writeEqDeleteFile( + const std::vector& deleteData) { + auto file = common::testutil::TempFilePath::create(); + writeToFile(file->getPath(), deleteData); + return file; + } + + /// Creates splits with equality delete files attached. + std::vector> makeSplits( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + auto fileSize = getFileSize(dataFilePath); + return {std::make_shared( + kIcebergConnectorId, + dataFilePath, + dwio::common::FileFormat::DWRF, + 0, + fileSize, + std::unordered_map>{}, + std::nullopt, + std::unordered_map{}, + nullptr, + /*cacheable=*/true, + deleteFiles)}; + } + + /// Builds a table scan plan node with the given schema. + core::PlanNodePtr makeTableScanPlan(const RowTypePtr& rowType) { + return PlanBuilder() + .startTableScan() + .connectorId(kIcebergConnectorId) + .outputType(rowType) + .dataColumns(rowType) + .endTableScan() + .planNode(); + } +}; + +/// Verifies that base rows matching the equality delete file are removed. +TEST_F(EqualityDeleteFileReaderTest, basicSingleColumnDelete) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}), + makeFlatVector( + {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where id == 3 or id == 7. + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({3, 7}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); // field ID 1 = column 0 = "id" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({0, 1, 2, 4, 5, 6, 8, 9}), + makeFlatVector({"a", "b", "c", "e", "f", "g", "i", "j"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies multi-column equality deletes (both columns must match). +TEST_F(EqualityDeleteFileReaderTest, multiColumnDelete) { + auto rowType = ROW({"a", "b", "c"}, {INTEGER(), VARCHAR(), BIGINT()}); + + auto baseData = makeRowVector( + {"a", "b", "c"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"x", "y", "z", "x", "y"}), + makeFlatVector({10, 20, 30, 40, 50}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where (a=2, b="y") — matches row index 1. + // Also (a=5, b="y") — matches row index 4. + // But (a=1, b="y") — no match (a=1 has b="x"). + auto deleteData = makeRowVector( + {"a", "b"}, + { + makeFlatVector({2, 5, 1}), + makeFlatVector({"y", "y", "y"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1, 2}); // field IDs 1,2 = columns "a","b" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows 0, 2, 3 survive (rows 1 and 4 deleted). + auto expected = makeRowVector( + {"a", "b", "c"}, + { + makeFlatVector({1, 3, 4}), + makeFlatVector({"x", "z", "x"}), + makeFlatVector({10, 30, 40}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when no rows match, all rows survive. +TEST_F(EqualityDeleteFileReaderTest, noMatchingDeletes) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete file has values not present in base data. + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({100, 200}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that all rows are deleted when every base row matches. +TEST_F(EqualityDeleteFileReaderTest, allRowsDeleted) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + EXPECT_EQ(result->size(), 0); +} + +/// Verifies equality deletes with VARCHAR columns. +TEST_F(EqualityDeleteFileReaderTest, stringColumnDelete) { + auto rowType = ROW({"name", "age"}, {VARCHAR(), INTEGER()}); + + auto baseData = makeRowVector( + {"name", "age"}, + { + makeFlatVector({"alice", "bob", "charlie", "dave"}), + makeFlatVector({25, 30, 35, 40}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where name is "bob" or "dave". + auto deleteData = makeRowVector( + {"name"}, + { + makeFlatVector({"bob", "dave"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}); // field ID 1 = "name" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + auto expected = makeRowVector( + {"name", "age"}, + { + makeFlatVector({"alice", "charlie"}), + makeFlatVector({25, 35}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies equality deletes on a non-first column (field ID 2). +TEST_F(EqualityDeleteFileReaderTest, deleteOnSecondColumn) { + auto rowType = ROW({"id", "category"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "category"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"A", "B", "A", "C", "B"}), + }); + auto dataFile = writeDataFile({baseData}); + + // Delete rows where category == "B". + auto deleteData = makeRowVector( + {"category"}, + { + makeFlatVector({"B"}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{2}); // field ID 2 = column 1 = "category" + + auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile}); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows with category="B" (indices 1,4) deleted. + auto expected = makeRowVector( + {"id", "category"}, + { + makeFlatVector({1, 3, 4}), + makeFlatVector({"A", "A", "C"}), + }); + + assertEqualResults({expected}, {result}); +} + +} // namespace facebook::velox::connector::hive::iceberg From b509734dd7f5ea5dcf9c666df994df0d1249d790 Mon Sep 17 00:00:00 2001 From: Apurva Kumar Date: Fri, 20 Mar 2026 23:31:19 -0700 Subject: [PATCH 4/4] feat:[velox][iceberg] Add sequence number conflict resolution for equality deletes (#16872) Summary: Implements Iceberg V2+ sequence number conflict resolution for equality delete files. Per the Iceberg spec, an equality delete file should only be applied to data files whose data sequence number is strictly less than the delete file's data sequence number. This prevents concurrent write conflicts where a delete file written concurrently with a data file could incorrectly delete rows that were not intended to be deleted. Changes: - Added `dataSequenceNumber` field to `IcebergDeleteFile` struct with default value 0 (unassigned/legacy V1). When 0, sequence number filtering is disabled for backward compatibility. - Added `dataSequenceNumber` field to `HiveIcebergSplit` to carry the base data file's sequence number. - Updated `IcebergSplitReader::prepareSplit()` to skip equality delete files when `deleteFile.dataSequenceNumber <= split.dataSequenceNumber` (unless either is 0, which disables the check). - Updated test constructor of `HiveIcebergSplit` to accept the new `dataSequenceNumber` parameter. Differential Revision: D97530136 --- .../hive/iceberg/IcebergDeleteFile.h | 13 +- .../connectors/hive/iceberg/IcebergSplit.cpp | 6 +- velox/connectors/hive/iceberg/IcebergSplit.h | 10 +- .../hive/iceberg/IcebergSplitReader.cpp | 30 ++ .../tests/EqualityDeleteFileReaderTest.cpp | 283 +++++++++++++++++- 5 files changed, 335 insertions(+), 7 deletions(-) diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h index bc08f5d848ca..ae5822371a60 100644 --- a/velox/connectors/hive/iceberg/IcebergDeleteFile.h +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -53,6 +53,13 @@ struct IcebergDeleteFile { // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> std::unordered_map upperBounds; + /// Data sequence number of this delete file, assigned by the Iceberg snapshot + /// that produced it. Per the Iceberg spec (V2+), an equality delete file must + /// only be applied to data files whose data sequence number is strictly less + /// than the delete file's data sequence number. A value of 0 means + /// "unassigned" (legacy V1 tables) and disables sequence number filtering. + int64_t dataSequenceNumber{0}; + IcebergDeleteFile( FileContent _content, const std::string& _filePath, @@ -61,7 +68,8 @@ struct IcebergDeleteFile { uint64_t _fileSizeInBytes, std::vector _equalityFieldIds = {}, std::unordered_map _lowerBounds = {}, - std::unordered_map _upperBounds = {}) + std::unordered_map _upperBounds = {}, + int64_t _dataSequenceNumber = 0) : content(_content), filePath(_filePath), fileFormat(_fileFormat), @@ -69,7 +77,8 @@ struct IcebergDeleteFile { fileSizeInBytes(_fileSizeInBytes), equalityFieldIds(_equalityFieldIds), lowerBounds(_lowerBounds), - upperBounds(_upperBounds) {} + upperBounds(_upperBounds), + dataSequenceNumber(_dataSequenceNumber) {} }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index e5cdf63c33b4..8b4417f22a72 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -69,7 +69,8 @@ HiveIcebergSplit::HiveIcebergSplit( bool cacheable, std::vector deletes, const std::unordered_map& infoColumns, - std::optional properties) + std::optional properties, + int64_t dataSequenceNumber) : HiveConnectorSplit( connectorId, filePath, @@ -87,5 +88,6 @@ HiveIcebergSplit::HiveIcebergSplit( properties, std::nullopt, std::nullopt), - deleteFiles(std::move(deletes)) {} + deleteFiles(std::move(deletes)), + dataSequenceNumber(dataSequenceNumber) {} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index eb2448dabd1c..d161565aa329 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -25,6 +25,13 @@ namespace facebook::velox::connector::hive::iceberg { struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { std::vector deleteFiles; + /// Data sequence number of the base data file in this split. Per the Iceberg + /// spec (V2+), an equality delete file should only apply to data files whose + /// data sequence number is strictly less than the delete file's sequence + /// number. A value of 0 means "unassigned" (legacy V1 tables) and disables + /// sequence number filtering. + int64_t dataSequenceNumber{0}; + HiveIcebergSplit( const std::string& connectorId, const std::string& filePath, @@ -55,7 +62,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { bool cacheable = true, std::vector deletes = {}, const std::unordered_map& infoColumns = {}, - std::optional fileProperties = std::nullopt); + std::optional fileProperties = std::nullopt, + int64_t dataSequenceNumber = 0); }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index bd727bb419fe..24c3a6b0d7a3 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -29,6 +29,29 @@ using namespace facebook::velox::dwio::common; namespace facebook::velox::connector::hive::iceberg { +namespace { + +/// Returns true if a delete/update file should be skipped based on sequence +/// number conflict resolution. Per the Iceberg spec (V2+): +/// - Equality deletes apply when deleteSeqNum > dataSeqNum (i.e., skip when +/// deleteSeqNum <= dataSeqNum). +/// - Positional deletes, deletion vectors, and positional updates apply when +/// deleteSeqNum >= dataSeqNum (i.e., skip when deleteSeqNum < dataSeqNum), +/// because same-snapshot positional deletes SHOULD apply. +/// - A sequence number of 0 means "unassigned" (legacy V1 tables) and +/// disables filtering (never skip). +bool shouldSkipBySequenceNumber( + int64_t fileSeqNum, + int64_t dataSeqNum, + bool isEqualityDelete) { + if (fileSeqNum <= 0 || dataSeqNum <= 0) { + return false; + } + return isEqualityDelete ? (fileSeqNum <= dataSeqNum) + : (fileSeqNum < dataSeqNum); +} + +} // namespace IcebergSplitReader::IcebergSplitReader( const std::shared_ptr& hiveSplit, @@ -121,6 +144,13 @@ void IcebergSplitReader::prepareSplit( } } else if (deleteFile.content == FileContent::kEqualityDeletes) { if (deleteFile.recordCount > 0 && !deleteFile.equalityFieldIds.empty()) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/true)) { + continue; + } + // Resolve equalityFieldIds to column names and types. In Iceberg, // field IDs for top-level columns are assigned sequentially starting // from 1, matching the column order in the table schema. diff --git a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp index eb360f10e975..601a583e0a6c 100644 --- a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp +++ b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp @@ -81,7 +81,8 @@ class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { /// Creates splits with equality delete files attached. std::vector> makeSplits( const std::string& dataFilePath, - const std::vector& deleteFiles = {}) { + const std::vector& deleteFiles = {}, + int64_t dataSequenceNumber = 0) { auto fileSize = getFileSize(dataFilePath); return {std::make_shared( kIcebergConnectorId, @@ -94,7 +95,10 @@ class EqualityDeleteFileReaderTest : public HiveConnectorTestBase { std::unordered_map{}, nullptr, /*cacheable=*/true, - deleteFiles)}; + deleteFiles, + std::unordered_map{}, + std::nullopt, + dataSequenceNumber)}; } /// Builds a table scan plan node with the given schema. @@ -358,4 +362,279 @@ TEST_F(EqualityDeleteFileReaderTest, deleteOnSecondColumn) { assertEqualResults({expected}, {result}); } +/// Verifies that equality deletes apply when the delete file has a higher +/// sequence number than the data file (per the Iceberg V2+ spec). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteApplies) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"a", "b", "c", "d", "e"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({2, 4}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 5, data file has sequence number 3. + // Since deleteSeq (5) > dataSeq (3), the delete should apply. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 2, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/5); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/3); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Rows with id=2 and id=4 are deleted. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 3, 5}), + makeFlatVector({"a", "c", "e"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that equality deletes are skipped when the delete file has a +/// lower or equal sequence number compared to the data file. +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteSkipped) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 2, data file has sequence number 5. + // Since deleteSeq (2) <= dataSeq (5), the delete should be skipped. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/2); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // All rows survive because the delete file is skipped. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that equality deletes are skipped when the delete file has the +/// same sequence number as the data file (edge case of the <= check). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberEqualSkipped) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file and data file have the same sequence number (5). + // Since deleteSeq (5) <= dataSeq (5), the delete should be skipped. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 3, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/5); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // All rows survive because the delete file is skipped (equal seq#). + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({"a", "b", "c"}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when either sequence number is 0 (unassigned/legacy V1), +/// the delete file is always applied (filtering is disabled). +TEST_F(EqualityDeleteFileReaderTest, sequenceNumberZeroAlwaysApplies) { + auto rowType = ROW({"id"}, {BIGINT()}); + + auto baseData = makeRowVector( + {"id"}, + { + makeFlatVector({1, 2, 3}), + }); + auto dataFile = writeDataFile({baseData}); + + auto deleteData = makeRowVector( + {"id"}, + { + makeFlatVector({2}), + }); + auto eqDeleteFile = writeEqDeleteFile({deleteData}); + + // Delete file has sequence number 0 (legacy), data file has sequence 10. + // Since deleteSeq is 0, filtering is disabled and the delete applies. + IcebergDeleteFile icebergDeleteFile( + FileContent::kEqualityDeletes, + eqDeleteFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/0); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile}, + /*dataSequenceNumber=*/10); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Row id=2 is deleted because sequence number filtering is disabled. + auto expected = makeRowVector( + {"id"}, + { + makeFlatVector({1, 3}), + }); + + assertEqualResults({expected}, {result}); +} + +/// Verifies that when multiple delete files have different sequence numbers, +/// only those with higher sequence numbers than the data file are applied. +TEST_F(EqualityDeleteFileReaderTest, mixedSequenceNumbers) { + auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()}); + + auto baseData = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 2, 3, 4, 5}), + makeFlatVector({"a", "b", "c", "d", "e"}), + }); + auto dataFile = writeDataFile({baseData}); + + // First delete file: seqNum=10 (higher than data seqNum=5) → applied. + auto deleteData1 = makeRowVector( + {"id"}, + { + makeFlatVector({2}), + }); + auto eqDeleteFile1 = writeEqDeleteFile({deleteData1}); + IcebergDeleteFile icebergDeleteFile1( + FileContent::kEqualityDeletes, + eqDeleteFile1->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile1->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/10); + + // Second delete file: seqNum=3 (lower than data seqNum=5) → skipped. + auto deleteData2 = makeRowVector( + {"id"}, + { + makeFlatVector({4}), + }); + auto eqDeleteFile2 = writeEqDeleteFile({deleteData2}); + IcebergDeleteFile icebergDeleteFile2( + FileContent::kEqualityDeletes, + eqDeleteFile2->getPath(), + dwio::common::FileFormat::DWRF, + 1, + getFileSize(eqDeleteFile2->getPath()), + /*equalityFieldIds=*/{1}, + /*lowerBounds=*/{}, + /*upperBounds=*/{}, + /*dataSequenceNumber=*/3); + + auto splits = makeSplits( + dataFile->getPath(), + {icebergDeleteFile1, icebergDeleteFile2}, + /*dataSequenceNumber=*/5); + auto plan = makeTableScanPlan(rowType); + auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool()); + + // Only id=2 is deleted (from delete file 1 with seqNum=10). + // id=4 survives because delete file 2 (seqNum=3) is skipped. + auto expected = makeRowVector( + {"id", "value"}, + { + makeFlatVector({1, 3, 4, 5}), + makeFlatVector({"a", "c", "d", "e"}), + }); + + assertEqualResults({expected}, {result}); +} + +// TODO: Add a Parquet-format equality delete test. Currently all equality +// delete tests use DWRF because writeToFile() (from HiveConnectorTestBase) +// only supports DWRF. Adding a Parquet test requires adding Parquet writer +// dependencies to this test target's BUCK file and a Parquet write helper. + } // namespace facebook::velox::connector::hive::iceberg