-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Add timestamp type support for kv-ir splits. #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 42 commits
5af7ec3
fe8a130
cc82dd7
65d3bd1
d45baf0
7f40b7e
c0cac3d
c7bd83b
27ba897
19bad61
d3fafde
f53c6c7
e466f68
8be8eb7
d6ba0da
221bec6
679d338
6e1462d
f8f75c3
1c091c7
8ad9b9c
7f25210
28e9a6f
595e74b
1b12402
5db1cf9
e71a957
cc82c62
3ca7361
184f9de
2102275
f9e1409
4ef6e96
e576db7
fd2b308
82778cc
b257938
10e222a
1928748
1982c3b
947fded
03d251d
da66e99
ae97766
a6a9e49
4c3e4e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| /* | ||
| * Copyright (c) Facebook, Inc. and its affiliates. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| #include "velox/type/Timestamp.h" | ||
|
|
||
| namespace facebook::velox::connector::clp::search_lib { | ||
|
|
||
| enum class InputTimestampPrecision : uint8_t { | ||
| Seconds, | ||
| Milliseconds, | ||
| Microseconds, | ||
| Nanoseconds | ||
| }; | ||
|
|
||
| /// Estimates the precision of an epoch timestamp as seconds, milliseconds, | ||
| /// microseconds, or nanoseconds. | ||
| /// | ||
| /// This heuristic relies on the fact that 1 year of epoch nanoseconds is | ||
| /// approximately 1000 years of epoch microseconds and so on. This heuristic | ||
| /// can be unreliable for timestamps sufficiently close to the epoch, but | ||
| /// should otherwise be accurate for the next 1000 years. | ||
| /// | ||
| /// Note: Future versions of the clp-s archive format will adopt a | ||
| /// nanosecond-precision integer timestamp format (as opposed to the current | ||
| /// format which allows other precisions), at which point we can remove this | ||
| /// heuristic. | ||
| /// | ||
| /// @param timestamp | ||
| /// @return the estimated timestamp precision | ||
| template <typename T> | ||
| auto estimatePrecision(T timestamp) -> InputTimestampPrecision { | ||
| constexpr int64_t kEpochMilliseconds1971{31536000000}; | ||
| constexpr int64_t kEpochMicroseconds1971{31536000000000}; | ||
| constexpr int64_t kEpochNanoseconds1971{31536000000000000}; | ||
| auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp; | ||
|
|
||
| if (absTimestamp > kEpochNanoseconds1971) { | ||
| return InputTimestampPrecision::Nanoseconds; | ||
| } else if (absTimestamp > kEpochMicroseconds1971) { | ||
| return InputTimestampPrecision::Microseconds; | ||
| } else if (absTimestamp > kEpochMilliseconds1971) { | ||
| return InputTimestampPrecision::Milliseconds; | ||
| } else { | ||
| return InputTimestampPrecision::Seconds; | ||
| } | ||
| } | ||
|
|
||
| /// Converts a double value into a Velox timestamp. | ||
| /// | ||
| /// @param timestamp the input timestamp as a double | ||
| /// @return the corresponding Velox timestamp | ||
| inline auto convertToVeloxTimestamp(double timestamp) -> Timestamp { | ||
| switch (estimatePrecision(timestamp)) { | ||
| case InputTimestampPrecision::Nanoseconds: | ||
| timestamp /= Timestamp::kNanosInSecond; | ||
| break; | ||
| case InputTimestampPrecision::Microseconds: | ||
| timestamp /= Timestamp::kMicrosecondsInSecond; | ||
| break; | ||
| case InputTimestampPrecision::Milliseconds: | ||
| timestamp /= Timestamp::kMillisecondsInSecond; | ||
| break; | ||
| case InputTimestampPrecision::Seconds: | ||
| break; | ||
| } | ||
| double seconds{std::floor(timestamp)}; | ||
| double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond}; | ||
| return Timestamp( | ||
| static_cast<int64_t>(seconds), static_cast<uint64_t>(nanoseconds)); | ||
| } | ||
|
|
||
| /// Converts an integer value into a Velox timestamp. | ||
| /// | ||
| /// @param timestamp the input timestamp as an integer | ||
| /// @return the corresponding Velox timestamp | ||
| inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp { | ||
| int64_t precisionDifference{Timestamp::kNanosInSecond}; | ||
| switch (estimatePrecision(timestamp)) { | ||
| case InputTimestampPrecision::Nanoseconds: | ||
| break; | ||
| case InputTimestampPrecision::Microseconds: | ||
| precisionDifference = | ||
| Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond; | ||
| break; | ||
| case InputTimestampPrecision::Milliseconds: | ||
| precisionDifference = | ||
| Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond; | ||
| break; | ||
| case InputTimestampPrecision::Seconds: | ||
| precisionDifference = | ||
| Timestamp::kNanosInSecond / Timestamp::kNanosInSecond; | ||
| break; | ||
| } | ||
| int64_t seconds{timestamp / precisionDifference}; | ||
| int64_t nanoseconds{ | ||
| (timestamp % precisionDifference) * | ||
| (Timestamp::kNanosInSecond / precisionDifference)}; | ||
| if (nanoseconds < 0) { | ||
| seconds -= 1; | ||
| nanoseconds += Timestamp::kNanosInSecond; | ||
| } | ||
| return Timestamp(seconds, static_cast<uint64_t>(nanoseconds)); | ||
| } | ||
|
|
||
| } // namespace facebook::velox::connector::clp::search_lib | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
| #include "clp_s/InputConfig.hpp" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "ffi/ir_stream/search/QueryHandler.hpp" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "velox/connectors/clp/search_lib/ir/ClpIrCursor.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -59,10 +60,10 @@ VectorPtr ClpIrCursor::createVector( | |||||||||||||||||||||||||||||||||||||||||||||||||
| const TypePtr& vectorType, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| size_t vectorSize) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| VELOX_CHECK_EQ( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnIdxNodeIdMap_.size(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnIdxNodeIdsMap_.size(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| outputColumns_.size(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "Projected columns size {} does not match fields size {}", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnIdxNodeIdMap_.size(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "Resolved node-id map size ({}) must not exceed projected columns ({})", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnIdxNodeIdsMap_.size(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| outputColumns_.size()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return createVectorHelper(pool, vectorType, vectorSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -135,7 +136,8 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const { | |||||||||||||||||||||||||||||||||||||||||||||||||
| case ColumnType::Timestamp: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: IR timestamp support pending; constrain to Unknown to avoid | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // mismatched projections. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| literalType = search::ast::LiteralType::EpochDateT; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| literalType = search::ast::LiteralType::FloatT | | ||||||||||||||||||||||||||||||||||||||||||||||||||
| search::ast::LiteralType::IntegerT; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||||
anlowee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| literalType = search::ast::LiteralType::UnknownT; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -189,22 +191,24 @@ VectorPtr ClpIrCursor::createVectorHelper( | |||||||||||||||||||||||||||||||||||||||||||||||||
| readerIndex_, outputColumns_.size(), "Reader index out of bounds"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auto projectedColumn = outputColumns_[readerIndex_]; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auto projectedColumnType = projectedColumn.type; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auto it = projectedColumnIdxNodeIdMap_.find(readerIndex_); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| bool isResolved = it != projectedColumnIdxNodeIdMap_.end(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ::clp::ffi::SchemaTree::Node::id_t projectedColumnNodeId; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
190
to
+191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick Avoid a copy of the projected column. Bind by const reference; Apply: - auto projectedColumn = outputColumns_[readerIndex_];
+ const auto& projectedColumn = outputColumns_[readerIndex_];📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||
| std::vector<::clp::ffi::SchemaTree::Node::id_t> projectedColumnNodeIds{}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| bool isResolved = | ||||||||||||||||||||||||||||||||||||||||||||||||||
| it != projectedColumnIdxNodeIdsMap_.end() && !it->second.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (isResolved) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnNodeId = it->second; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnNodeIds = it->second; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| readerIndex_++; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return std::make_shared<LazyVector>( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pool, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| vectorType, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| vectorSize, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| std::make_unique<ClpIrVectorLoader>( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| isResolved, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnType, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnNodeId, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| std::move(projectedColumnNodeIds), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumn.name, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| projectedColumnType), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| std::move(vector)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
203
to
209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick Move of node IDs into loader — LGTM; consider avoiding the prior copy. Passing Option if safe: - if (isResolved) {
- projectedColumnNodeIds = it->second;
- }
+ if (isResolved) {
+ projectedColumnNodeIds = std::move(it->second); // only if map isn’t used later
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Precision estimation heuristic may misclassify timestamps near epoch
The heuristic for determining timestamp precision relies on magnitude thresholds that can fail for timestamps close to the Unix epoch (1970). For example, a timestamp of 1000 seconds since epoch (16 minutes 40 seconds after midnight on January 1, 1970) would be misclassified as milliseconds since it's less than
kEpochMilliseconds1971.This could lead to incorrect timestamp conversions, especially for historical data or test scenarios using epoch-relative timestamps.
Consider adding explicit precision metadata or using a more robust heuristic that accounts for the current time range. For example:
template <typename T> auto estimatePrecision(T timestamp) -> InputTimestampPrecision { constexpr int64_t kEpochMilliseconds1971{31536000000}; constexpr int64_t kEpochMicroseconds1971{31536000000000}; constexpr int64_t kEpochNanoseconds1971{31536000000000000}; + // Current time bounds for validation (approximate) + constexpr int64_t kCurrentEpochSeconds{1700000000}; // ~2023 + constexpr int64_t kFutureEpochSeconds{2000000000}; // ~2033 + auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp; + // Additional check: if value is reasonable as seconds (1970-2033 range) + if (absTimestamp > kCurrentEpochSeconds && absTimestamp < kFutureEpochSeconds) { + return InputTimestampPrecision::Seconds; + } + if (absTimestamp > kEpochNanoseconds1971) { return InputTimestampPrecision::Nanoseconds; } else if (absTimestamp > kEpochMicroseconds1971) { return InputTimestampPrecision::Microseconds; } else if (absTimestamp > kEpochMilliseconds1971) { return InputTimestampPrecision::Milliseconds; } else { return InputTimestampPrecision::Seconds; } }