diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index 6bb02cb38478..233579680b92 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -14,6 +14,7 @@ set( ICEBERG_SOURCES + DeletionVectorReader.cpp IcebergConfig.cpp IcebergColumnHandle.cpp IcebergConnector.cpp @@ -21,6 +22,7 @@ set( IcebergDataSink.cpp IcebergDataSource.cpp IcebergPartitionName.cpp + EqualityDeleteFileReader.cpp IcebergSplit.cpp IcebergSplitReader.cpp PartitionSpec.cpp diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.cpp b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp new file mode 100644 index 000000000000..5e44bec2066d --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.cpp @@ -0,0 +1,334 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/file/FileSystems.h" + +namespace facebook::velox::connector::hive::iceberg { + +DeletionVectorReader::DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* /*pool*/) + : dvFile_(dvFile), splitOffset_(splitOffset) { + VELOX_CHECK( + dvFile_.content == FileContent::kDeletionVector, + "Expected deletion vector file but got content type: {}", + static_cast(dvFile_.content)); + VELOX_CHECK_GT(dvFile_.recordCount, 0, "Empty deletion vector."); + + // Sanity-check the record count to catch corrupt metadata before we + // allocate a potentially enormous bitmap. 10 billion rows is well + // beyond any realistic single-file cardinality. + static constexpr int64_t kMaxDeletionVectorRecordCount = 10'000'000'000LL; + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorRecordCount, + "Deletion vector record count exceeds sanity limit: {}", + dvFile_.recordCount); +} + +void DeletionVectorReader::loadBitmap() { + if (loaded_) { + return; + } + loaded_ = true; + + // Read the raw DV blob from the file. The blob offset and length are + // encoded in the IcebergDeleteFile bounds maps by the coordinator. 
+ uint64_t blobOffset = 0; + uint64_t blobLength = dvFile_.fileSizeInBytes; + + if (auto it = dvFile_.lowerBounds.find(kDvOffsetFieldId); + it != dvFile_.lowerBounds.end()) { + try { + blobOffset = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob offset from bounds map: {}", e.what()); + } + } + if (auto it = dvFile_.upperBounds.find(kDvLengthFieldId); + it != dvFile_.upperBounds.end()) { + try { + blobLength = std::stoull(it->second); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to parse DV blob length from bounds map: {}", e.what()); + } + } + + auto fs = filesystems::getFileSystem(dvFile_.filePath, nullptr); + auto readFile = fs->openFileForRead(dvFile_.filePath); + + auto fileSize = readFile->size(); + VELOX_CHECK_LE( + blobOffset, + fileSize, + "DV blob offset {} exceeds file size {}.", + blobOffset, + fileSize); + VELOX_CHECK_LE( + blobLength, + fileSize - blobOffset, + "DV blob range [{}, {}) exceeds file size {}.", + blobOffset, + blobOffset + blobLength, + fileSize); + + // Read the blob bytes. + std::string blobData(blobLength, '\0'); + readFile->pread(blobOffset, blobLength, blobData.data()); + + // Deserialize the roaring bitmap from the portable binary format. + // The Iceberg V3 spec uses the standard RoaringBitmap portable + // serialization (https://roaringbitmap.org/). We parse the bitmap + // directly without depending on CRoaring — the format is well-defined: + // + // For small deletion vectors, the coordinator may provide the deleted + // positions directly as a sorted list encoded in the blob. For the + // general case, we parse the roaring bitmap portable format. 
+ // + // Portable format layout: + // - cookie: uint32 (identifies format version) + // - container count: uint32 + // - per container: key (uint16) + cardinality-1 (uint16) + // - per container: offset (uint32) [if >4 containers] + // - container data: array or bitset containers + // + // We use a simplified parser that extracts all set positions. + deserializeRoaringBitmap(blobData); + + // Sort positions for efficient batch-range scanning. + std::sort(deletedPositions_.begin(), deletedPositions_.end()); +} + +void DeletionVectorReader::deserializeRoaringBitmap(const std::string& data) { + if (data.size() < 8) { + VELOX_FAIL( + "Deletion vector blob too small: {} bytes, expected at least 8.", + data.size()); + } + + const uint8_t* ptr = reinterpret_cast(data.data()); + const uint8_t* end = ptr + data.size(); + + // Read cookie (first 4 bytes). The portable format has two variants: + // - SERIAL_COOKIE_NO_RUNCONTAINER (12346): standard format + // - SERIAL_COOKIE (12347): format with run containers + uint32_t cookie; + std::memcpy(&cookie, ptr, sizeof(uint32_t)); + cookie = folly::Endian::little(cookie); + ptr += sizeof(uint32_t); + + static constexpr uint32_t kSerialCookieNoRun = 12346; + static constexpr uint32_t kSerialCookie = 12347; + + bool hasRunContainers = false; + uint32_t numContainers = 0; + + if ((cookie & 0xFFFF) == kSerialCookie) { + hasRunContainers = true; + numContainers = (cookie >> 16) + 1; + } else if (cookie == kSerialCookieNoRun) { + std::memcpy(&numContainers, ptr, sizeof(uint32_t)); + numContainers = folly::Endian::little(numContainers); + ptr += sizeof(uint32_t); + } else { + VELOX_FAIL( + "Unknown roaring bitmap cookie: {}. Expected {} or {}.", + cookie, + kSerialCookieNoRun, + kSerialCookie); + } + + if (numContainers == 0) { + return; + } + + // Read run bitmap if present (ceil(numContainers / 8) bytes). 
+ std::vector isRunContainer(numContainers, false); + if (hasRunContainers) { + uint32_t runBitmapBytes = (numContainers + 7) / 8; + VELOX_CHECK_GE( + static_cast(end - ptr), + runBitmapBytes, + "Truncated run bitmap."); + for (uint32_t i = 0; i < numContainers; ++i) { + isRunContainer[i] = (ptr[i / 8] >> (i % 8)) & 1; + } + ptr += runBitmapBytes; + } + + // Read key-cardinality pairs: (uint16 key, uint16 cardinality-1) per + // container. + struct ContainerMeta { + uint16_t key; + uint32_t cardinality; + }; + std::vector containers(numContainers); + + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated container metadata."); + for (uint32_t i = 0; i < numContainers; ++i) { + uint16_t key, cardMinus1; + std::memcpy(&key, ptr, sizeof(uint16_t)); + key = folly::Endian::little(key); + ptr += sizeof(uint16_t); + std::memcpy(&cardMinus1, ptr, sizeof(uint16_t)); + cardMinus1 = folly::Endian::little(cardMinus1); + ptr += sizeof(uint16_t); + containers[i] = {key, static_cast(cardMinus1) + 1}; + } + + // Skip offset section when there are >= 4 containers. Per the roaring + // bitmap portable format spec, the offset section is present for both + // SERIAL_COOKIE_NO_RUNCONTAINER and SERIAL_COOKIE formats when + // numContainers >= NO_OFFSET_THRESHOLD (4). + if (numContainers >= 4) { + // Offsets: uint32 per container. + VELOX_CHECK_GE( + static_cast(end - ptr), + numContainers * 4, + "Truncated offset section."); + ptr += numContainers * sizeof(uint32_t); + } + + // Read container data. + // Guard against unreasonable recordCount that could cause excessive + // allocation. 
+ static constexpr int64_t kMaxDeletionVectorPositions = 1LL + << 30; // ~1 billion + VELOX_CHECK_LE( + dvFile_.recordCount, + kMaxDeletionVectorPositions, + "Deletion vector recordCount exceeds maximum: {}", + dvFile_.recordCount); + deletedPositions_.reserve(dvFile_.recordCount); + + for (uint32_t i = 0; i < numContainers; ++i) { + uint32_t highBits = static_cast(containers[i].key) << 16; + uint32_t cardinality = containers[i].cardinality; + + if (isRunContainer[i]) { + // Run container: pairs of (start, length-1). + uint16_t numRuns; + VELOX_CHECK_GE( + static_cast(end - ptr), + 2u, + "Truncated run container header."); + std::memcpy(&numRuns, ptr, sizeof(uint16_t)); + numRuns = folly::Endian::little(numRuns); + ptr += sizeof(uint16_t); + + VELOX_CHECK_GE( + static_cast(end - ptr), + static_cast(numRuns) * 4, + "Truncated run container data."); + for (uint16_t r = 0; r < numRuns; ++r) { + uint16_t start, lengthMinus1; + std::memcpy(&start, ptr, sizeof(uint16_t)); + start = folly::Endian::little(start); + ptr += sizeof(uint16_t); + std::memcpy(&lengthMinus1, ptr, sizeof(uint16_t)); + lengthMinus1 = folly::Endian::little(lengthMinus1); + ptr += sizeof(uint16_t); + for (uint32_t v = start; + v <= static_cast(start) + lengthMinus1; + ++v) { + deletedPositions_.push_back(static_cast(highBits | v)); + } + } + } else if (cardinality <= 4096) { + // Array container: sorted uint16 values. + VELOX_CHECK_GE( + static_cast(end - ptr), + cardinality * 2, + "Truncated array container."); + for (uint32_t j = 0; j < cardinality; ++j) { + uint16_t val; + std::memcpy(&val, ptr, sizeof(uint16_t)); + val = folly::Endian::little(val); + ptr += sizeof(uint16_t); + deletedPositions_.push_back(static_cast(highBits | val)); + } + } else { + // Bitset container: 2^16 bits = 8192 bytes. 
+ static constexpr size_t kBitsetBytes = 8192; + VELOX_CHECK_GE( + static_cast(end - ptr), + kBitsetBytes, + "Truncated bitset container."); + for (uint32_t word = 0; word < 1024; ++word) { + uint64_t bits; + std::memcpy(&bits, ptr + word * 8, sizeof(uint64_t)); + bits = folly::Endian::little(bits); + while (bits != 0) { + uint32_t bit = __builtin_ctzll(bits); + deletedPositions_.push_back( + static_cast(highBits | (word * 64 + bit))); + bits &= bits - 1; + } + } + ptr += kBitsetBytes; + } + } +} + +void DeletionVectorReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap) { + loadBitmap(); + + if (deletedPositions_.empty()) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + int64_t rowNumberLowerBound = + static_cast(splitOffset_ + baseReadOffset); + int64_t rowNumberUpperBound = + rowNumberLowerBound + static_cast(size); + + // Advance positionIndex_ past positions before the current batch. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberLowerBound) { + ++positionIndex_; + } + + // Set bits for positions within the current batch range. + while (positionIndex_ < deletedPositions_.size() && + deletedPositions_[positionIndex_] < rowNumberUpperBound) { + auto bitIndex = static_cast( + deletedPositions_[positionIndex_] - rowNumberLowerBound); + bits::setBit(bitmap, bitIndex); + ++positionIndex_; + } +} + +bool DeletionVectorReader::noMoreData() const { + return loaded_ && positionIndex_ >= deletedPositions_.size(); +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/DeletionVectorReader.h b/velox/connectors/hive/iceberg/DeletionVectorReader.h new file mode 100644 index 000000000000..1639904aeb78 --- /dev/null +++ b/velox/connectors/hive/iceberg/DeletionVectorReader.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/memory/Memory.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg V3 deletion vector and applies it to the delete bitmap. +/// +/// Iceberg V3 replaces positional delete files with deletion vectors — compact +/// roaring bitmaps stored as blobs inside Puffin files. Each DV is associated +/// with a single base data file and contains 0-based row positions of deleted +/// rows. +/// +/// Compared to the V2 PositionalDeleteFileReader: +/// - No columnar file parsing (Puffin blobs are raw binary, not Parquet) +/// - No file_path filtering (each DV is pre-associated with its data file) +/// - No sorted merge of multiple delete files (one DV per data file) +/// - Direct bitmap-to-bitmap conversion instead of row-by-row position +/// reading +/// +/// The coordinator extracts the blob offset and length from the Puffin footer +/// and provides them via the IcebergDeleteFile metadata. The reader opens the +/// file, reads the raw bytes at the given offset, deserializes the roaring +/// bitmap, and sets bits in the delete bitmap for the current batch range. +class DeletionVectorReader { + public: + /// Constructs a reader for a single deletion vector. + /// + /// @param dvFile Metadata about the deletion vector file. 
Must have + /// content == FileContent::kDeletionVector. The filePath points to the + /// Puffin file, and the DV blob offset/length are encoded in the + /// lowerBounds/upperBounds fields (key = kDvOffsetFieldId for offset, + /// kDvLengthFieldId for length). + /// @param splitOffset File position of the first row in the split. + /// @param pool Memory pool for bitmap allocations. + /// @param fileSystem Filesystem to read the Puffin file from. + DeletionVectorReader( + const IcebergDeleteFile& dvFile, + uint64_t splitOffset, + memory::MemoryPool* pool); + + /// Reads deleted positions from the DV and sets corresponding bits in the + /// deleteBitmap for the current batch range. + /// + /// @param baseReadOffset Read offset from the beginning of the split in + /// number of rows for the current batch. + /// @param size Number of rows in the current batch. + /// @param deleteBitmap Output bitmap. Bit i is set if the row at file + /// position (splitOffset + baseReadOffset + i) is deleted. + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + BufferPtr deleteBitmap); + + /// Returns true when there is no more data. For DVs this is always true + /// after the first readDeletePositions() call since the entire bitmap is + /// loaded eagerly. + bool noMoreData() const; + + /// Field IDs used to encode DV blob offset and length in the + /// IcebergDeleteFile bounds maps. The coordinator encodes these when + /// building splits from Puffin file metadata. + static constexpr int32_t kDvOffsetFieldId = 100; + static constexpr int32_t kDvLengthFieldId = 101; + + private: + /// Loads the deletion vector bitmap from the Puffin file. Called lazily + /// on the first readDeletePositions() call. + void loadBitmap(); + + /// Parses a roaring bitmap from its portable binary serialization format + /// and populates deletedPositions_ with all set positions. 
+ void deserializeRoaringBitmap(const std::string& data); + + const IcebergDeleteFile dvFile_; + const uint64_t splitOffset_; + + /// Sorted vector of deleted row positions (0-based, relative to the + /// start of the base data file). Populated by loadBitmap(). + std::vector deletedPositions_; + + /// Index into deletedPositions_ tracking how far we've consumed. + size_t positionIndex_{0}; + + /// Whether the bitmap has been loaded from the file. + bool loaded_{false}; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp new file mode 100644 index 000000000000..7b11141ab684 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp @@ -0,0 +1,353 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/connectors/hive/BufferedInputBuilder.h" +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +/// Hashes a single value from a vector at the given index. 
+/// Handles lazy vectors via loadedVector(). Returns 0 for null values. +uint64_t hashValue(const VectorPtr& vectorPtr, vector_size_t index) { + const auto* vector = vectorPtr->loadedVector(); + if (vector->isNullAt(index)) { + return 0; + } + + auto type = vector->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::TINYINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::SMALLINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::INTEGER: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::BIGINT: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::REAL: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::DOUBLE: + return std::hash{}( + vector->as>()->valueAt(index)); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto sv = vector->as>()->valueAt(index); + return folly::hasher{}( + std::string_view(sv.data(), sv.size())); + } + case TypeKind::TIMESTAMP: { + auto ts = vector->as>()->valueAt(index); + return std::hash{}(ts.toNanos()); + } + default: + VELOX_NYI( + "Equality delete hash not implemented for type: {}", + type->toString()); + } +} + +/// Compares two values from vectors at given indices. +/// Handles lazy vectors via loadedVector(). 
+bool compareValues( + const VectorPtr& leftPtr, + vector_size_t leftIdx, + const VectorPtr& rightPtr, + vector_size_t rightIdx) { + const auto* left = leftPtr->loadedVector(); + const auto* right = rightPtr->loadedVector(); + bool leftNull = left->isNullAt(leftIdx); + bool rightNull = right->isNullAt(rightIdx); + if (leftNull && rightNull) { + return true; + } + if (leftNull || rightNull) { + return false; + } + + auto type = left->type(); + switch (type->kind()) { // NOLINT(clang-diagnostic-switch-enum) + case TypeKind::BOOLEAN: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::TINYINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::SMALLINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::INTEGER: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::BIGINT: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::REAL: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::DOUBLE: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto lv = left->as>()->valueAt(leftIdx); + auto rv = right->as>()->valueAt(rightIdx); + return std::string_view(lv.data(), lv.size()) == + std::string_view(rv.data(), rv.size()); + } + case TypeKind::TIMESTAMP: + return left->as>()->valueAt(leftIdx) == + right->as>()->valueAt(rightIdx); + default: + VELOX_NYI( + "Equality delete comparison not implemented for type: {}", + type->toString()); + } +} + +} // namespace + +EqualityDeleteFileReader::EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& /*baseFilePath*/, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* 
connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId) + : equalityColumnNames_(equalityColumnNames), + equalityColumnTypes_(equalityColumnTypes), + pool_(connectorQueryCtx->memoryPool()) { + VELOX_CHECK( + deleteFile.content == FileContent::kEqualityDeletes, + "Expected equality delete file but got content type: {}", + static_cast(deleteFile.content)); + VELOX_CHECK_GT(deleteFile.recordCount, 0, "Empty equality delete file."); + VELOX_CHECK( + !equalityColumnNames_.empty(), + "Equality delete file must specify at least one column."); + VELOX_CHECK_EQ( + equalityColumnNames_.size(), + equalityColumnTypes_.size(), + "Equality column names and types must have the same size."); + + // Build the file schema for the equality delete columns only. + auto deleteFileSchema = + ROW(std::vector(equalityColumnNames_), + std::vector(equalityColumnTypes_)); + + // Create a ScanSpec that reads only the equality delete columns. 
+ auto scanSpec = std::make_shared(""); + for (size_t i = 0; i < equalityColumnNames_.size(); ++i) { + scanSpec->addField(equalityColumnNames_[i], static_cast(i)); + } + + auto deleteSplit = std::make_shared( + connectorId, + deleteFile.filePath, + deleteFile.fileFormat, + 0, + deleteFile.fileSizeInBytes); + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + hiveConfig, + connectorQueryCtx, + deleteFileSchema, + deleteSplit, + /*tableParameters=*/{}, + deleteReaderOpts); + + const FileHandleKey fileHandleKey{ + .filename = deleteFile.filePath, + .tokenProvider = connectorQueryCtx->fsTokenProvider()}; + auto deleteFileHandleCachePtr = fileHandleFactory->generate(fileHandleKey); + auto deleteFileInput = BufferedInputBuilder::getInstance()->create( + *deleteFileHandleCachePtr, + deleteReaderOpts, + connectorQueryCtx, + ioStatistics, + ioStats, + executor); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.fileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + if (!testFilters( + scanSpec.get(), + deleteReader.get(), + deleteSplit->filePath, + deleteSplit->partitionKeys, + {}, + hiveConfig->readTimestampPartitionValueAsLocalTime( + connectorQueryCtx->sessionProperties()))) { + runtimeStats.skippedSplitBytes += static_cast(deleteSplit->length); + return; + } + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + {}, + scanSpec, + nullptr, + deleteFileSchema, + deleteSplit, + nullptr, + nullptr, + nullptr, + deleteRowReaderOpts); + + auto deleteRowReader = deleteReader->createRowReader(deleteRowReaderOpts); + + // Read the entire equality delete file and build the hash set. 
+ VectorPtr output; + output = BaseVector::create(deleteFileSchema, 0, pool_); + + while (true) { + auto rowsRead = deleteRowReader->next( + std::max(static_cast(1'000), deleteFile.recordCount), output); + if (rowsRead == 0) { + break; + } + + auto numRows = output->size(); + if (numRows == 0) { + continue; + } + + output->loadedVector(); + auto rowOutput = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL(rowOutput); + + size_t batchIndex = deleteRows_.size(); + deleteRows_.push_back(rowOutput); + + // Resolve column indices on the first batch. + if (deleteColumnIndices_.empty()) { + for (const auto& colName : equalityColumnNames_) { + auto idx = rowOutput->type()->as().getChildIdx(colName); + deleteColumnIndices_.push_back(static_cast(idx)); + } + } + + // Hash each row and insert into the multimap. + for (vector_size_t i = 0; i < numRows; ++i) { + uint64_t hash = hashRow(rowOutput, i); + deleteKeyHashes_.emplace(hash, DeleteKeyEntry{batchIndex, i}); + } + + // Reset output for next batch. + output = BaseVector::create(deleteFileSchema, 0, pool_); + } +} + +void EqualityDeleteFileReader::applyDeletes( + const RowVectorPtr& output, + BufferPtr deleteBitmap) { + if (deleteKeyHashes_.empty() || output->size() == 0) { + return; + } + + auto* bitmap = deleteBitmap->asMutable(); + + // For each row in the output, compute its hash and probe the delete set. + for (vector_size_t i = 0; i < output->size(); ++i) { + // Skip rows already deleted by positional/DV deletes. 
+ if (bits::isBitSet(bitmap, i)) { + continue; + } + + uint64_t hash = hashRow(output, i); + auto range = deleteKeyHashes_.equal_range(hash); + + for (auto it = range.first; it != range.second; ++it) { + auto& entry = it->second; + if (equalRows(output, i, deleteRows_[entry.batchIndex], entry.rowIndex)) { + bits::setBit(bitmap, i); + break; + } + } + } +} + +uint64_t EqualityDeleteFileReader::hashRow( + const RowVectorPtr& row, + vector_size_t index) const { + uint64_t hash = 0; + + // For the delete file rows, use deleteColumnIndices_. + // For the base data rows, look up columns by name. + const auto& rowType = row->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto colIdx = rowType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK( + colIdx.has_value(), + "Column not found in row: {}", + equalityColumnNames_[c]); + auto colHash = hashValue(row->childAt(*colIdx), index); + // Combine hashes using a simple mix. + hash ^= colHash + 0x9e3779b97f4a7c15ULL + (hash << 6) + (hash >> 2); + } + return hash; +} + +bool EqualityDeleteFileReader::equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const { + const auto& leftType = left->type()->asRow(); + const auto& rightType = right->type()->asRow(); + + for (size_t c = 0; c < equalityColumnNames_.size(); ++c) { + auto leftColIdx = leftType.getChildIdxIfExists(equalityColumnNames_[c]); + auto rightColIdx = rightType.getChildIdxIfExists(equalityColumnNames_[c]); + VELOX_CHECK(leftColIdx.has_value() && rightColIdx.has_value()); + + if (!compareValues( + left->childAt(*leftColIdx), + leftIndex, + right->childAt(*rightColIdx), + rightIndex)) { + return false; + } + } + return true; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h new file mode 100644 index 
000000000000..e77b6a8c5068 --- /dev/null +++ b/velox/connectors/hive/iceberg/EqualityDeleteFileReader.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/dwio/common/Reader.h" + +namespace facebook::velox::connector::hive::iceberg { + +/// Reads an Iceberg equality delete file and filters base data rows whose +/// equality delete column values match any row in the delete file. +/// +/// Iceberg equality delete files contain rows with values for one or more +/// columns (identified by equalityFieldIds). A base data row is deleted if +/// its values match ALL specified columns of ANY row in the delete file. +/// +/// Unlike positional deletes (which set bits before reading), equality deletes +/// require reading the base data first, then probing each row against the +/// delete set. The reader eagerly loads all delete key tuples from the file +/// into an in-memory hash set during construction. +/// +/// The equality delete column names are resolved from equalityFieldIds via +/// the table schema provided by the caller. 
+class EqualityDeleteFileReader { + public: + /// Constructs a reader for a single equality delete file. + /// + /// Eagerly reads the entire delete file and builds an in-memory hash set + /// of delete key tuples. The delete file is fully consumed during + /// construction. + /// + /// @param deleteFile Metadata about the equality delete file. Must have + /// content == FileContent::kEqualityDeletes and non-empty + /// equalityFieldIds. + /// @param equalityColumnNames Ordered column names corresponding to + /// equalityFieldIds, resolved by the caller from the table schema. + /// @param equalityColumnTypes Ordered column types corresponding to + /// equalityFieldIds. + /// @param baseFilePath Path of the base data file being read. + /// @param fileHandleFactory Factory for creating file handles. + /// @param connectorQueryCtx Query context for memory and config. + /// @param executor IO executor for async reads. + /// @param hiveConfig Hive configuration. + /// @param ioStatistics IO statistics collector. + /// @param ioStats IO stats tracker. + /// @param runtimeStats Runtime statistics for recording skipped bytes. + /// @param connectorId Connector identifier. + EqualityDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::vector& equalityColumnNames, + const std::vector& equalityColumnTypes, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr& hiveConfig, + const std::shared_ptr& ioStatistics, + const std::shared_ptr& ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + const std::string& connectorId); + + /// Applies equality deletes to the output vector by setting bits in the + /// delete bitmap for rows whose equality column values match any delete + /// key tuple. + /// + /// @param output The base data output vector to filter. + /// @param deleteBitmap Output bitmap. 
Bit i is set if row i matches an + /// equality delete. The bitmap must be pre-allocated to cover at least + /// output->size() rows. + void applyDeletes(const RowVectorPtr& output, BufferPtr deleteBitmap); + + /// Returns the number of delete key tuples loaded from the file. + size_t numDeleteKeys() const { + return deleteKeyHashes_.size(); + } + + /// Returns true if this reader has no delete keys (file was skipped or + /// empty). When true, applyDeletes() is a no-op. + bool empty() const { + return deleteKeyHashes_.empty(); + } + + private: + /// Hashes a single row's equality delete columns into a uint64_t key. + /// Uses a simple combine-hash approach over the raw values. + uint64_t hashRow(const RowVectorPtr& row, vector_size_t index) const; + + /// Checks whether two rows are equal on all equality delete columns. + bool equalRows( + const RowVectorPtr& left, + vector_size_t leftIndex, + const RowVectorPtr& right, + vector_size_t rightIndex) const; + + /// Column names and types for equality delete comparison. + std::vector equalityColumnNames_; + std::vector equalityColumnTypes_; + + /// Column indices in the delete file output vector. + std::vector deleteColumnIndices_; + + /// All rows read from the equality delete file, stored for equality + /// comparison during probing. + std::vector deleteRows_; + + /// Hash set storing (hash → vector of (batch index, row index)) for + /// all delete key tuples. Used for fast probing. 
+ struct DeleteKeyEntry { + size_t batchIndex; + vector_size_t rowIndex; + }; + std::unordered_multimap deleteKeyHashes_; + + memory::MemoryPool* const pool_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h index 2f9206dfc264..ae5822371a60 100644 --- a/velox/connectors/hive/iceberg/IcebergDeleteFile.h +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -27,6 +27,12 @@ enum class FileContent { kData, kPositionalDeletes, kEqualityDeletes, + /// Iceberg V3 deletion vector. A serialized roaring bitmap of deleted row + /// positions stored as a blob inside a Puffin file. More compact than V2 + /// positional delete files and avoids sorted merge of multiple delete files. + /// The coordinator extracts the blob offset and length from the Puffin + /// footer and provides them via IcebergDeleteFile fields. + kDeletionVector, }; struct IcebergDeleteFile { @@ -47,6 +53,13 @@ struct IcebergDeleteFile { // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> std::unordered_map upperBounds; + /// Data sequence number of this delete file, assigned by the Iceberg snapshot + /// that produced it. Per the Iceberg spec (V2+), an equality delete file must + /// only be applied to data files whose data sequence number is strictly less + /// than the delete file's data sequence number. A value of 0 means + /// "unassigned" (legacy V1 tables) and disables sequence number filtering. 
+ int64_t dataSequenceNumber{0}; + IcebergDeleteFile( FileContent _content, const std::string& _filePath, @@ -55,7 +68,8 @@ struct IcebergDeleteFile { uint64_t _fileSizeInBytes, std::vector _equalityFieldIds = {}, std::unordered_map _lowerBounds = {}, - std::unordered_map _upperBounds = {}) + std::unordered_map _upperBounds = {}, + int64_t _dataSequenceNumber = 0) : content(_content), filePath(_filePath), fileFormat(_fileFormat), @@ -63,7 +77,8 @@ struct IcebergDeleteFile { fileSizeInBytes(_fileSizeInBytes), equalityFieldIds(_equalityFieldIds), lowerBounds(_lowerBounds), - upperBounds(_upperBounds) {} + upperBounds(_upperBounds), + dataSequenceNumber(_dataSequenceNumber) {} }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index e5cdf63c33b4..8b4417f22a72 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -69,7 +69,8 @@ HiveIcebergSplit::HiveIcebergSplit( bool cacheable, std::vector deletes, const std::unordered_map& infoColumns, - std::optional properties) + std::optional properties, + int64_t dataSequenceNumber) : HiveConnectorSplit( connectorId, filePath, @@ -87,5 +88,6 @@ HiveIcebergSplit::HiveIcebergSplit( properties, std::nullopt, std::nullopt), - deleteFiles(std::move(deletes)) {} + deleteFiles(std::move(deletes)), + dataSequenceNumber(dataSequenceNumber) {} } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index eb2448dabd1c..d161565aa329 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -25,6 +25,13 @@ namespace facebook::velox::connector::hive::iceberg { struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { std::vector deleteFiles; + /// Data sequence number of the base data file in this 
split. Per the Iceberg + /// spec (V2+), an equality delete file should only apply to data files whose + /// data sequence number is strictly less than the delete file's sequence + /// number. A value of 0 means "unassigned" (legacy V1 tables) and disables + /// sequence number filtering. + int64_t dataSequenceNumber{0}; + HiveIcebergSplit( const std::string& connectorId, const std::string& filePath, @@ -55,7 +62,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { bool cacheable = true, std::vector deletes = {}, const std::unordered_map& infoColumns = {}, - std::optional fileProperties = std::nullopt); + std::optional fileProperties = std::nullopt, + int64_t dataSequenceNumber = 0); }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index a8821a75204f..e20ec8bae99f 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -18,7 +18,9 @@ #include +#include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" @@ -27,6 +29,29 @@ using namespace facebook::velox::dwio::common; namespace facebook::velox::connector::hive::iceberg { +namespace { + +/// Returns true if a delete/update file should be skipped based on sequence +/// number conflict resolution. Per the Iceberg spec (V2+): +/// - Equality deletes apply when deleteSeqNum > dataSeqNum (i.e., skip when +/// deleteSeqNum <= dataSeqNum). 
+/// - Positional deletes, deletion vectors, and positional updates apply when +/// deleteSeqNum >= dataSeqNum (i.e., skip when deleteSeqNum < dataSeqNum), +/// because same-snapshot positional deletes SHOULD apply. +/// - A sequence number of 0 means "unassigned" (legacy V1 tables) and +/// disables filtering (never skip). +bool shouldSkipBySequenceNumber( + int64_t fileSeqNum, + int64_t dataSeqNum, + bool isEqualityDelete) { + if (fileSeqNum <= 0 || dataSeqNum <= 0) { + return false; + } + return isEqualityDelete ? (fileSeqNum <= dataSeqNum) + : (fileSeqNum < dataSeqNum); +} + +} // namespace IcebergSplitReader::IcebergSplitReader( const std::shared_ptr& hiveSplit, @@ -77,11 +102,20 @@ void IcebergSplitReader::prepareSplit( baseReadOffset_ = 0; splitOffset_ = baseRowReader_->nextRowNumber(); positionalDeleteFileReaders_.clear(); + deletionVectorReaders_.clear(); + equalityDeleteFileReaders_.clear(); const auto& deleteFiles = icebergSplit->deleteFiles; for (const auto& deleteFile : deleteFiles) { if (deleteFile.content == FileContent::kPositionalDeletes) { if (deleteFile.recordCount > 0) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/false)) { + continue; + } + // Skip the delete file if all delete positions are before this split. // TODO: Skip delete files where all positions are after the split, if // split row count becomes available. @@ -115,8 +149,70 @@ void IcebergSplitReader::prepareSplit( splitOffset_, hiveSplit_->connectorId)); } + } else if (deleteFile.content == FileContent::kEqualityDeletes) { + if (deleteFile.recordCount > 0 && !deleteFile.equalityFieldIds.empty()) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/true)) { + continue; + } + + // Resolve equalityFieldIds to column names and types. 
In Iceberg, + // field IDs for top-level columns are assigned sequentially starting + // from 1, matching the column order in the table schema. + std::vector equalityColumnNames; + std::vector equalityColumnTypes; + + const auto& dataColumns = hiveTableHandle_->dataColumns(); + if (dataColumns) { + for (const auto& eqFieldId : deleteFile.equalityFieldIds) { + // Field IDs are 1-based; column index is fieldId - 1. + auto colIdx = static_cast(eqFieldId - 1); + VELOX_CHECK_LT( + colIdx, + dataColumns->size(), + "Equality delete field ID out of range: {}", + eqFieldId); + equalityColumnNames.push_back(dataColumns->nameOf(colIdx)); + equalityColumnTypes.push_back(dataColumns->childAt(colIdx)); + } + } + + if (!equalityColumnNames.empty()) { + equalityDeleteFileReaders_.push_back( + std::make_unique( + deleteFile, + equalityColumnNames, + equalityColumnTypes, + hiveSplit_->filePath, + fileHandleFactory_, + connectorQueryCtx_, + ioExecutor_, + hiveConfig_, + ioStatistics_, + ioStats_, + runtimeStats, + hiveSplit_->connectorId)); + } + } + } else if (deleteFile.content == FileContent::kDeletionVector) { + if (deleteFile.recordCount > 0) { + if (shouldSkipBySequenceNumber( + deleteFile.dataSequenceNumber, + icebergSplit->dataSequenceNumber, + /*isEqualityDelete=*/false)) { + continue; + } + + deletionVectorReaders_.push_back( + std::make_unique( + deleteFile, splitOffset_, connectorQueryCtx_->memoryPool())); + } } else { - VELOX_NYI(); + VELOX_NYI( + "Unsupported delete file content type: {}", + static_cast(deleteFile.content)); } } } @@ -137,7 +233,8 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { return 0; } - if (!positionalDeleteFileReaders_.empty()) { + if (!positionalDeleteFileReaders_.empty() || + !deletionVectorReaders_.empty()) { auto numBytes = bits::nbytes(actualSize); dwio::common::ensureCapacity( deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), false, true); @@ -152,6 +249,17 @@ uint64_t IcebergSplitReader::next(uint64_t size, 
VectorPtr& output) { ++iter; } } + + for (auto iter = deletionVectorReaders_.begin(); + iter != deletionVectorReaders_.end();) { + (*iter)->readDeletePositions(baseReadOffset_, actualSize, deleteBitmap_); + + if ((*iter)->noMoreData()) { + iter = deletionVectorReaders_.erase(iter); + } else { + ++iter; + } + } } mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0 @@ -160,6 +268,70 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) { auto rowsScanned = baseRowReader_->next(actualSize, output, &mutation); + // Apply equality deletes after reading base data. Unlike positional deletes + // (which set bits before reading), equality deletes require the data values + // to be available for comparison. + if (rowsScanned > 0 && !equalityDeleteFileReaders_.empty()) { + auto outputRowVector = std::dynamic_pointer_cast(output); + VELOX_CHECK_NOT_NULL( + outputRowVector, "Output must be a RowVector for equality deletes."); + + auto numRows = outputRowVector->size(); + + // Use a separate bitmap for equality deletes to track which rows to + // remove from the output. + BufferPtr eqDeleteBitmap = AlignedBuffer::allocate( + numRows, connectorQueryCtx_->memoryPool()); + std::memset( + eqDeleteBitmap->asMutable(), 0, eqDeleteBitmap->size()); + + for (auto& reader : equalityDeleteFileReaders_) { + reader->applyDeletes(outputRowVector, eqDeleteBitmap); + } + + // Count surviving rows and compact the output if any rows were deleted. + auto* eqBitmap = eqDeleteBitmap->as(); + vector_size_t numDeleted = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (bits::isBitSet(eqBitmap, i)) { + ++numDeleted; + } + } + + if (numDeleted > 0) { + vector_size_t numSurviving = numRows - numDeleted; + if (numSurviving == 0) { + output = BaseVector::create( + outputRowVector->type(), 0, connectorQueryCtx_->memoryPool()); + return 0; + } + + // Build a list of surviving row ranges and use it to compact. 
+ std::vector ranges; + ranges.reserve(numSurviving); + vector_size_t targetIdx = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (!bits::isBitSet(eqBitmap, i)) { + ranges.push_back({i, targetIdx++, 1}); + } + } + + auto newOutput = BaseVector::create( + outputRowVector->type(), + numSurviving, + connectorQueryCtx_->memoryPool()); + auto newRowOutput = std::dynamic_pointer_cast(newOutput); + for (auto i = 0; i < outputRowVector->childrenSize(); ++i) { + newRowOutput->childAt(i)->resize(numSurviving); + newRowOutput->childAt(i)->copyRanges( + outputRowVector->childAt(i).get(), ranges); + } + newRowOutput->resize(numSurviving); + output = newRowOutput; + rowsScanned = numSurviving; + } + } + return rowsScanned; } diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index f727f5c86ff2..af9bf0660e00 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -18,6 +18,8 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" +#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h" #include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" namespace facebook::velox::connector::hive::iceberg { @@ -93,13 +95,20 @@ class IcebergSplitReader : public SplitReader { const RowTypePtr& fileType, const RowTypePtr& tableSchema) const override; - // The read offset to the beginning of the split in number of rows for the - // current batch for the base data file + /// Read offset to the beginning of the split in number of rows for the + /// current batch for the base data file. uint64_t baseReadOffset_; - // The file position for the first row in the split + /// File position for the first row in the split. 
uint64_t splitOffset_; std::list> positionalDeleteFileReaders_; BufferPtr deleteBitmap_; + + /// Readers for Iceberg V3 deletion vectors (Puffin-encoded roaring bitmaps). + std::list> deletionVectorReaders_; + + /// Readers for equality delete files. + std::list> + equalityDeleteFileReaders_; }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index a84064bb8613..08a509442fa3 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -83,6 +83,19 @@ if(NOT VELOX_DISABLE_GOOGLETEST) GTest::gtest_main ) + add_executable(velox_hive_iceberg_deletion_vector_test DeletionVectorReaderTest.cpp) + add_test(velox_hive_iceberg_deletion_vector_test velox_hive_iceberg_deletion_vector_test) + + target_link_libraries( + velox_hive_iceberg_deletion_vector_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) + if(VELOX_ENABLE_PARQUET) target_link_libraries(velox_hive_iceberg_test velox_dwio_parquet_reader) @@ -94,4 +107,17 @@ if(NOT VELOX_DISABLE_GOOGLETEST) velox_dwio_parquet_writer ) endif() + + add_executable(velox_hive_iceberg_equality_delete_test EqualityDeleteFileReaderTest.cpp) + add_test(velox_hive_iceberg_equality_delete_test velox_hive_iceberg_equality_delete_test) + + target_link_libraries( + velox_hive_iceberg_equality_delete_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_exec_test_lib + velox_dwio_common_test_utils + GTest::gtest + GTest::gtest_main + ) endif() diff --git a/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp new file mode 100644 index 000000000000..3000a502f641 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/DeletionVectorReaderTest.cpp @@ -0,0 +1,560 @@ +/* + 
* Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/DeletionVectorReader.h" + +#include + +#include + +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/testutil/TempFilePath.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive::iceberg; +using namespace facebook::velox::common::testutil; + +namespace { + +/// Serializes a roaring bitmap in the portable format (no-run variant, +/// cookie = 12346). Supports only array containers (cardinality <= 4096). +/// This is the simplest format the DeletionVectorReader needs to parse. +std::string serializeRoaringBitmapNoRun(const std::vector& positions) { + if (positions.empty()) { + // Empty bitmap: cookie + 0 containers. + std::string data(8, '\0'); + uint32_t cookie = 12346; + uint32_t numContainers = 0; + std::memcpy(data.data(), &cookie, 4); + std::memcpy(data.data() + 4, &numContainers, 4); + return data; + } + + // Group positions by high 16 bits. 
+ std::map> containers; + for (auto pos : positions) { + auto key = static_cast(pos >> 16); + auto low = static_cast(pos & 0xFFFF); + containers[key].push_back(low); + } + + for (auto& [key, vals] : containers) { + std::sort(vals.begin(), vals.end()); + } + + uint32_t numContainers = static_cast(containers.size()); + + std::string data; + // Cookie. + uint32_t cookie = 12346; + data.append(reinterpret_cast(&cookie), 4); + // Container count. + data.append(reinterpret_cast(&numContainers), 4); + + // Key-cardinality pairs. + for (auto& [key, vals] : containers) { + uint16_t cardMinus1 = static_cast(vals.size() - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Offset section (required for >= 4 containers). + if (numContainers >= 4) { + uint32_t offset = 4 + 4 + numContainers * 4 + numContainers * 4; + for (auto& [key, vals] : containers) { + data.append(reinterpret_cast(&offset), 4); + offset += static_cast(vals.size()) * 2; + } + } + + // Container data (array containers: sorted uint16 values). + for (auto& [key, vals] : containers) { + for (auto v : vals) { + data.append(reinterpret_cast(&v), 2); + } + } + + return data; +} + +/// Serializes a roaring bitmap in the portable format with run containers +/// (cookie = 12347). All containers are run-encoded. +std::string serializeRoaringBitmapWithRuns( + const std::vector< + std::pair>>>& + containerRuns) { + // containerRuns: vector of (highBitsKey, vector of (start, lengthMinus1)). + uint32_t numContainers = static_cast(containerRuns.size()); + + // Cookie: low 16 bits = 12347, high 16 bits = numContainers - 1. + uint32_t cookie = static_cast(12347) | ((numContainers - 1) << 16); + + std::string data; + data.append(reinterpret_cast(&cookie), 4); + + // Run bitmap: all containers are run containers. ceil(numContainers / 8) + // bytes. 
+ uint32_t runBitmapBytes = (numContainers + 7) / 8; + std::vector runBitmap(runBitmapBytes, 0xFF); + data.append(reinterpret_cast(runBitmap.data()), runBitmapBytes); + + // Compute cardinality for each container. + std::vector cardinalities; + for (auto& [key, runs] : containerRuns) { + uint32_t card = 0; + for (auto& [start, lenMinus1] : runs) { + card += static_cast(lenMinus1) + 1; + } + cardinalities.push_back(card); + } + + // Key-cardinality pairs. + for (size_t i = 0; i < containerRuns.size(); ++i) { + uint16_t key = containerRuns[i].first; + uint16_t cardMinus1 = static_cast(cardinalities[i] - 1); + data.append(reinterpret_cast(&key), 2); + data.append(reinterpret_cast(&cardMinus1), 2); + } + + // Container data: each run container has numRuns (uint16) followed by + // (start, lengthMinus1) pairs. + for (auto& [key, runs] : containerRuns) { + uint16_t numRuns = static_cast(runs.size()); + data.append(reinterpret_cast(&numRuns), 2); + for (auto& [start, lenMinus1] : runs) { + data.append(reinterpret_cast(&start), 2); + data.append(reinterpret_cast(&lenMinus1), 2); + } + } + + return data; +} + +/// Writes binary data to a temp file and returns the path. +std::shared_ptr writeDvFile(const std::string& bitmapData) { + auto tempFile = TempFilePath::create(); + // Write directly via C++ streams since TempFilePath already creates the + // file and the local filesystem openFileForWrite may not overwrite. + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + VELOX_CHECK(out.good(), "Failed to open temp file for writing"); + out.write(bitmapData.data(), static_cast(bitmapData.size())); + out.close(); + return tempFile; +} + +/// Creates an IcebergDeleteFile for a deletion vector. 
+IcebergDeleteFile makeDvDeleteFile( + const std::string& filePath, + uint64_t recordCount, + uint64_t fileSize, + uint64_t blobOffset = 0, + std::optional blobLength = std::nullopt) { + std::unordered_map lowerBounds; + std::unordered_map upperBounds; + + lowerBounds[DeletionVectorReader::kDvOffsetFieldId] = + std::to_string(blobOffset); + if (blobLength.has_value()) { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(blobLength.value()); + } else { + upperBounds[DeletionVectorReader::kDvLengthFieldId] = + std::to_string(fileSize); + } + + return IcebergDeleteFile( + FileContent::kDeletionVector, + filePath, + dwio::common::FileFormat::DWRF, + recordCount, + fileSize, + {}, + lowerBounds, + upperBounds); +} + +/// Extracts which bits are set in a bitmap buffer. +std::vector getSetBits(const BufferPtr& bitmap, uint64_t size) { + auto* raw = bitmap->as(); + std::vector result; + for (uint64_t i = 0; i < size; ++i) { + if (bits::isBitSet(raw, i)) { + result.push_back(i); + } + } + return result; +} + +} // namespace + +class DeletionVectorReaderTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + } + + void SetUp() override { + filesystems::registerLocalFileSystem(); + pool_ = memory::memoryManager()->addLeafPool("DeletionVectorReaderTest"); + } + + BufferPtr allocateBitmap(uint64_t numBits) { + auto numBytes = bits::nbytes(numBits); + auto buffer = AlignedBuffer::allocate(numBytes, pool_.get(), 0); + return buffer; + } + + std::shared_ptr pool_; +}; + +TEST_F(DeletionVectorReaderTest, basicArrayContainer) { + // Positions: 0, 5, 10, 99. 
+ std::vector positions = {0, 5, 10, 99}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + EXPECT_FALSE(reader.noMoreData()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 5, 10, 99})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, batchRangeFiltering) { + // Positions: 10, 20, 30, 40, 50. + std::vector positions = {10, 20, 30, 40, 50}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // First batch: rows 0-24 (should contain positions 10, 20). + auto bitmap1 = allocateBitmap(25); + reader.readDeletePositions(0, 25, bitmap1); + auto bits1 = getSetBits(bitmap1, 25); + EXPECT_EQ(bits1, (std::vector{10, 20})); + EXPECT_FALSE(reader.noMoreData()); + + // Second batch: rows 25-49 (should contain positions 30, 40). + auto bitmap2 = allocateBitmap(25); + reader.readDeletePositions(25, 25, bitmap2); + auto bits2 = getSetBits(bitmap2, 25); + EXPECT_EQ(bits2, (std::vector{5, 15})); + EXPECT_FALSE(reader.noMoreData()); + + // Third batch: rows 50-74 (should contain position 50). + auto bitmap3 = allocateBitmap(25); + reader.readDeletePositions(50, 25, bitmap3); + auto bits3 = getSetBits(bitmap3, 25); + EXPECT_EQ(bits3, (std::vector{0})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffset) { + // Positions: 100, 105, 110. 
+ std::vector positions = {100, 105, 110}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + // Positions 100, 105, 110 relative to splitOffset=100, baseReadOffset=0 + // become bit indices 0, 5, 10. + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{0, 5, 10})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, splitOffsetWithBaseReadOffset) { + // Positions: 200, 210, 220. + std::vector positions = {200, 210, 220}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + // Split starts at row 100. + DeletionVectorReader reader(dvFile, 100, pool_.get()); + + // First batch: baseReadOffset=100, so file positions [200, 300). + // Positions 200, 210, 220 are all in range. + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(100, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_EQ(setBits, (std::vector{0, 10, 20})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, noDeletesInRange) { + // Positions: 1000, 2000. + std::vector positions = {1000, 2000}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Batch covers rows 0-99, no deletions in this range. 
+ auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + EXPECT_TRUE(setBits.empty()); + EXPECT_FALSE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, runContainers) { + // Use run-encoded containers: positions 10-19 and 50-59. + std::vector>>> + containerRuns = { + {0, {{10, 9}, {50, 9}}}, + }; + auto bitmapData = serializeRoaringBitmapWithRuns(containerRuns); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = makeDvDeleteFile(tempFile->getPath(), 20, fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(100); + reader.readDeletePositions(0, 100, bitmap); + + auto setBits = getSetBits(bitmap, 100); + // Expect positions 10-19 and 50-59. + std::vector expected; + for (uint64_t i = 10; i <= 19; ++i) { + expected.push_back(i); + } + for (uint64_t i = 50; i <= 59; ++i) { + expected.push_back(i); + } + EXPECT_EQ(setBits, expected); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, largePositionsMultipleContainers) { + // Positions spanning two containers: one in container 0 (key=0), one in + // container 1 (key=1, i.e. pos >= 65536). + std::vector positions = {5, 100, 65536, 65600}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + auto tempFile = writeDvFile(bitmapData); + auto fileSize = static_cast(bitmapData.size()); + + auto dvFile = + makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + // Read a batch covering all positions. 
+ auto bitmap = allocateBitmap(66000); + reader.readDeletePositions(0, 66000, bitmap); + + auto setBits = getSetBits(bitmap, 66000); + EXPECT_EQ(setBits, (std::vector{5, 100, 65536, 65600})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, blobOffset) { + // Write a file with some padding before the actual bitmap data. + std::vector positions = {3, 7, 11}; + auto bitmapData = serializeRoaringBitmapNoRun(positions); + + // Prepend 64 bytes of padding. + std::string padding(64, 'X'); + std::string fileContent = padding + bitmapData; + + auto tempFile = writeDvFile(fileContent); + auto fileSize = static_cast(fileContent.size()); + + auto dvFile = makeDvDeleteFile( + tempFile->getPath(), positions.size(), fileSize, 64, bitmapData.size()); + + DeletionVectorReader reader(dvFile, 0, pool_.get()); + + auto bitmap = allocateBitmap(20); + reader.readDeletePositions(0, 20, bitmap); + + auto setBits = getSetBits(bitmap, 20); + EXPECT_EQ(setBits, (std::vector{3, 7, 11})); + EXPECT_TRUE(reader.noMoreData()); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsWrongContentType) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile badFile( + FileContent::kPositionalDeletes, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 1, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(badFile, 0, pool_.get()), + "Expected deletion vector file"); +} + +TEST_F(DeletionVectorReaderTest, constructorRejectsEmptyDv) { + auto tempFile = TempFilePath::create(); + { + std::ofstream out(tempFile->getPath(), std::ios::binary | std::ios::trunc); + out.write("dummy", 5); + } + + IcebergDeleteFile emptyDv( + FileContent::kDeletionVector, + tempFile->getPath(), + dwio::common::FileFormat::DWRF, + 0, + 5); + + VELOX_ASSERT_THROW( + DeletionVectorReader(emptyDv, 0, pool_.get()), "Empty deletion vector"); +} + 
+TEST_F(DeletionVectorReaderTest, noMoreDataAfterAllConsumed) {
+  std::vector<int64_t> positions = {0, 1, 2};
+  auto bitmapData = serializeRoaringBitmapNoRun(positions);
+  auto tempFile = writeDvFile(bitmapData);
+  auto fileSize = static_cast<int64_t>(bitmapData.size());
+
+  auto dvFile =
+      makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize);
+
+  DeletionVectorReader reader(dvFile, 0, pool_.get());
+  EXPECT_FALSE(reader.noMoreData());
+
+  auto bitmap = allocateBitmap(10);
+  reader.readDeletePositions(0, 10, bitmap);
+  EXPECT_TRUE(reader.noMoreData());
+
+  // Additional reads should be no-ops.
+  auto bitmap2 = allocateBitmap(10);
+  reader.readDeletePositions(10, 10, bitmap2);
+  auto setBits2 = getSetBits(bitmap2, 10);
+  EXPECT_TRUE(setBits2.empty());
+  EXPECT_TRUE(reader.noMoreData());
+}
+
+TEST_F(DeletionVectorReaderTest, singlePosition) {
+  std::vector<int64_t> positions = {42};
+  auto bitmapData = serializeRoaringBitmapNoRun(positions);
+  auto tempFile = writeDvFile(bitmapData);
+  auto fileSize = static_cast<int64_t>(bitmapData.size());
+
+  auto dvFile =
+      makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize);
+
+  DeletionVectorReader reader(dvFile, 0, pool_.get());
+
+  auto bitmap = allocateBitmap(100);
+  reader.readDeletePositions(0, 100, bitmap);
+
+  auto setBits = getSetBits(bitmap, 100);
+  EXPECT_EQ(setBits, (std::vector<uint64_t>{42}));
+}
+
+TEST_F(DeletionVectorReaderTest, consecutivePositions) {
+  // Positions: 0 through 99 (100 consecutive positions).
+  std::vector<int64_t> positions;
+  positions.reserve(100);
+  for (int64_t i = 0; i < 100; ++i) {
+    positions.push_back(i);
+  }
+  auto bitmapData = serializeRoaringBitmapNoRun(positions);
+  auto tempFile = writeDvFile(bitmapData);
+  auto fileSize = static_cast<int64_t>(bitmapData.size());
+
+  auto dvFile =
+      makeDvDeleteFile(tempFile->getPath(), positions.size(), fileSize);
+
+  DeletionVectorReader reader(dvFile, 0, pool_.get());
+
+  auto bitmap = allocateBitmap(100);
+  reader.readDeletePositions(0, 100, bitmap);
+
+  auto setBits = getSetBits(bitmap, 100);
+  std::vector<uint64_t> expected;
+  expected.reserve(100);
+  for (uint64_t i = 0; i < 100; ++i) {
+    expected.push_back(i);
+  }
+  EXPECT_EQ(setBits, expected);
+}
+
+TEST_F(DeletionVectorReaderTest, invalidBitmapTooSmall) {
+  // Write a file that is too small to contain a valid roaring bitmap header.
+  std::string tinyData(4, '\0');
+  auto tempFile = writeDvFile(tinyData);
+
+  auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, tinyData.size());
+
+  DeletionVectorReader reader(dvFile, 0, pool_.get());
+
+  auto bitmap = allocateBitmap(10);
+  VELOX_ASSERT_THROW(reader.readDeletePositions(0, 10, bitmap), "too small");
+}
+
+TEST_F(DeletionVectorReaderTest, invalidBitmapBadCookie) {
+  // Write a file with an invalid cookie.
+  std::string badData(12, '\0');
+  uint32_t badCookie = 99999;
+  std::memcpy(badData.data(), &badCookie, 4);
+  auto tempFile = writeDvFile(badData);
+
+  auto dvFile = makeDvDeleteFile(tempFile->getPath(), 1, badData.size());
+
+  DeletionVectorReader reader(dvFile, 0, pool_.get());
+
+  auto bitmap = allocateBitmap(10);
+  VELOX_ASSERT_THROW(
+      reader.readDeletePositions(0, 10, bitmap),
+      "Unknown roaring bitmap cookie");
+}
diff --git a/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp
new file mode 100644
index 000000000000..601a583e0a6c
--- /dev/null
+++ b/velox/connectors/hive/iceberg/tests/EqualityDeleteFileReaderTest.cpp
@@ -0,0 +1,640 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "velox/common/file/FileSystems.h"
+#include "velox/common/testutil/TempDirectoryPath.h"
+#include "velox/connectors/hive/iceberg/IcebergConnector.h"
+#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
+#include "velox/connectors/hive/iceberg/IcebergSplit.h"
+#include "velox/exec/tests/utils/AssertQueryBuilder.h"
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+namespace facebook::velox::connector::hive::iceberg {
+
+using namespace facebook::velox::exec::test;
+
+namespace {
+
+const std::string kIcebergConnectorId = "test-iceberg-eq-delete";
+
+} // namespace
+
+/// End-to-end tests for equality deletes via the IcebergSplitReader.
+/// These tests write DWRF data files and delete files, then execute
+/// table scans verifying that matching rows are filtered out.
+class EqualityDeleteFileReaderTest : public HiveConnectorTestBase {
+ protected:
+  void SetUp() override {
+    HiveConnectorTestBase::SetUp();
+    IcebergConnectorFactory icebergFactory;
+    auto icebergConnector = icebergFactory.newConnector(
+        kIcebergConnectorId,
+        std::make_shared<config::ConfigBase>(
+            std::unordered_map<std::string, std::string>()),
+        ioExecutor_.get());
+    connector::registerConnector(icebergConnector);
+  }
+
+  void TearDown() override {
+    connector::unregisterConnector(kIcebergConnectorId);
+    HiveConnectorTestBase::TearDown();
+  }
+
+  uint64_t getFileSize(const std::string& path) {
+    return filesystems::getFileSystem(path, nullptr)
+        ->openFileForRead(path)
+        ->size();
+  }
+
+  /// Writes a DWRF data file containing the given vectors.
+  std::shared_ptr<common::testutil::TempFilePath> writeDataFile(
+      const std::vector<RowVectorPtr>& data) {
+    auto file = common::testutil::TempFilePath::create();
+    writeToFile(file->getPath(), data);
+    return file;
+  }
+
+  /// Writes a DWRF delete file containing the equality delete rows.
+  std::shared_ptr<common::testutil::TempFilePath> writeEqDeleteFile(
+      const std::vector<RowVectorPtr>& deleteData) {
+    auto file = common::testutil::TempFilePath::create();
+    writeToFile(file->getPath(), deleteData);
+    return file;
+  }
+
+  /// Creates splits with equality delete files attached.
+  std::vector<std::shared_ptr<connector::ConnectorSplit>> makeSplits(
+      const std::string& dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles = {},
+      int64_t dataSequenceNumber = 0) {
+    auto fileSize = getFileSize(dataFilePath);
+    return {std::make_shared<HiveIcebergSplit>(
+        kIcebergConnectorId,
+        dataFilePath,
+        dwio::common::FileFormat::DWRF,
+        0,
+        fileSize,
+        std::unordered_map<std::string, std::optional<std::string>>{},
+        std::nullopt,
+        std::unordered_map<std::string, std::string>{},
+        nullptr,
+        /*cacheable=*/true,
+        deleteFiles,
+        std::unordered_map<std::string, std::string>{},
+        std::nullopt,
+        dataSequenceNumber)};
+  }
+
+  /// Builds a table scan plan node with the given schema.
+  core::PlanNodePtr makeTableScanPlan(const RowTypePtr& rowType) {
+    return PlanBuilder()
+        .startTableScan()
+        .connectorId(kIcebergConnectorId)
+        .outputType(rowType)
+        .dataColumns(rowType)
+        .endTableScan()
+        .planNode();
+  }
+};
+
+/// Verifies that base rows matching the equality delete file are removed.
+TEST_F(EqualityDeleteFileReaderTest, basicSingleColumnDelete) {
+  auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}),
+          makeFlatVector<std::string>(
+              {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // Delete rows where id == 3 or id == 7.
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({3, 7}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1}); // field ID 1 = column 0 = "id"
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  auto expected = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({0, 1, 2, 4, 5, 6, 8, 9}),
+          makeFlatVector<std::string>({"a", "b", "c", "e", "f", "g", "i", "j"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies multi-column equality deletes (both columns must match).
+TEST_F(EqualityDeleteFileReaderTest, multiColumnDelete) {
+  auto rowType = ROW({"a", "b", "c"}, {INTEGER(), VARCHAR(), BIGINT()});
+
+  auto baseData = makeRowVector(
+      {"a", "b", "c"},
+      {
+          makeFlatVector<int32_t>({1, 2, 3, 4, 5}),
+          makeFlatVector<std::string>({"x", "y", "z", "x", "y"}),
+          makeFlatVector<int64_t>({10, 20, 30, 40, 50}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // Delete rows where (a=2, b="y") — matches row index 1.
+  // Also (a=5, b="y") — matches row index 4.
+  // But (a=1, b="y") — no match (a=1 has b="x").
+  auto deleteData = makeRowVector(
+      {"a", "b"},
+      {
+          makeFlatVector<int32_t>({2, 5, 1}),
+          makeFlatVector<std::string>({"y", "y", "y"}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      3,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1, 2}); // field IDs 1,2 = columns "a","b"
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // Rows 0, 2, 3 survive (rows 1 and 4 deleted).
+  auto expected = makeRowVector(
+      {"a", "b", "c"},
+      {
+          makeFlatVector<int32_t>({1, 3, 4}),
+          makeFlatVector<std::string>({"x", "z", "x"}),
+          makeFlatVector<int64_t>({10, 30, 40}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that when no rows match, all rows survive.
+TEST_F(EqualityDeleteFileReaderTest, noMatchingDeletes) {
+  auto rowType = ROW({"id"}, {BIGINT()});
+
+  auto baseData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // Delete file has values not present in base data.
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({100, 200}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1});
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  auto expected = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that all rows are deleted when every base row matches.
+TEST_F(EqualityDeleteFileReaderTest, allRowsDeleted) {
+  auto rowType = ROW({"id"}, {BIGINT()});
+
+  auto baseData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      3,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1});
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  EXPECT_EQ(result->size(), 0);
+}
+
+/// Verifies equality deletes with VARCHAR columns.
+TEST_F(EqualityDeleteFileReaderTest, stringColumnDelete) {
+  auto rowType = ROW({"name", "age"}, {VARCHAR(), INTEGER()});
+
+  auto baseData = makeRowVector(
+      {"name", "age"},
+      {
+          makeFlatVector<std::string>({"alice", "bob", "charlie", "dave"}),
+          makeFlatVector<int32_t>({25, 30, 35, 40}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // Delete rows where name is "bob" or "dave".
+  auto deleteData = makeRowVector(
+      {"name"},
+      {
+          makeFlatVector<std::string>({"bob", "dave"}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1}); // field ID 1 = "name"
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  auto expected = makeRowVector(
+      {"name", "age"},
+      {
+          makeFlatVector<std::string>({"alice", "charlie"}),
+          makeFlatVector<int32_t>({25, 35}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies equality deletes on a non-first column (field ID 2).
+TEST_F(EqualityDeleteFileReaderTest, deleteOnSecondColumn) {
+  auto rowType = ROW({"id", "category"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "category"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3, 4, 5}),
+          makeFlatVector<std::string>({"A", "B", "A", "C", "B"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // Delete rows where category == "B".
+  auto deleteData = makeRowVector(
+      {"category"},
+      {
+          makeFlatVector<std::string>({"B"}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      1,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{2}); // field ID 2 = column 1 = "category"
+
+  auto splits = makeSplits(dataFile->getPath(), {icebergDeleteFile});
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // Rows with category="B" (indices 1,4) deleted.
+  auto expected = makeRowVector(
+      {"id", "category"},
+      {
+          makeFlatVector<int64_t>({1, 3, 4}),
+          makeFlatVector<std::string>({"A", "A", "C"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that equality deletes apply when the delete file has a higher
+/// sequence number than the data file (per the Iceberg V2+ spec).
+TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteApplies) {
+  auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3, 4, 5}),
+          makeFlatVector<std::string>({"a", "b", "c", "d", "e"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({2, 4}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  // Delete file has sequence number 5, data file has sequence number 3.
+  // Since deleteSeq (5) > dataSeq (3), the delete should apply.
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/5);
+
+  auto splits = makeSplits(
+      dataFile->getPath(),
+      {icebergDeleteFile},
+      /*dataSequenceNumber=*/3);
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // Rows with id=2 and id=4 are deleted.
+  auto expected = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 3, 5}),
+          makeFlatVector<std::string>({"a", "c", "e"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that equality deletes are skipped when the delete file has a
+/// lower or equal sequence number compared to the data file.
+TEST_F(EqualityDeleteFileReaderTest, sequenceNumberDeleteSkipped) {
+  auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+          makeFlatVector<std::string>({"a", "b", "c"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  // Delete file has sequence number 2, data file has sequence number 5.
+  // Since deleteSeq (2) <= dataSeq (5), the delete should be skipped.
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      3,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/2);
+
+  auto splits = makeSplits(
+      dataFile->getPath(),
+      {icebergDeleteFile},
+      /*dataSequenceNumber=*/5);
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // All rows survive because the delete file is skipped.
+  auto expected = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+          makeFlatVector<std::string>({"a", "b", "c"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that equality deletes are skipped when the delete file has the
+/// same sequence number as the data file (edge case of the <= check).
+TEST_F(EqualityDeleteFileReaderTest, sequenceNumberEqualSkipped) {
+  auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+          makeFlatVector<std::string>({"a", "b", "c"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  // Delete file and data file have the same sequence number (5).
+  // Since deleteSeq (5) <= dataSeq (5), the delete should be skipped.
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      3,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/5);
+
+  auto splits = makeSplits(
+      dataFile->getPath(),
+      {icebergDeleteFile},
+      /*dataSequenceNumber=*/5);
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // All rows survive because the delete file is skipped (equal seq#).
+  auto expected = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+          makeFlatVector<std::string>({"a", "b", "c"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that when either sequence number is 0 (unassigned/legacy V1),
+/// the delete file is always applied (filtering is disabled).
+TEST_F(EqualityDeleteFileReaderTest, sequenceNumberZeroAlwaysApplies) {
+  auto rowType = ROW({"id"}, {BIGINT()});
+
+  auto baseData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  auto deleteData = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({2}),
+      });
+  auto eqDeleteFile = writeEqDeleteFile({deleteData});
+
+  // Delete file has sequence number 0 (legacy), data file has sequence 10.
+  // Since deleteSeq is 0, filtering is disabled and the delete applies.
+  IcebergDeleteFile icebergDeleteFile(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile->getPath(),
+      dwio::common::FileFormat::DWRF,
+      1,
+      getFileSize(eqDeleteFile->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/0);
+
+  auto splits = makeSplits(
+      dataFile->getPath(),
+      {icebergDeleteFile},
+      /*dataSequenceNumber=*/10);
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // Row id=2 is deleted because sequence number filtering is disabled.
+  auto expected = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({1, 3}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+/// Verifies that when multiple delete files have different sequence numbers,
+/// only those with higher sequence numbers than the data file are applied.
+TEST_F(EqualityDeleteFileReaderTest, mixedSequenceNumbers) {
+  auto rowType = ROW({"id", "value"}, {BIGINT(), VARCHAR()});
+
+  auto baseData = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 2, 3, 4, 5}),
+          makeFlatVector<std::string>({"a", "b", "c", "d", "e"}),
+      });
+  auto dataFile = writeDataFile({baseData});
+
+  // First delete file: seqNum=10 (higher than data seqNum=5) → applied.
+  auto deleteData1 = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({2}),
+      });
+  auto eqDeleteFile1 = writeEqDeleteFile({deleteData1});
+  IcebergDeleteFile icebergDeleteFile1(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile1->getPath(),
+      dwio::common::FileFormat::DWRF,
+      1,
+      getFileSize(eqDeleteFile1->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/10);
+
+  // Second delete file: seqNum=3 (lower than data seqNum=5) → skipped.
+  auto deleteData2 = makeRowVector(
+      {"id"},
+      {
+          makeFlatVector<int64_t>({4}),
+      });
+  auto eqDeleteFile2 = writeEqDeleteFile({deleteData2});
+  IcebergDeleteFile icebergDeleteFile2(
+      FileContent::kEqualityDeletes,
+      eqDeleteFile2->getPath(),
+      dwio::common::FileFormat::DWRF,
+      1,
+      getFileSize(eqDeleteFile2->getPath()),
+      /*equalityFieldIds=*/{1},
+      /*lowerBounds=*/{},
+      /*upperBounds=*/{},
+      /*dataSequenceNumber=*/3);
+
+  auto splits = makeSplits(
+      dataFile->getPath(),
+      {icebergDeleteFile1, icebergDeleteFile2},
+      /*dataSequenceNumber=*/5);
+  auto plan = makeTableScanPlan(rowType);
+  auto result = AssertQueryBuilder(plan).splits(splits).copyResults(pool());
+
+  // Only id=2 is deleted (from delete file 1 with seqNum=10).
+  // id=4 survives because delete file 2 (seqNum=3) is skipped.
+  auto expected = makeRowVector(
+      {"id", "value"},
+      {
+          makeFlatVector<int64_t>({1, 3, 4, 5}),
+          makeFlatVector<std::string>({"a", "c", "d", "e"}),
+      });
+
+  assertEqualResults({expected}, {result});
+}
+
+// TODO: Add a Parquet-format equality delete test. Currently all equality
+// delete tests use DWRF because writeToFile() (from HiveConnectorTestBase)
+// only supports DWRF. Adding a Parquet test requires adding Parquet writer
+// dependencies to this test target's BUCK file and a Parquet write helper.
+
+} // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
index 29acc03a2e10..e6416e3e0a91 100644
--- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -42,6 +42,15 @@ using namespace facebook::velox::common::testutil;
 
 namespace facebook::velox::connector::hive::iceberg {
 
+namespace {
+// Return the file size for the given path using Velox's filesystem API.
+uint64_t getTestFileSize(const std::string& path) {
+  return filesystems::getFileSystem(path, nullptr)
+      ->openFileForRead(path)
+      ->size();
+}
+} // namespace
+
 static const char* kIcebergConnectorId = "test-iceberg";
 
 class HiveIcebergTest : public HiveConnectorTestBase {
@@ -241,8 +250,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         deleteFilePath,
         fileFomat_,
         deleteFilePaths[deleteFileName].first,
-        testing::internal::GetFileSize(
-            std::fopen(deleteFilePath.c_str(), "r")));
+        getTestFileSize(deleteFilePath));
     deleteFiles.push_back(icebergDeleteFile);
   }
 }
@@ -354,8 +362,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         deleteFilePath->getPath(),
         fileFomat_,
         deletedPositionSize,
-        testing::internal::GetFileSize(
-            std::fopen(deleteFilePath->getPath().c_str(), "r")));
+        getTestFileSize(deleteFilePath->getPath()));
     auto fileSize = filesystems::getFileSystem(path, nullptr)
                         ->openFileForRead(path)
                         ->size();
@@ -934,8 +941,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) {
         deleteFilePath->getPath(),
         dwio::common::FileFormat::DWRF,
         3,
-        testing::internal::GetFileSize(
-            std::fopen(deleteFilePath->getPath().c_str(), "r")),
+        getTestFileSize(deleteFilePath->getPath()),
         {},
         {},
         {{posColumn->id, encodedUpperBound}});
@@ -1004,4 +1010,279 @@ TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) {
       0);
 }
 #endif
+
+// Sequence number filtering tests for positional deletes (Diff 2).
+// Per the Iceberg V2+ spec, a positional delete file should only apply to
+// data files whose dataSequenceNumber is strictly less than the delete file's.
+
+TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberApplied) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn();
+  auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn();
+  auto rowType = ROW({"c0"}, {BIGINT()});
+
+  // Write base data file: c0 = [0, 1, 2, 3, 4].
+  auto dataFilePath = TempFilePath::create();
+  writeToFile(
+      dataFilePath->getPath(),
+      {makeRowVector({makeFlatVector<int64_t>({0, 1, 2, 3, 4})})});
+
+  // Write positional delete file targeting positions 1 and 3.
+  auto deleteFilePath = TempFilePath::create();
+  auto baseFilePath = dataFilePath->getPath();
+  writeToFile(
+      deleteFilePath->getPath(),
+      {makeRowVector(
+          {pathColumn->name, posColumn->name},
+          {
+              makeFlatVector<StringView>(
+                  2, [&](vector_size_t) { return StringView(baseFilePath); }),
+              makeFlatVector<int64_t>({1, 3}),
+          })});
+
+  // Delete file seqNum=10 > data seqNum=5 → delete should be applied.
+  IcebergDeleteFile deleteFile(
+      FileContent::kPositionalDeletes,
+      deleteFilePath->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getTestFileSize(deleteFilePath->getPath()),
+      {},
+      {},
+      {},
+      /*dataSequenceNumber=*/10);
+
+  auto file = filesystems::getFileSystem(baseFilePath, nullptr)
+                  ->openFileForRead(baseFilePath);
+  auto split = std::make_shared<HiveIcebergSplit>(
+      kIcebergConnectorId,
+      baseFilePath,
+      dwio::common::FileFormat::DWRF,
+      0,
+      file->size(),
+      std::unordered_map<std::string, std::optional<std::string>>{},
+      std::nullopt,
+      std::unordered_map<std::string, std::string>{},
+      nullptr,
+      true,
+      std::vector<IcebergDeleteFile>{deleteFile},
+      std::unordered_map<std::string, std::string>{},
+      std::nullopt,
+      /*dataSequenceNumber=*/5);
+
+  auto plan = PlanBuilder()
+                  .startTableScan()
+                  .connectorId(kIcebergConnectorId)
+                  .outputType(rowType)
+                  .endTableScan()
+                  .planNode();
+
+  // Positions 1 and 3 deleted → remaining: [0, 2, 4].
+  auto expected = makeRowVector({makeFlatVector<int64_t>({0, 2, 4})});
+  AssertQueryBuilder(plan).split(split).assertResults({expected});
+}
+
+TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberSkipped) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn();
+  auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn();
+  auto rowType = ROW({"c0"}, {BIGINT()});
+
+  auto dataFilePath = TempFilePath::create();
+  writeToFile(
+      dataFilePath->getPath(),
+      {makeRowVector({makeFlatVector<int64_t>({0, 1, 2, 3, 4})})});
+
+  auto deleteFilePath = TempFilePath::create();
+  auto baseFilePath = dataFilePath->getPath();
+  writeToFile(
+      deleteFilePath->getPath(),
+      {makeRowVector(
+          {pathColumn->name, posColumn->name},
+          {
+              makeFlatVector<StringView>(
+                  2, [&](vector_size_t) { return StringView(baseFilePath); }),
+              makeFlatVector<int64_t>({1, 3}),
+          })});
+
+  // Delete file seqNum=5 < data seqNum=10 → delete should be skipped.
+  IcebergDeleteFile deleteFile(
+      FileContent::kPositionalDeletes,
+      deleteFilePath->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getTestFileSize(deleteFilePath->getPath()),
+      {},
+      {},
+      {},
+      /*dataSequenceNumber=*/5);
+
+  auto file = filesystems::getFileSystem(baseFilePath, nullptr)
+                  ->openFileForRead(baseFilePath);
+  auto split = std::make_shared<HiveIcebergSplit>(
+      kIcebergConnectorId,
+      baseFilePath,
+      dwio::common::FileFormat::DWRF,
+      0,
+      file->size(),
+      std::unordered_map<std::string, std::optional<std::string>>{},
+      std::nullopt,
+      std::unordered_map<std::string, std::string>{},
+      nullptr,
+      true,
+      std::vector<IcebergDeleteFile>{deleteFile},
+      std::unordered_map<std::string, std::string>{},
+      std::nullopt,
+      /*dataSequenceNumber=*/10);
+
+  auto plan = PlanBuilder()
+                  .startTableScan()
+                  .connectorId(kIcebergConnectorId)
+                  .outputType(rowType)
+                  .endTableScan()
+                  .planNode();
+
+  // Delete skipped → all rows survive: [0, 1, 2, 3, 4].
+  auto expected = makeRowVector({makeFlatVector<int64_t>({0, 1, 2, 3, 4})});
+  AssertQueryBuilder(plan).split(split).assertResults({expected});
+}
+
+// Verifies that same-snapshot positional deletes apply (deleteSeqNum ==
+// dataSeqNum). Per the Iceberg spec, positional deletes in the same snapshot
+// SHOULD apply, so the skip condition uses strict < (not <=).
+TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberEqualApplied) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn();
+  auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn();
+  auto rowType = ROW({"c0"}, {BIGINT()});
+
+  auto dataFilePath = TempFilePath::create();
+  writeToFile(
+      dataFilePath->getPath(),
+      {makeRowVector({makeFlatVector<int64_t>({0, 1, 2, 3, 4})})});
+
+  auto deleteFilePath = TempFilePath::create();
+  auto baseFilePath = dataFilePath->getPath();
+  writeToFile(
+      deleteFilePath->getPath(),
+      {makeRowVector(
+          {pathColumn->name, posColumn->name},
+          {
+              makeFlatVector<StringView>(
+                  2, [&](vector_size_t) { return StringView(baseFilePath); }),
+              makeFlatVector<int64_t>({1, 3}),
+          })});
+
+  // Delete file seqNum=5 == data seqNum=5 → delete should be applied
+  // (same-snapshot positional deletes apply per Iceberg spec).
+  IcebergDeleteFile deleteFile(
+      FileContent::kPositionalDeletes,
+      deleteFilePath->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getTestFileSize(deleteFilePath->getPath()),
+      {},
+      {},
+      {},
+      /*dataSequenceNumber=*/5);
+
+  auto file = filesystems::getFileSystem(baseFilePath, nullptr)
+                  ->openFileForRead(baseFilePath);
+  auto split = std::make_shared<HiveIcebergSplit>(
+      kIcebergConnectorId,
+      baseFilePath,
+      dwio::common::FileFormat::DWRF,
+      0,
+      file->size(),
+      std::unordered_map<std::string, std::optional<std::string>>{},
+      std::nullopt,
+      std::unordered_map<std::string, std::string>{},
+      nullptr,
+      true,
+      std::vector<IcebergDeleteFile>{deleteFile},
+      std::unordered_map<std::string, std::string>{},
+      std::nullopt,
+      /*dataSequenceNumber=*/5);
+
+  auto plan = PlanBuilder()
+                  .startTableScan()
+                  .connectorId(kIcebergConnectorId)
+                  .outputType(rowType)
+                  .endTableScan()
+                  .planNode();
+
+  // Same-snapshot delete applied: positions 1 and 3 deleted → [0, 2, 4].
+  auto expected = makeRowVector({makeFlatVector<int64_t>({0, 2, 4})});
+  AssertQueryBuilder(plan).split(split).assertResults({expected});
+}
+
+TEST_F(HiveIcebergTest, positionalDeleteSequenceNumberZeroDisablesFilter) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  auto pathColumn = IcebergMetadataColumn::icebergDeleteFilePathColumn();
+  auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn();
+  auto rowType = ROW({"c0"}, {BIGINT()});
+
+  auto dataFilePath = TempFilePath::create();
+  writeToFile(
+      dataFilePath->getPath(),
+      {makeRowVector({makeFlatVector<int64_t>({0, 1, 2, 3, 4})})});
+
+  auto deleteFilePath = TempFilePath::create();
+  auto baseFilePath = dataFilePath->getPath();
+  writeToFile(
+      deleteFilePath->getPath(),
+      {makeRowVector(
+          {pathColumn->name, posColumn->name},
+          {
+              makeFlatVector<StringView>(
+                  2, [&](vector_size_t) { return StringView(baseFilePath); }),
+              makeFlatVector<int64_t>({1, 3}),
+          })});
+
+  // Delete file seqNum=0 (unassigned/V1 legacy) → always applied regardless.
+  IcebergDeleteFile deleteFile(
+      FileContent::kPositionalDeletes,
+      deleteFilePath->getPath(),
+      dwio::common::FileFormat::DWRF,
+      2,
+      getTestFileSize(deleteFilePath->getPath()),
+      {},
+      {},
+      {},
+      /*dataSequenceNumber=*/0);
+
+  auto file = filesystems::getFileSystem(baseFilePath, nullptr)
+                  ->openFileForRead(baseFilePath);
+  auto split = std::make_shared<HiveIcebergSplit>(
+      kIcebergConnectorId,
+      baseFilePath,
+      dwio::common::FileFormat::DWRF,
+      0,
+      file->size(),
+      std::unordered_map<std::string, std::optional<std::string>>{},
+      std::nullopt,
+      std::unordered_map<std::string, std::string>{},
+      nullptr,
+      true,
+      std::vector<IcebergDeleteFile>{deleteFile},
+      std::unordered_map<std::string, std::string>{},
+      std::nullopt,
+      /*dataSequenceNumber=*/100);
+
+  auto plan = PlanBuilder()
+                  .startTableScan()
+                  .connectorId(kIcebergConnectorId)
+                  .outputType(rowType)
+                  .endTableScan()
+                  .planNode();
+
+  // SeqNum=0 disables filtering → delete applied: [0, 2, 4].
+  auto expected = makeRowVector({makeFlatVector<int64_t>({0, 2, 4})});
+  AssertQueryBuilder(plan).split(split).assertResults({expected});
+}
+
 } // namespace facebook::velox::connector::hive::iceberg