diff --git a/velox/connectors/clp/search_lib/CMakeLists.txt b/velox/connectors/clp/search_lib/CMakeLists.txt index bb9452b8cc25..69451d2fc79f 100644 --- a/velox/connectors/clp/search_lib/CMakeLists.txt +++ b/velox/connectors/clp/search_lib/CMakeLists.txt @@ -19,7 +19,8 @@ velox_add_library( ClpPackageS3AuthProvider.cpp ClpPackageS3AuthProvider.h ClpS3AuthProviderBase.cpp - ClpS3AuthProviderBase.h) + ClpS3AuthProviderBase.h + ClpTimestampsUtils.h) add_subdirectory(archive) add_subdirectory(ir) diff --git a/velox/connectors/clp/search_lib/ClpTimestampsUtils.h b/velox/connectors/clp/search_lib/ClpTimestampsUtils.h new file mode 100644 index 000000000000..db870fcbf64c --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpTimestampsUtils.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/type/Timestamp.h" + +namespace facebook::velox::connector::clp::search_lib { + +enum class InputTimestampPrecision : uint8_t { + Seconds, + Milliseconds, + Microseconds, + Nanoseconds +}; + +/// Estimates the precision of an epoch timestamp as seconds, milliseconds, +/// microseconds, or nanoseconds. +/// +/// This heuristic relies on the fact that 1 year of epoch nanoseconds is +/// approximately 1000 years of epoch microseconds and so on. This heuristic +/// can be unreliable for timestamps sufficiently close to the epoch, but +/// should otherwise be accurate for the next 1000 years. +/// +/// Note: Future versions of the clp-s archive format will adopt a +/// nanosecond-precision integer timestamp format (as opposed to the current +/// format which allows other precisions), at which point we can remove this +/// heuristic. +/// +/// @param timestamp +/// @return the estimated timestamp precision +template +auto estimatePrecision(T timestamp) -> InputTimestampPrecision { + constexpr int64_t kEpochMilliseconds1971{31536000000}; + constexpr int64_t kEpochMicroseconds1971{31536000000000}; + constexpr int64_t kEpochNanoseconds1971{31536000000000000}; + auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp; + + if (absTimestamp > kEpochNanoseconds1971) { + return InputTimestampPrecision::Nanoseconds; + } else if (absTimestamp > kEpochMicroseconds1971) { + return InputTimestampPrecision::Microseconds; + } else if (absTimestamp > kEpochMilliseconds1971) { + return InputTimestampPrecision::Milliseconds; + } else { + return InputTimestampPrecision::Seconds; + } +} + +/// Converts a double value into a Velox timestamp. +/// +/// @param timestamp the input timestamp as a double +/// @return the corresponding Velox timestamp +inline auto convertToVeloxTimestamp(double timestamp) -> Timestamp { + switch (estimatePrecision(timestamp)) { + case InputTimestampPrecision::Nanoseconds: + timestamp /= Timestamp::kNanosInSecond; + break; + case InputTimestampPrecision::Microseconds: + timestamp /= Timestamp::kMicrosecondsInSecond; + break; + case InputTimestampPrecision::Milliseconds: + timestamp /= Timestamp::kMillisecondsInSecond; + break; + case InputTimestampPrecision::Seconds: + break; + } + double seconds{std::floor(timestamp)}; + double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond}; + return Timestamp( + static_cast(seconds), static_cast(nanoseconds)); +} + +/// Converts an integer value into a Velox timestamp. +/// +/// @param timestamp the input timestamp as an integer +/// @return the corresponding Velox timestamp +inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp { + int64_t precisionDifference{Timestamp::kNanosInSecond}; + switch (estimatePrecision(timestamp)) { + case InputTimestampPrecision::Nanoseconds: + break; + case InputTimestampPrecision::Microseconds: + precisionDifference = + Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond; + break; + case InputTimestampPrecision::Milliseconds: + precisionDifference = + Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond; + break; + case InputTimestampPrecision::Seconds: + precisionDifference = + Timestamp::kNanosInSecond / Timestamp::kNanosInSecond; + break; + } + int64_t seconds{timestamp / precisionDifference}; + int64_t nanoseconds{ + (timestamp % precisionDifference) * + (Timestamp::kNanosInSecond / precisionDifference)}; + if (nanoseconds < 0) { + seconds -= 1; + nanoseconds += Timestamp::kNanosInSecond; + } + return Timestamp(seconds, static_cast(nanoseconds)); +} + +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp b/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp index 1a48f9a6a911..c8b362da6ac6 100644 --- a/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp +++ b/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp @@ -21,6 +21,7 @@ #include "clp_s/ColumnReader.hpp" #include "clp_s/SchemaTree.hpp" #include "velox/connectors/clp/search_lib/BaseClpCursor.h" +#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h" #include "velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.h" #include "velox/type/Timestamp.h" #include "velox/vector/ComplexVector.h" @@ -28,99 +29,6 @@ namespace facebook::velox::connector::clp::search_lib { -namespace { - -enum class TimestampPrecision : uint8_t { - Seconds, - Milliseconds, - Microseconds, - Nanoseconds -}; - -/// Estimates the precision of an epoch timestamp as seconds, milliseconds, -/// microseconds, or nanoseconds. -/// -/// This heuristic relies on the fact that 1 year of epoch nanoseconds is -/// approximately 1000 years of epoch microseconds and so on. This heuristic -/// can be unreliable for timestamps sufficiently close to the epoch, but -/// should otherwise be accurate for the next 1000 years. -/// -/// Note: Future versions of the clp-s archive format will adopt a -/// nanosecond-precision integer timestamp format (as opposed to the current -/// format which allows other precisions), at which point we can remove this -/// heuristic. -/// -/// @param timestamp -/// @return the estimated timestamp precision -template -auto estimatePrecision(T timestamp) -> TimestampPrecision { - constexpr int64_t kEpochMilliseconds1971{31536000000}; - constexpr int64_t kEpochMicroseconds1971{31536000000000}; - constexpr int64_t kEpochNanoseconds1971{31536000000000000}; - auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp; - - if (absTimestamp > kEpochNanoseconds1971) { - return TimestampPrecision::Nanoseconds; - } else if (absTimestamp > kEpochMicroseconds1971) { - return TimestampPrecision::Microseconds; - } else if (absTimestamp > kEpochMilliseconds1971) { - return TimestampPrecision::Milliseconds; - } else { - return TimestampPrecision::Seconds; - } -} - -auto convertToVeloxTimestamp(double timestamp) -> Timestamp { - switch (estimatePrecision(timestamp)) { - case TimestampPrecision::Nanoseconds: - timestamp /= Timestamp::kNanosInSecond; - break; - case TimestampPrecision::Microseconds: - timestamp /= Timestamp::kMicrosecondsInSecond; - break; - case TimestampPrecision::Milliseconds: - timestamp /= Timestamp::kMillisecondsInSecond; - break; - case TimestampPrecision::Seconds: - break; - } - double seconds{std::floor(timestamp)}; - double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond}; - return Timestamp( - static_cast(seconds), static_cast(nanoseconds)); -} - -auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp { - int64_t precisionDifference{Timestamp::kNanosInSecond}; - switch (estimatePrecision(timestamp)) { - case TimestampPrecision::Nanoseconds: - break; - case TimestampPrecision::Microseconds: - precisionDifference = - Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond; - break; - case TimestampPrecision::Milliseconds: - precisionDifference = - Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond; - break; - case TimestampPrecision::Seconds: - precisionDifference = - Timestamp::kNanosInSecond / Timestamp::kNanosInSecond; - break; - } - int64_t seconds{timestamp / precisionDifference}; - int64_t nanoseconds{ - (timestamp % precisionDifference) * - (Timestamp::kNanosInSecond / precisionDifference)}; - if (nanoseconds < 0) { - seconds -= 1; - nanoseconds += Timestamp::kNanosInSecond; - } - return Timestamp(seconds, static_cast(nanoseconds)); -} - -} // namespace - ClpArchiveVectorLoader::ClpArchiveVectorLoader( clp_s::BaseColumnReader* columnReader, ColumnType nodeType, diff --git a/velox/connectors/clp/search_lib/ir/CMakeLists.txt b/velox/connectors/clp/search_lib/ir/CMakeLists.txt index 35a49d592613..1dce25d1af67 100644 --- a/velox/connectors/clp/search_lib/ir/CMakeLists.txt +++ b/velox/connectors/clp/search_lib/ir/CMakeLists.txt @@ -29,5 +29,6 @@ velox_link_libraries( clp_s::clp_dependencies clp_s::io clp_s::search + clp_s::search::ast clp_s::search::kql velox_vector) diff --git a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp index 124fa5e0b063..1f51bf83cfda 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp +++ b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp @@ -59,10 +59,10 @@ VectorPtr ClpIrCursor::createVector( const TypePtr& vectorType, size_t vectorSize) { VELOX_CHECK_EQ( - projectedColumnIdxNodeIdMap_.size(), + projectedColumnIdxNodeIdsMap_.size(), outputColumns_.size(), - "Projected columns size {} does not match fields size {}", - projectedColumnIdxNodeIdMap_.size(), + "Resolved node-id map size ({}) must not exceed projected columns ({})", + projectedColumnIdxNodeIdsMap_.size(), outputColumns_.size()); return createVectorHelper(pool, vectorType, vectorSize); } @@ -133,9 +133,8 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const { search::ast::LiteralType::ClpStringT; break; case ColumnType::Timestamp: - // TODO: IR timestamp support pending; constrain to Unknown to avoid - // mismatched projections. - literalType = search::ast::LiteralType::EpochDateT; + literalType = search::ast::LiteralType::FloatT | + search::ast::LiteralType::IntegerT; break; default: literalType = search::ast::LiteralType::UnknownT; @@ -189,11 +188,12 @@ VectorPtr ClpIrCursor::createVectorHelper( readerIndex_, outputColumns_.size(), "Reader index out of bounds"); auto projectedColumn = outputColumns_[readerIndex_]; auto projectedColumnType = projectedColumn.type; - auto it = projectedColumnIdxNodeIdMap_.find(readerIndex_); - bool isResolved = it != projectedColumnIdxNodeIdMap_.end(); - ::clp::ffi::SchemaTree::Node::id_t projectedColumnNodeId; + auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_); + std::vector<::clp::ffi::SchemaTree::Node::id_t> projectedColumnNodeIds{}; + bool isResolved = + it != projectedColumnIdxNodeIdsMap_.end() && !it->second.empty(); if (isResolved) { - projectedColumnNodeId = it->second; + projectedColumnNodeIds = it->second; } readerIndex_++; return std::make_shared( @@ -201,10 +201,11 @@ VectorPtr ClpIrCursor::createVectorHelper( vectorType, vectorSize, std::make_unique( + irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(), isResolved, - projectedColumnType, - projectedColumnNodeId, - irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()), + std::move(projectedColumnNodeIds), + projectedColumn.name, + projectedColumnType), std::move(vector)); } diff --git a/velox/connectors/clp/search_lib/ir/ClpIrCursor.h b/velox/connectors/clp/search_lib/ir/ClpIrCursor.h index a132bc3ba351..c583a5b228ea 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrCursor.h +++ b/velox/connectors/clp/search_lib/ir/ClpIrCursor.h @@ -59,8 +59,15 @@ class ClpIrCursor final : public BaseClpCursor { [[maybe_unused]] std::pair projected_key_and_index) -> ystdlib::error_handling::Result { - projectedColumnIdxNodeIdMap_.insert( - {projected_key_and_index.second, nodeId}); + auto it = + projectedColumnIdxNodeIdsMap_.find(projected_key_and_index.second); + if (it == projectedColumnIdxNodeIdsMap_.end()) { + projectedColumnIdxNodeIdsMap_.insert( + {projected_key_and_index.second, + std::vector<::clp::ffi::SchemaTree::Node::id_t>{nodeId}}); + return ystdlib::error_handling::success(); + } + it->second.emplace_back(nodeId); return ystdlib::error_handling::success(); }; using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler< @@ -70,8 +77,8 @@ class ClpIrCursor final : public BaseClpCursor { ::clp::ffi::ir_stream::Deserializer> irDeserializer_; std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr}; - std::unordered_map - projectedColumnIdxNodeIdMap_; + std::unordered_map> + projectedColumnIdxNodeIdsMap_; size_t readerIndex_{0}; std::vector< diff --git a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp index c082aecb85d0..09528a2ccfb0 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp +++ b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp @@ -15,7 +15,9 @@ */ #include "velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h" + #include "velox/connectors/clp/search_lib/BaseClpCursor.h" +#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h" namespace facebook::velox::connector::clp::search_lib { @@ -33,12 +35,20 @@ void ClpIrVectorLoader::loadInternal( auto& logEvent = filteredLogEvents_->at(vectorIndex); // TODO: also need to support auto-generated keys auto userGenNodeIdValueMap = logEvent->get_user_gen_node_id_value_pairs(); - auto const value_it{userGenNodeIdValueMap.find(nodeId_)}; - if (userGenNodeIdValueMap.end() == value_it || - false == value_it->second.has_value()) { + auto valueIt = userGenNodeIdValueMap.end(); + ::clp::ffi::SchemaTree::Node::id_t nodeId{}; + for (auto const candidateNodeId : nodeIds_) { + valueIt = userGenNodeIdValueMap.find(candidateNodeId); + if (valueIt != userGenNodeIdValueMap.end()) { + nodeId = candidateNodeId; + break; + } + } + if (userGenNodeIdValueMap.end() == valueIt || + false == valueIt->second.has_value()) { continue; } - auto const& value{value_it->second}; + auto const& value{valueIt->second}; switch (nodeType_) { case ColumnType::String: { auto stringVector = vector->asFlatVector(); @@ -89,6 +99,21 @@ void ClpIrVectorLoader::loadInternal( vector->setNull(vectorIndex, false); break; } + case ColumnType::Timestamp: { + auto timestampVector = vector->asFlatVector(); + if (value->is()) { + timestampVector->set( + vectorIndex, + convertToVeloxTimestamp(value->get_immutable_view())); + } else if (value->is()) { + timestampVector->set( + vectorIndex, + convertToVeloxTimestamp(value->get_immutable_view())); + } else { + VELOX_FAIL("Unsupported timestamp type"); + } + break; + } case ColumnType::Array: { auto arrayVector = std::dynamic_pointer_cast(vector); std::string jsonString; diff --git a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h index bdbf3fd8a58c..5e8d6ea4e663 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h +++ b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h @@ -18,8 +18,8 @@ #include -#include "connectors/clp/search_lib/BaseClpCursor.h" #include "ffi/ir_stream/Deserializer.hpp" +#include "velox/connectors/clp/search_lib/BaseClpCursor.h" #include "velox/vector/FlatVector.h" #include "velox/vector/LazyVector.h" @@ -28,26 +28,28 @@ namespace facebook::velox::connector::clp::search_lib { class ClpIrVectorLoader : public VectorLoader { public: ClpIrVectorLoader( - bool isResolved, - ColumnType nodeType, - ::clp::ffi::SchemaTree::Node::id_t nodeId, const std::shared_ptr< const std::vector>>& - filteredLogEvents) - : isResolved_(isResolved), - nodeType_(nodeType), - nodeId_(nodeId), - filteredLogEvents_(filteredLogEvents) {} + filteredLogEvents, + bool isResolved, + std::vector<::clp::ffi::SchemaTree::Node::id_t> nodeIds, + std::string_view nodeName, + ColumnType nodeType) + : filteredLogEvents_(filteredLogEvents), + isResolved_(isResolved), + nodeIds_(std::move(nodeIds)), + nodeName_(nodeName), + nodeType_(nodeType) {} private: simdjson::ondemand::parser arrayParser_; - - bool isResolved_; - ColumnType nodeType_; - ::clp::ffi::SchemaTree::Node::id_t nodeId_; std::shared_ptr< const std::vector>> filteredLogEvents_; + bool isResolved_{}; + std::vector<::clp::ffi::SchemaTree::Node::id_t> nodeIds_; + std::string nodeName_; + ColumnType nodeType_; void loadInternal( RowSet rows, diff --git a/velox/connectors/clp/tests/ClpConnectorTest.cpp b/velox/connectors/clp/tests/ClpConnectorTest.cpp index 3bec10d9ef5d..dc12b270eff2 100644 --- a/velox/connectors/clp/tests/ClpConnectorTest.cpp +++ b/velox/connectors/clp/tests/ClpConnectorTest.cpp @@ -257,49 +257,13 @@ TEST_F(ClpConnectorTest, test2NoPushdown) { })}); test::assertEqualVectors(expected, output); - // IR stream currently does not support TIMESTAMP type; will merge into the - // plan above as soon as this feature is implemented - auto irPlan = - PlanBuilder(pool_.get()) - .startTableScan() - .outputType( - ROW({"event"}, - {ROW( - {"type", "subtype", "severity", "tags"}, - {VARCHAR(), VARCHAR(), VARCHAR(), ARRAY(VARCHAR())})})) - .tableHandle( - std::make_shared(kClpConnectorId, "test_2")) - .assignments( - {{"event", - std::make_shared( - "event", - "event", - ROW({"type", "subtype", "severity", "tags"}, - {VARCHAR(), VARCHAR(), VARCHAR(), ARRAY(VARCHAR())}))}}) - .endTableScan() - .filter( - "event.severity IN ('WARNING', 'ERROR') AND " - "((event.type = 'network' AND event.subtype = 'connection') OR " - "(event.type = 'storage' AND event.subtype LIKE 'disk_usage%'))") - .planNode(); auto irOutput = getResults( - irPlan, + plan, {makeClpSplit( getExampleFilePath("test_2_ir.clps"), ClpConnectorSplit::SplitType::kIr, kqlQuery)}); - auto irExpected = makeRowVector( - {// event - makeRowVector( - {// event.type - makeFlatVector({"storage"}), - // event.subtype - makeFlatVector({"disk_usage"}), - // event.severity - makeFlatVector({"WARNING"}), - // event.tags - makeArrayVector({{"\"backup\"", "\"daily\""}})})}); - test::assertEqualVectors(irExpected, irOutput); + test::assertEqualVectors(expected, irOutput); } TEST_F(ClpConnectorTest, test2Pushdown) { @@ -350,45 +314,13 @@ TEST_F(ClpConnectorTest, test2Pushdown) { })}); test::assertEqualVectors(expected, output); - // IR stream currently does not support TIMESTAMP type; will merge into the - // plan above as soon as this feature is implemented - auto irPlan = - PlanBuilder(pool_.get()) - .startTableScan() - .outputType( - ROW({"event"}, - {ROW( - {"type", "subtype", "severity", "tags"}, - {VARCHAR(), VARCHAR(), VARCHAR(), ARRAY(VARCHAR())})})) - .tableHandle( - std::make_shared(kClpConnectorId, "test_2")) - .assignments( - {{"event", - std::make_shared( - "event", - "event", - ROW({"type", "subtype", "severity", "tags"}, - {VARCHAR(), VARCHAR(), VARCHAR(), ARRAY(VARCHAR())}))}}) - .endTableScan() - .planNode(); auto irOutput = getResults( - irPlan, + plan, {makeClpSplit( getExampleFilePath("test_2_ir.clps"), ClpConnectorSplit::SplitType::kIr, kqlQuery)}); - auto irExpected = makeRowVector( - {// event - makeRowVector({// event.type - makeFlatVector({"storage"}), - // event.subtype - makeFlatVector({"disk_usage"}), - // event.severity - makeFlatVector({"WARNING"}), - // event.tags - makeArrayVector( - {{"\"filesystem\"", "\"monitoring\""}})})}); - test::assertEqualVectors(irExpected, irOutput); + test::assertEqualVectors(expected, irOutput); } TEST_F(ClpConnectorTest, test2Hybrid) { @@ -442,6 +374,14 @@ TEST_F(ClpConnectorTest, test2Hybrid) { }); test::assertEqualVectors(expected, output); + + auto irOutput = getResults( + plan, + {makeClpSplit( + getExampleFilePath("test_2_ir.clps"), + ClpConnectorSplit::SplitType::kIr, + kqlQuery)}); + test::assertEqualVectors(expected, irOutput); } TEST_F(ClpConnectorTest, test3TimestampMarshalling) { @@ -475,6 +415,64 @@ TEST_F(ClpConnectorTest, test3TimestampMarshalling) { test::assertEqualVectors(expected, output); } +TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) { + const std::shared_ptr kqlQuery = nullptr; + auto plan = PlanBuilder(pool_.get()) + .startTableScan() + .outputType(ROW({"timestamp"}, {TIMESTAMP()})) + .tableHandle(std::make_shared( + kClpConnectorId, "test_4")) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", TIMESTAMP())}}) + .endTableScan() + .filter("\"timestamp\" < timestamp '2025-08-24 02:36:45'") + .planNode(); + + auto output = getResults( + plan, + {makeClpSplit( + getExampleFilePath("test_4_ir.clps"), + ClpConnectorSplit::SplitType::kIr, + kqlQuery)}); + auto expected = makeRowVector({ + // timestamp + makeFlatVector( + {Timestamp(kTestTimestampSeconds, kTestTimestampNanoseconds)}), + }); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test4IrTimestampPushdown) { + const std::shared_ptr kqlQuery = + std::make_shared("(timestamp < 1756003005000000)"); + auto plan = PlanBuilder(pool_.get()) + .startTableScan() + .outputType(ROW({"timestamp"}, {TIMESTAMP()})) + .tableHandle(std::make_shared( + kClpConnectorId, "test_4")) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", TIMESTAMP())}}) + .endTableScan() + .planNode(); + + auto output = getResults( + plan, + {makeClpSplit( + getExampleFilePath("test_4_ir.clps"), + ClpConnectorSplit::SplitType::kIr, + kqlQuery)}); + auto expected = makeRowVector({ + // timestamp + makeFlatVector( + {Timestamp(kTestTimestampSeconds, kTestTimestampNanoseconds)}), + }); + test::assertEqualVectors(expected, output); +} + } // namespace int main(int argc, char** argv) { diff --git a/velox/connectors/clp/tests/examples/test_1_ir.clps b/velox/connectors/clp/tests/examples/test_1_ir.clps index 182de479b49c..e09efcfb870b 100644 Binary files a/velox/connectors/clp/tests/examples/test_1_ir.clps and b/velox/connectors/clp/tests/examples/test_1_ir.clps differ diff --git a/velox/connectors/clp/tests/examples/test_2_ir.clps b/velox/connectors/clp/tests/examples/test_2_ir.clps index 06c3691ef34e..983e7b1144dc 100644 Binary files a/velox/connectors/clp/tests/examples/test_2_ir.clps and b/velox/connectors/clp/tests/examples/test_2_ir.clps differ diff --git a/velox/connectors/clp/tests/examples/test_3_ir.clps b/velox/connectors/clp/tests/examples/test_3_ir.clps new file mode 100644 index 000000000000..7f185fb9b3ea Binary files /dev/null and b/velox/connectors/clp/tests/examples/test_3_ir.clps differ diff --git a/velox/connectors/clp/tests/examples/test_4.ndjson b/velox/connectors/clp/tests/examples/test_4.ndjson new file mode 100644 index 000000000000..950b7e7da58d --- /dev/null +++ b/velox/connectors/clp/tests/examples/test_4.ndjson @@ -0,0 +1,3 @@ +{"timestamp": "2025-04-30T08:50:05.000Z"} +{"timestamp": 1746003005000000} +{"timestamp": 1766003005000000} diff --git a/velox/connectors/clp/tests/examples/test_4_ir.clps b/velox/connectors/clp/tests/examples/test_4_ir.clps new file mode 100644 index 000000000000..1494cbc1c635 Binary files /dev/null and b/velox/connectors/clp/tests/examples/test_4_ir.clps differ diff --git a/velox/docs/develop/connectors.rst b/velox/docs/develop/connectors.rst index e78b2615acc8..22886134e3b2 100644 --- a/velox/docs/develop/connectors.rst +++ b/velox/docs/develop/connectors.rst @@ -135,6 +135,14 @@ ClpConnectorSplit if it is stored on a local file system, or the complete (or partial) URL of the split if it is stored on S3. In the latter case, when only a partial URL is provided, ``ClpS3AuthProviderBase`` provides a hook in ``ClpDataSource`` to assist in constructing the full URL. Refer to :ref:`ClpS3AuthProviderBase` for details. +``ClpConnectorSplit`` also includes a ``type`` property that specifies whether the split is an archive or an IR +(Internal Representation) stream. + +BaseClpCursor +~~~~~~~~~~~~~ +``BaseClpCursor`` is responsible for preparing pushdown operations, loading splits, filtering data, and returning +results. Each split type (archive or IR stream) has its own corresponding subclass (``ClpArchiveCursor`` and +``ClpIrCursor``). ClpDataSource ~~~~~~~~~~~~~ @@ -146,10 +154,11 @@ each output column, accessing its handle to get its type and original name. For traverses the nested structure to process each field; for non-row types, it directly maps the Velox column type to a CLP column type. -When a split is added, a ``ClpCursor`` is created with the split path and input source. The query is parsed -and simplified into an AST. On ``next``, the cursor finds matching row indices and, if any exist, -``ClpDataSource`` recursively creates a row vector composed of lazy vectors, which use CLP column readers to -decode and load data as needed during execution. +When a split is added, a ``BaseClpCursor`` instance is created with the split path and input source (which +may be either a ``ClpArchiveCursor`` or a ``ClpIrCursor``). The query is parsed and simplified into an AST. +On ``next``, the cursor finds matching row indices and, if any exist, ``ClpDataSource`` recursively creates +a row vector composed of lazy vectors, which use CLP column readers to decode and load data as needed during +execution. .. _ClpS3AuthProviderBase: