Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
5af7ec3
Extract ClpCursor abstract class and move the original ClpCursor impl…
anlowee Aug 19, 2025
fe8a130
Merge the code
anlowee Aug 19, 2025
cc82dd7
Rename the extracrted ClpCursor to BaseClpCursor, and add the skeleto…
anlowee Aug 19, 2025
65d3bd1
WIP: implement the ClpIrCursor, one todo is to marshal the row in Clp…
anlowee Aug 19, 2025
d45baf0
Remove the IR related code and add a subpackage to store archive sear…
anlowee Aug 20, 2025
7f40b7e
Address coderabbitai comments
anlowee Aug 20, 2025
c0cac3d
Address comments
anlowee Aug 20, 2025
c7bd83b
Update from part1 branch
anlowee Aug 27, 2025
27ba897
WIP
anlowee Sep 2, 2025
19bad61
Move the current VectorLoader to Archive-specific
anlowee Sep 2, 2025
d3fafde
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 2, 2025
f53c6c7
Rename and WIP
anlowee Sep 3, 2025
e466f68
Put more archive specific thing into Archive code
anlowee Sep 3, 2025
8be8eb7
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 3, 2025
d6ba0da
WIP
anlowee Sep 3, 2025
221bec6
WIP
anlowee Sep 3, 2025
679d338
Merge branch 'presto-0.293-clp-connector' into xwei/ir-integration-ph…
anlowee Sep 3, 2025
6e1462d
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 3, 2025
f8f75c3
Get it work
anlowee Sep 4, 2025
1c091c7
Code clean up
anlowee Sep 4, 2025
8ad9b9c
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 4, 2025
7f25210
Address coderabbitai comments
anlowee Sep 4, 2025
28e9a6f
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 4, 2025
595e74b
Minor fix
anlowee Sep 4, 2025
1b12402
Fix unitest
anlowee Sep 4, 2025
5db1cf9
Fix building issues
anlowee Sep 5, 2025
e71a957
Address comments
anlowee Sep 5, 2025
cc82c62
Merge branch 'xwei/ir-integration-phase2-part1' into xwei/ir-integrat…
anlowee Sep 5, 2025
3ca7361
Try out zhihao fix
anlowee Sep 5, 2025
184f9de
Apply Zhihao fix
anlowee Sep 5, 2025
2102275
Merge branch 'presto-0.293-clp-connector' into xwei/ir-integration-ph…
anlowee Sep 5, 2025
f9e1409
Address some coderabbitai comments after merging the first PR
anlowee Sep 5, 2025
4ef6e96
Address comments
anlowee Sep 9, 2025
e576db7
Merge remote-tracking branch 'origin/presto-0.293-clp-connector' into…
anlowee Sep 9, 2025
fd2b308
WIP timestamp
anlowee Sep 9, 2025
82778cc
WIP
anlowee Sep 10, 2025
b257938
Fix the unit tests
anlowee Sep 15, 2025
10e222a
Add string type timestamp logevent in test ir file
anlowee Sep 15, 2025
1928748
Add docs
anlowee Sep 15, 2025
1982c3b
Address coderabbitai comments
anlowee Sep 16, 2025
947fded
Address coderabbitai comments
anlowee Sep 16, 2025
03d251d
Address comments
anlowee Sep 22, 2025
da66e99
Address comment
anlowee Sep 25, 2025
ae97766
Address comment
anlowee Sep 25, 2025
a6a9e49
Merge branch 'presto-0.293-clp-connector' into xwei/ir-integration-ph…
anlowee Sep 26, 2025
4c3e4e8
Apply suggestions from code review
anlowee Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion velox/connectors/clp/search_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ velox_add_library(
ClpPackageS3AuthProvider.cpp
ClpPackageS3AuthProvider.h
ClpS3AuthProviderBase.cpp
ClpS3AuthProviderBase.h)
ClpS3AuthProviderBase.h
ClpTimestampsUtils.h)

add_subdirectory(archive)
add_subdirectory(ir)
Expand Down
120 changes: 120 additions & 0 deletions velox/connectors/clp/search_lib/ClpTimestampsUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/type/Timestamp.h"

namespace facebook::velox::connector::clp::search_lib {

enum class InputTimestampPrecision : uint8_t {
Seconds,
Milliseconds,
Microseconds,
Nanoseconds
};

/// Estimates the precision of an epoch timestamp as seconds, milliseconds,
/// microseconds, or nanoseconds.
///
/// This heuristic relies on the fact that 1 year of epoch nanoseconds is
/// approximately 1000 years of epoch microseconds and so on. This heuristic
/// can be unreliable for timestamps sufficiently close to the epoch, but
/// should otherwise be accurate for the next 1000 years.
///
/// Note: Future versions of the clp-s archive format will adopt a
/// nanosecond-precision integer timestamp format (as opposed to the current
/// format which allows other precisions), at which point we can remove this
/// heuristic.
///
/// @param timestamp
/// @return the estimated timestamp precision
template <typename T>
auto estimatePrecision(T timestamp) -> InputTimestampPrecision {
constexpr int64_t kEpochMilliseconds1971{31536000000};
constexpr int64_t kEpochMicroseconds1971{31536000000000};
constexpr int64_t kEpochNanoseconds1971{31536000000000000};
auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp;

if (absTimestamp > kEpochNanoseconds1971) {
return InputTimestampPrecision::Nanoseconds;
} else if (absTimestamp > kEpochMicroseconds1971) {
return InputTimestampPrecision::Microseconds;
} else if (absTimestamp > kEpochMilliseconds1971) {
return InputTimestampPrecision::Milliseconds;
} else {
return InputTimestampPrecision::Seconds;
}
}
Comment on lines +46 to +61
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Precision estimation heuristic may misclassify timestamps near epoch

The heuristic for determining timestamp precision relies on magnitude thresholds that can fail for timestamps close to the Unix epoch (1970). For example, a timestamp of 1000 seconds since epoch (16 minutes 40 seconds after midnight on January 1, 1970) would be misclassified as milliseconds since it's less than kEpochMilliseconds1971.

This could lead to incorrect timestamp conversions, especially for historical data or test scenarios using epoch-relative timestamps.

Consider adding explicit precision metadata or using a more robust heuristic that accounts for the current time range. For example:

 template <typename T>
 auto estimatePrecision(T timestamp) -> InputTimestampPrecision {
   constexpr int64_t kEpochMilliseconds1971{31536000000};
   constexpr int64_t kEpochMicroseconds1971{31536000000000};
   constexpr int64_t kEpochNanoseconds1971{31536000000000000};
+  // Current time bounds for validation (approximate)
+  constexpr int64_t kCurrentEpochSeconds{1700000000};  // ~2023
+  constexpr int64_t kFutureEpochSeconds{2000000000};   // ~2033
+  
   auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp;

+  // Additional check: if value is reasonable as seconds (1970-2033 range)
+  if (absTimestamp > kCurrentEpochSeconds && absTimestamp < kFutureEpochSeconds) {
+    return InputTimestampPrecision::Seconds;
+  }
+
   if (absTimestamp > kEpochNanoseconds1971) {
     return InputTimestampPrecision::Nanoseconds;
   } else if (absTimestamp > kEpochMicroseconds1971) {
     return InputTimestampPrecision::Microseconds;
   } else if (absTimestamp > kEpochMilliseconds1971) {
     return InputTimestampPrecision::Milliseconds;
   } else {
     return InputTimestampPrecision::Seconds;
   }
 }

Committable suggestion skipped: line range outside the PR's diff.


/// Converts a double value into a Velox timestamp.
///
/// @param timestamp the input timestamp as a double
/// @return the corresponding Velox timestamp
inline auto convertToVeloxTimestamp(double timestamp) -> Timestamp {
switch (estimatePrecision(timestamp)) {
case InputTimestampPrecision::Nanoseconds:
timestamp /= Timestamp::kNanosInSecond;
break;
case InputTimestampPrecision::Microseconds:
timestamp /= Timestamp::kMicrosecondsInSecond;
break;
case InputTimestampPrecision::Milliseconds:
timestamp /= Timestamp::kMillisecondsInSecond;
break;
case InputTimestampPrecision::Seconds:
break;
}
double seconds{std::floor(timestamp)};
double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond};
return Timestamp(
static_cast<int64_t>(seconds), static_cast<uint64_t>(nanoseconds));
}

/// Converts an integer value into a Velox timestamp.
///
/// @param timestamp the input timestamp as an integer
/// @return the corresponding Velox timestamp
inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
int64_t precisionDifference{Timestamp::kNanosInSecond};
switch (estimatePrecision(timestamp)) {
case InputTimestampPrecision::Nanoseconds:
break;
case InputTimestampPrecision::Microseconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond;
break;
case InputTimestampPrecision::Milliseconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond;
break;
case InputTimestampPrecision::Seconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosInSecond;
break;
}
int64_t seconds{timestamp / precisionDifference};
int64_t nanoseconds{
(timestamp % precisionDifference) *
(Timestamp::kNanosInSecond / precisionDifference)};
if (nanoseconds < 0) {
seconds -= 1;
nanoseconds += Timestamp::kNanosInSecond;
}
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
}

} // namespace facebook::velox::connector::clp::search_lib
Original file line number Diff line number Diff line change
Expand Up @@ -21,106 +21,14 @@
#include "clp_s/ColumnReader.hpp"
#include "clp_s/SchemaTree.hpp"
#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h"
#include "velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.h"
#include "velox/type/Timestamp.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::connector::clp::search_lib {

namespace {

enum class TimestampPrecision : uint8_t {
Seconds,
Milliseconds,
Microseconds,
Nanoseconds
};

/// Estimates the precision of an epoch timestamp as seconds, milliseconds,
/// microseconds, or nanoseconds.
///
/// This heuristic relies on the fact that 1 year of epoch nanoseconds is
/// approximately 1000 years of epoch microseconds and so on. This heuristic
/// can be unreliable for timestamps sufficiently close to the epoch, but
/// should otherwise be accurate for the next 1000 years.
///
/// Note: Future versions of the clp-s archive format will adopt a
/// nanosecond-precision integer timestamp format (as opposed to the current
/// format which allows other precisions), at which point we can remove this
/// heuristic.
///
/// @param timestamp
/// @return the estimated timestamp precision
template <typename T>
auto estimatePrecision(T timestamp) -> TimestampPrecision {
constexpr int64_t kEpochMilliseconds1971{31536000000};
constexpr int64_t kEpochMicroseconds1971{31536000000000};
constexpr int64_t kEpochNanoseconds1971{31536000000000000};
auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp;

if (absTimestamp > kEpochNanoseconds1971) {
return TimestampPrecision::Nanoseconds;
} else if (absTimestamp > kEpochMicroseconds1971) {
return TimestampPrecision::Microseconds;
} else if (absTimestamp > kEpochMilliseconds1971) {
return TimestampPrecision::Milliseconds;
} else {
return TimestampPrecision::Seconds;
}
}

auto convertToVeloxTimestamp(double timestamp) -> Timestamp {
switch (estimatePrecision(timestamp)) {
case TimestampPrecision::Nanoseconds:
timestamp /= Timestamp::kNanosInSecond;
break;
case TimestampPrecision::Microseconds:
timestamp /= Timestamp::kMicrosecondsInSecond;
break;
case TimestampPrecision::Milliseconds:
timestamp /= Timestamp::kMillisecondsInSecond;
break;
case TimestampPrecision::Seconds:
break;
}
double seconds{std::floor(timestamp)};
double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond};
return Timestamp(
static_cast<int64_t>(seconds), static_cast<uint64_t>(nanoseconds));
}

auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
int64_t precisionDifference{Timestamp::kNanosInSecond};
switch (estimatePrecision(timestamp)) {
case TimestampPrecision::Nanoseconds:
break;
case TimestampPrecision::Microseconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond;
break;
case TimestampPrecision::Milliseconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond;
break;
case TimestampPrecision::Seconds:
precisionDifference =
Timestamp::kNanosInSecond / Timestamp::kNanosInSecond;
break;
}
int64_t seconds{timestamp / precisionDifference};
int64_t nanoseconds{
(timestamp % precisionDifference) *
(Timestamp::kNanosInSecond / precisionDifference)};
if (nanoseconds < 0) {
seconds -= 1;
nanoseconds += Timestamp::kNanosInSecond;
}
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
}

} // namespace

ClpArchiveVectorLoader::ClpArchiveVectorLoader(
clp_s::BaseColumnReader* columnReader,
ColumnType nodeType,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/clp/search_lib/ir/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ velox_link_libraries(
clp_s::clp_dependencies
clp_s::io
clp_s::search
clp_s::search::ast
clp_s::search::kql
velox_vector)
27 changes: 14 additions & 13 deletions velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ VectorPtr ClpIrCursor::createVector(
const TypePtr& vectorType,
size_t vectorSize) {
VELOX_CHECK_EQ(
projectedColumnIdxNodeIdMap_.size(),
projectedColumnIdxNodeIdsMap_.size(),
outputColumns_.size(),
"Projected columns size {} does not match fields size {}",
projectedColumnIdxNodeIdMap_.size(),
"Resolved node-id map size ({}) must not exceed projected columns ({})",
projectedColumnIdxNodeIdsMap_.size(),
outputColumns_.size());
return createVectorHelper(pool, vectorType, vectorSize);
}
Expand Down Expand Up @@ -133,9 +133,8 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const {
search::ast::LiteralType::ClpStringT;
break;
case ColumnType::Timestamp:
// TODO: IR timestamp support pending; constrain to Unknown to avoid
// mismatched projections.
literalType = search::ast::LiteralType::EpochDateT;
literalType = search::ast::LiteralType::FloatT |
search::ast::LiteralType::IntegerT;
break;
default:
literalType = search::ast::LiteralType::UnknownT;
Expand Down Expand Up @@ -189,22 +188,24 @@ VectorPtr ClpIrCursor::createVectorHelper(
readerIndex_, outputColumns_.size(), "Reader index out of bounds");
auto projectedColumn = outputColumns_[readerIndex_];
auto projectedColumnType = projectedColumn.type;
auto it = projectedColumnIdxNodeIdMap_.find(readerIndex_);
bool isResolved = it != projectedColumnIdxNodeIdMap_.end();
::clp::ffi::SchemaTree::Node::id_t projectedColumnNodeId;
auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_);
Comment on lines 190 to +191
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Avoid a copy of the projected column.

Bind by const reference; outputColumns_ outlives this scope.

Apply:

-  auto projectedColumn = outputColumns_[readerIndex_];
+  const auto& projectedColumn = outputColumns_[readerIndex_];
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
auto projectedColumn = outputColumns_[readerIndex_];
auto projectedColumnType = projectedColumn.type;
auto it = projectedColumnIdxNodeIdMap_.find(readerIndex_);
bool isResolved = it != projectedColumnIdxNodeIdMap_.end();
::clp::ffi::SchemaTree::Node::id_t projectedColumnNodeId;
auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_);
const auto& projectedColumn = outputColumns_[readerIndex_];
auto projectedColumnType = projectedColumn.type;
auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_);
🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around lines 192-194, the
code currently copies outputColumns_[readerIndex_] into projectedColumn which is
unnecessary because outputColumns_ outlives this scope; change the binding to a
const reference (e.g., const auto& projectedColumn =
outputColumns_[readerIndex_]) and also make projectedColumnType a reference if
you need to avoid copying its underlying type (e.g., const auto&
projectedColumnType = projectedColumn.type) so no unnecessary copies occur.

std::vector<::clp::ffi::SchemaTree::Node::id_t> projectedColumnNodeIds{};
bool isResolved =
it != projectedColumnIdxNodeIdsMap_.end() && !it->second.empty();
if (isResolved) {
projectedColumnNodeId = it->second;
projectedColumnNodeIds = it->second;
}
readerIndex_++;
return std::make_shared<LazyVector>(
pool,
vectorType,
vectorSize,
std::make_unique<ClpIrVectorLoader>(
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
isResolved,
projectedColumnType,
projectedColumnNodeId,
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()),
std::move(projectedColumnNodeIds),
projectedColumn.name,
projectedColumnType),
std::move(vector));
Comment on lines 203 to 209
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick

Move of node IDs into loader — LGTM; consider avoiding the prior copy.

Passing std::move(projectedColumnNodeIds) matches the constructor. If the map entry isn’t needed later, you can move directly from the map to skip the intermediate copy.

Option if safe:

-  if (isResolved) {
-    projectedColumnNodeIds = it->second;
-  }
+  if (isResolved) {
+    projectedColumnNodeIds = std::move(it->second); // only if map isn’t used later
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::make_unique<ClpIrVectorLoader>(
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
isResolved,
projectedColumnType,
projectedColumnNodeId,
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()),
std::move(projectedColumnNodeIds),
projectedColumn.name,
projectedColumnType),
std::move(vector));
const auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_);
bool isResolved = it != projectedColumnIdxNodeIdsMap_.end();
std::vector<::clp::ffi::SchemaTree::Node::id_t> projectedColumnNodeIds;
if (isResolved) {
projectedColumnNodeIds = std::move(it->second); // only if map isn’t used later
}
std::make_unique<ClpIrVectorLoader>(
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
isResolved,
std::move(projectedColumnNodeIds),
projectedColumn.name,
projectedColumnType),
std::move(vector));
🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around lines 206–212, the
code currently passes std::move(projectedColumnNodeIds) into the
ClpIrVectorLoader but creates projectedColumnNodeIds as an intermediate copy
beforehand; instead avoid that extra copy by moving the map entry directly into
the loader (e.g., std::move(it->second>) where you have the iterator/key) when
constructing the loader, ensure the map entry is not used afterwards, remove the
intermediate projectedColumnNodeIds variable and any of its uses, and verify
iterator validity and const-correctness so the move is safe.

}

Expand Down
15 changes: 11 additions & 4 deletions velox/connectors/clp/search_lib/ir/ClpIrCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,15 @@ class ClpIrCursor final : public BaseClpCursor {
[[maybe_unused]] std::pair<std::string_view, size_t>
projected_key_and_index)
-> ystdlib::error_handling::Result<void> {
projectedColumnIdxNodeIdMap_.insert(
{projected_key_and_index.second, nodeId});
auto it =
projectedColumnIdxNodeIdsMap_.find(projected_key_and_index.second);
if (it == projectedColumnIdxNodeIdsMap_.end()) {
projectedColumnIdxNodeIdsMap_.insert(
{projected_key_and_index.second,
std::vector<::clp::ffi::SchemaTree::Node::id_t>{nodeId}});
return ystdlib::error_handling::success();
}
it->second.emplace_back(nodeId);
return ystdlib::error_handling::success();
};
using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler<
Expand All @@ -70,8 +77,8 @@ class ClpIrCursor final : public BaseClpCursor {
::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>>
irDeserializer_;
std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr};
std::unordered_map<size_t, ::clp::ffi::SchemaTree::Node::id_t>
projectedColumnIdxNodeIdMap_;
std::unordered_map<size_t, std::vector<::clp::ffi::SchemaTree::Node::id_t>>
projectedColumnIdxNodeIdsMap_;
size_t readerIndex_{0};

std::vector<
Expand Down
33 changes: 29 additions & 4 deletions velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/

#include "velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h"

#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h"

namespace facebook::velox::connector::clp::search_lib {

Expand All @@ -33,12 +35,20 @@ void ClpIrVectorLoader::loadInternal(
auto& logEvent = filteredLogEvents_->at(vectorIndex);
// TODO: also need to support auto-generated keys
auto userGenNodeIdValueMap = logEvent->get_user_gen_node_id_value_pairs();
auto const value_it{userGenNodeIdValueMap.find(nodeId_)};
if (userGenNodeIdValueMap.end() == value_it ||
false == value_it->second.has_value()) {
auto valueIt = userGenNodeIdValueMap.end();
::clp::ffi::SchemaTree::Node::id_t nodeId{};
for (auto const candidateNodeId : nodeIds_) {
valueIt = userGenNodeIdValueMap.find(candidateNodeId);
if (valueIt != userGenNodeIdValueMap.end()) {
nodeId = candidateNodeId;
break;
}
}
if (userGenNodeIdValueMap.end() == valueIt ||
false == valueIt->second.has_value()) {
continue;
}
auto const& value{value_it->second};
auto const& value{valueIt->second};
switch (nodeType_) {
case ColumnType::String: {
auto stringVector = vector->asFlatVector<StringView>();
Expand Down Expand Up @@ -89,6 +99,21 @@ void ClpIrVectorLoader::loadInternal(
vector->setNull(vectorIndex, false);
break;
}
case ColumnType::Timestamp: {
auto timestampVector = vector->asFlatVector<Timestamp>();
if (value->is<double>()) {
timestampVector->set(
vectorIndex,
convertToVeloxTimestamp(value->get_immutable_view<double>()));
} else if (value->is<int64_t>()) {
timestampVector->set(
vectorIndex,
convertToVeloxTimestamp(value->get_immutable_view<int64_t>()));
} else {
VELOX_FAIL("Unsupported timestamp type");
}
break;
}
case ColumnType::Array: {
auto arrayVector = std::dynamic_pointer_cast<ArrayVector>(vector);
std::string jsonString;
Expand Down
Loading
Loading