Skip to content

Commit 739a3aa

Browse files
authored
feat: Add support for searching and marshalling the new Timestamp column type. (#54)
1 parent b288479 commit 739a3aa

7 files changed

Lines changed: 124 additions & 10 deletions

File tree

CMake/resolve_dependency_modules/clp.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ include_guard(GLOBAL)
1616
FetchContent_Declare(
1717
clp
1818
GIT_REPOSITORY https://github.com/y-scope/clp.git
19-
GIT_TAG f82e6114160a6addd4727259906bcf621ac9912c
19+
GIT_TAG a91e5f71f0715d7d6b3ea7c177e1b39b3e6a24a6
2020
)
2121

2222
set(CLP_BUILD_CLP_REGEX_UTILS OFF CACHE BOOL "Build CLP regex utils")

velox/connectors/clp/search_lib/ClpTimestampsUtils.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include "clp_s/Defs.hpp"
1920
#include "velox/type/Timestamp.h"
2021

2122
namespace facebook::velox::connector::clp::search_lib {
@@ -121,4 +122,19 @@ inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
121122
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
122123
}
123124

125+
/// Converts a nanosecond precision epochtime_t into a Velox timestamp.
/// The input is split into whole seconds and a nanosecond remainder; for
/// negative inputs the seconds component is rounded down so that the
/// nanosecond component handed to Timestamp is never negative.
126+
///
127+
/// @param timestamp the input timestamp as an integer count of nanoseconds
/// (may be negative)
128+
/// @return the corresponding Velox timestamp
129+
inline auto convertNanosecondEpochToVeloxTimestamp(clp_s::epochtime_t timestamp)
130+
-> Timestamp {
131+
int64_t seconds{timestamp / Timestamp::kNanosInSecond};
132+
int64_t nanoseconds{timestamp % Timestamp::kNanosInSecond};
133+
// C++ integer division truncates toward zero, so a negative input yields a
// negative remainder. Borrow one second to bring the remainder back into the
// non-negative range before the unsigned cast below.
if (nanoseconds < 0) {
134+
seconds -= 1;
135+
nanoseconds += Timestamp::kNanosInSecond;
136+
}
137+
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
138+
}
139+
124140
} // namespace facebook::velox::connector::clp::search_lib

velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
#include <glog/logging.h>
1818

1919
#include "clp_s/ArchiveReader.hpp"
20+
#include "clp_s/SingleFileArchiveDefs.hpp"
2021
#include "clp_s/search/EvaluateTimestampIndex.hpp"
2122
#include "clp_s/search/ast/EmptyExpr.hpp"
2223
#include "clp_s/search/ast/SearchUtils.hpp"
24+
#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp"
25+
#include "clp_s/search/ast/TimestampLiteral.hpp"
2326
#include "velox/connectors/clp/ClpColumnHandle.h"
2427
#include "velox/connectors/clp/search_lib/archive/ClpArchiveCursor.h"
2528
#include "velox/connectors/clp/search_lib/archive/ClpArchiveJsonStringVectorLoader.h"
@@ -136,6 +139,14 @@ ErrorCode ClpArchiveCursor::loadSplit() {
136139
auto schemaTree = archiveReader_->get_schema_tree();
137140
auto schemaMap = archiveReader_->get_schema_map();
138141

142+
auto const defaultTimestampPrecision{
143+
archiveReader_->has_deprecated_timestamp_format()
144+
? TimestampLiteral::Precision::Milliseconds
145+
: TimestampLiteral::Precision::Nanoseconds};
146+
SetTimestampLiteralPrecision timestampPrecisionPass{
147+
defaultTimestampPrecision};
148+
expr_ = timestampPrecisionPass.run(expr_);
149+
139150
EvaluateTimestampIndex timestampIndex(timestampDict);
140151
if (clp_s::EvaluatedValue::False == timestampIndex.run(expr_)) {
141152
VLOG(2) << "No matching timestamp ranges for query '" << query_ << "'";

velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ void ClpArchiveVectorLoader::populateTimestampData(
7373
case clp_s::NodeType::DictionaryFloat:
7474
case clp_s::NodeType::Integer:
7575
case clp_s::NodeType::DeprecatedDateString:
76+
case clp_s::NodeType::Timestamp:
7677
supportedNodeType = true;
7778
break;
7879
default:
@@ -88,7 +89,13 @@ void ClpArchiveVectorLoader::populateTimestampData(
8889
for (int vectorIndex : rows) {
8990
auto messageIndex = filteredRowIndices_->at(vectorIndex);
9091

91-
if (clp_s::NodeType::Float == Type) {
92+
if (clp_s::NodeType::Timestamp == Type) {
93+
auto reader{static_cast<clp_s::TimestampColumnReader*>(columnReader_)};
94+
vector->set(
95+
vectorIndex,
96+
convertNanosecondEpochToVeloxTimestamp(
97+
reader->get_encoded_time(messageIndex)));
98+
} else if (clp_s::NodeType::Float == Type) {
9299
auto reader = static_cast<clp_s::FloatColumnReader*>(columnReader_);
93100
vector->set(
94101
vectorIndex,
@@ -211,7 +218,12 @@ void ClpArchiveVectorLoader::loadInternal(
211218
}
212219
case ColumnType::Timestamp: {
213220
auto timestampVector = vector->asFlatVector<Timestamp>();
214-
if (nullptr != dynamic_cast<clp_s::Int64ColumnReader*>(columnReader_)) {
221+
if (nullptr !=
222+
dynamic_cast<clp_s::TimestampColumnReader*>(columnReader_)) {
223+
populateTimestampData<clp_s::NodeType::Timestamp>(
224+
rows, timestampVector);
225+
} else if (
226+
nullptr != dynamic_cast<clp_s::Int64ColumnReader*>(columnReader_)) {
215227
populateTimestampData<clp_s::NodeType::Integer>(rows, timestampVector);
216228
} else if (
217229
nullptr !=
@@ -256,10 +268,22 @@ template void ClpArchiveVectorLoader::populateData<std::string>(
256268
RowSet rows,
257269
FlatVector<StringView>* vector);
258270
template void
271+
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Timestamp>(
272+
RowSet rows,
273+
FlatVector<facebook::velox::Timestamp>* vector);
274+
template void
259275
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Float>(
260276
RowSet rows,
261277
FlatVector<facebook::velox::Timestamp>* vector);
262278
template void
279+
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::FormattedFloat>(
280+
RowSet rows,
281+
FlatVector<facebook::velox::Timestamp>* vector);
282+
template void
283+
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::DictionaryFloat>(
284+
RowSet rows,
285+
FlatVector<facebook::velox::Timestamp>* vector);
286+
template void
263287
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Integer>(
264288
RowSet rows,
265289
FlatVector<facebook::velox::Timestamp>* vector);

velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#include "clp_s/ColumnReader.hpp"
1818
#include "clp_s/InputConfig.hpp"
19+
#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp"
20+
#include "clp_s/search/ast/TimestampLiteral.hpp"
1921

2022
#include "ffi/ir_stream/search/QueryHandler.hpp"
2123
#include "velox/connectors/clp/ClpColumnHandle.h"
@@ -75,6 +77,10 @@ ErrorCode ClpIrCursor::loadSplit() {
7577
? NetworkAuthOption{.method = AuthMethod::None}
7678
: NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4};
7779

80+
search::ast::SetTimestampLiteralPrecision timestampPrecisionPass{
81+
search::ast::TimestampLiteral::Precision::Milliseconds};
82+
expr_ = timestampPrecisionPass.run(expr_);
83+
7884
auto projections = splitFieldsToNamesAndTypes();
7985
auto queryHandlerResult{QueryHandlerType::create(
8086
projectionResolutionCallback_,

velox/connectors/clp/tests/ClpConnectorTest.cpp

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,10 @@ TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) {
580580
TEST_F(ClpConnectorTest, test4IrTimestampPushdown) {
581581
// Only the second event meets the condition; the first event is a date string
582582
// which is not supported yet so the value will be NULL.
583+
// This test cannot use the `timestamp()` literal, since the integer
584+
// timestamps are in microsecond precision, and we currently assume all IR
585+
// timestamps are millisecond precision when comparing against timestamp
586+
// literals.
583587
const std::shared_ptr<std::string> kqlQuery =
584588
std::make_shared<std::string>("(timestamp < 1756003005000000)");
585589
auto plan =
@@ -661,11 +665,9 @@ TEST_F(ClpConnectorTest, test5FloatTimestampNoPushdown) {
661665

662666
TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) {
663667
// Test filtering rows with a timestamp parsed from a date string and floats
664-
// in various formats. Because KQL doesn’t automatically interpret the unit of
665-
// the timestamp, the returned result differs slightly from the one without
666-
// pushdown.
668+
// in various formats.
667669
const std::shared_ptr<std::string> kqlQuery = std::make_shared<std::string>(
668-
"(timestamp < 1746003005.127 and timestamp >= 1746003005.124)");
670+
R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))");
669671
auto plan =
670672
PlanBuilder(pool_.get())
671673
.startTableScan()
@@ -695,12 +697,67 @@ TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) {
695697
{Timestamp(1746003005, 124000000),
696698
Timestamp(1746003005, 124100000),
697699
Timestamp(1746003005, 125000000),
698-
Timestamp(1746003005, 126000000)}),
700+
Timestamp(1746003005, 126000000),
701+
Timestamp(1746003005, 127000000),
702+
Timestamp(1746003060, 0),
703+
Timestamp(1746003065, 0)}),
699704
makeFlatVector<double>(
700-
{1.234567891234500E9,
705+
{1.2345678912345E9,
701706
1E16,
702707
1.234567891234567E9,
703-
1.234567891234567E9})});
708+
1.234567891234567E9,
709+
-1.234567891234567E-9,
710+
1234567891.234567,
711+
-1234567891.234567})});
712+
test::assertEqualVectors(expected, output);
713+
}
714+
715+
TEST_F(ClpConnectorTest, test5NewTimestampFormatFloatTimestampPushdown) {
716+
// Test filtering rows with a timestamp parsed from a date string and floats
717+
// in various formats. This variant reads the archive `test_5.v0.5.0.clps`
// and passes the timestamp range to the split as a KQL query written with
// `timestamp()` literals; the output is ordered by timestamp, ascending.
718+
const std::shared_ptr<std::string> kqlQuery = std::make_shared<std::string>(
719+
R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))");
720+
auto plan =
721+
PlanBuilder(pool_.get())
722+
.startTableScan()
723+
.outputType(ROW({"timestamp", "floatValue"}, {TIMESTAMP(), DOUBLE()}))
724+
.tableHandle(
725+
std::make_shared<ClpTableHandle>(kClpConnectorId, "test_5"))
726+
.assignments(
727+
{{"timestamp",
728+
std::make_shared<ClpColumnHandle>(
729+
"timestamp", "timestamp", TIMESTAMP())},
730+
{"floatValue",
731+
std::make_shared<ClpColumnHandle>(
732+
"floatValue", "floatValue", DOUBLE())}})
733+
.endTableScan()
734+
.orderBy({"\"timestamp\" ASC"}, false)
735+
.planNode();
736+
737+
auto output = getResults(
738+
plan,
739+
{makeClpSplit(
740+
getExampleFilePath("test_5.v0.5.0.clps"),
741+
ClpConnectorSplit::SplitType::kArchive,
742+
kqlQuery)});
743+
auto expected = makeRowVector(
744+
{// timestamp
745+
makeFlatVector<Timestamp>(
746+
{Timestamp(1746003005, 124000000),
747+
Timestamp(1746003005, 124100000),
748+
Timestamp(1746003005, 125000000),
749+
Timestamp(1746003005, 126000000),
750+
// NOTE(review): this value is 1 ns larger than the 127000000 expected by
// test5FloatTimestampPushdown; presumably that is what this archive
// stores for the row -- confirm against the test data.
Timestamp(1746003005, 127000001),
751+
Timestamp(1746003060, 0),
752+
Timestamp(1746003065, 0)}),
753+
// floatValue
makeFlatVector<double>(
754+
{1.2345678912345E9,
755+
1E16,
756+
1.234567891234567E9,
757+
1.234567891234567E9,
758+
-1.234567891234567E-9,
759+
1234567891.234567,
760+
-1234567891.234567})});
704761
test::assertEqualVectors(expected, output);
705762
}
706763

945 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)