
Enhancement/8561507350/precompute output schema from processing #2233


Merged
3 changes: 3 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
@@ -939,6 +939,9 @@ if(${TEST})
processing/test/test_filter_and_project_sparse.cpp
processing/test/test_type_promotion.cpp
processing/test/test_operation_dispatch.cpp
processing/test/test_output_schema_aggregator_types.cpp
processing/test/test_output_schema_ast_validity.cpp
processing/test/test_output_schema_basic.cpp
processing/test/test_parallel_processing.cpp
processing/test/test_resample.cpp
processing/test/test_set_membership.cpp
40 changes: 40 additions & 0 deletions cpp/arcticdb/entity/stream_descriptor.hpp
@@ -14,6 +14,8 @@
#include <arcticdb/util/variant.hpp>
#include <arcticdb/entity/types_proto.hpp>

#include <ankerl/unordered_dense.h>

namespace arcticdb::entity {

struct SegmentDescriptorImpl : public SegmentDescriptor {
@@ -271,6 +273,44 @@ struct StreamDescriptor {
}
};

struct OutputSchema {
arcticdb::proto::descriptors::NormalizationMetadata norm_metadata_;

OutputSchema(StreamDescriptor stream_descriptor,
arcticdb::proto::descriptors::NormalizationMetadata norm_metadata):
norm_metadata_(std::move(norm_metadata)),
stream_descriptor_(std::move(stream_descriptor)) {};

const StreamDescriptor& stream_descriptor() const {
return stream_descriptor_;
}

void set_stream_descriptor(StreamDescriptor&& stream_descriptor) {
stream_descriptor_ = std::move(stream_descriptor);
column_types_ = std::nullopt;
}

ankerl::unordered_dense::map<std::string, DataType>& column_types() {
if (!column_types_.has_value()) {
column_types_ = ankerl::unordered_dense::map<std::string, DataType>();
column_types_->reserve(stream_descriptor_.field_count());
for (const auto& field: stream_descriptor_.fields()) {
column_types_->emplace(field.name(), field.type().data_type());
}
}
return *column_types_;
}

void add_field(std::string_view name, DataType data_type) {
stream_descriptor_.add_scalar_field(data_type, name);
column_types().emplace(name, data_type);
}

private:
StreamDescriptor stream_descriptor_;
std::optional<ankerl::unordered_dense::map<std::string, DataType>> column_types_;
Reviewer comment (Collaborator): Does this have to be wrapped in an optional? Can't we just check whether the map is empty and then create it in column_types()?

Author reply: In theory yes, although in the (admittedly niche) case where you have no columns, you would repeatedly see that the map is empty and regenerate it, after which it would remain empty. Semantically, though, I think an optional makes it clearer whether it has been generated or not.

};
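
The lazy-caching trade-off discussed in the thread above can be illustrated with a minimal, self-contained sketch (standard-library types only, not the ArcticDB classes): std::optional distinguishes "never generated" from "generated but empty", so a descriptor with zero columns is not regenerated on every call.

#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Minimal stand-in for the cached column-type map: the fields vector plays the
// role of StreamDescriptor and int plays the role of DataType.
struct LazyColumnTypes {
    std::vector<std::pair<std::string, int>> fields;

    std::unordered_map<std::string, int>& column_types() {
        if (!cache_.has_value()) {          // only rebuilt after invalidate()
            cache_.emplace();
            cache_->reserve(fields.size());
            for (const auto& [name, type] : fields) {
                cache_->emplace(name, type);
            }
        }
        return *cache_;
    }

    void invalidate() { cache_.reset(); }   // analogous to set_stream_descriptor

private:
    // With a plain (non-optional) map, an empty schema would be indistinguishable
    // from "not yet generated" and the rebuild loop above would run on every call.
    std::optional<std::unordered_map<std::string, int>> cache_;
};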

template <class IndexType>
inline void set_index(StreamDescriptor &stream_desc) {
stream_desc.set_index_field_count(std::uint32_t(IndexType::field_count()));
8 changes: 5 additions & 3 deletions cpp/arcticdb/processing/aggregation_interface.hpp
@@ -16,16 +16,18 @@ struct IGroupingAggregatorData {
struct Interface : Base {
void add_data_type(DataType data_type) { folly::poly_call<0>(*this, data_type); }

DataType get_output_data_type() { return folly::poly_call<1>(*this); };

void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values) {
folly::poly_call<1>(*this, input_column, groups, unique_values);
folly::poly_call<2>(*this, input_column, groups, unique_values);
}
[[nodiscard]] SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values) {
return folly::poly_call<2>(*this, output_column_name, dynamic_schema, unique_values);
return folly::poly_call<3>(*this, output_column_name, dynamic_schema, unique_values);
}
};

template<class T>
using Members = folly::PolyMembers<&T::add_data_type, &T::aggregate, &T::finalize>;
using Members = folly::PolyMembers<&T::add_data_type, &T::get_output_data_type, &T::aggregate, &T::finalize>;
};

using GroupingAggregatorData = folly::Poly<IGroupingAggregatorData>;
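
Because get_output_data_type was inserted at position 1, the folly::poly_call indices for aggregate and finalize shift from 1 and 2 to 2 and 3. A minimal, self-contained folly::Poly sketch (toy types, not the ArcticDB aggregator interface) of how the index passed to poly_call must match the member's position in PolyMembers:

#include <folly/Poly.h>
#include <cstdio>

struct ICounter {
    template <class Base>
    struct Interface : Base {
        // The poly_call index is the member's position in PolyMembers below, which is
        // why inserting a new member mid-list renumbers every call after it.
        void add(int x) { folly::poly_call<0>(*this, x); }
        int total() const { return folly::poly_call<1>(*this); }
    };
    template <class T>
    using Members = folly::PolyMembers<&T::add, &T::total>;
};

struct Counter {
    int sum_{0};
    void add(int x) { sum_ += x; }
    int total() const { return sum_; }
};

int main() {
    folly::Poly<ICounter> counter = Counter{};
    counter.add(3);
    counter.add(4);
    std::printf("%d\n", counter.total());  // prints 7
}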
113 changes: 113 additions & 0 deletions cpp/arcticdb/processing/clause.cpp
@@ -98,6 +98,28 @@ struct SegmentWrapper {
}
};

void check_column_presence(OutputSchema& output_schema, const std::unordered_set<std::string>& required_columns, std::string_view clause_name) {
const auto& column_types = output_schema.column_types();
for (const auto& input_column: required_columns) {
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
column_types.contains(input_column),
"{}Clause requires column '{}' to exist in input data",
clause_name,
input_column
);
}
}

void check_is_timeseries(const StreamDescriptor& stream_descriptor, std::string_view clause_name) {
schema::check<ErrorCode::E_UNSUPPORTED_INDEX_TYPE>(
stream_descriptor.index().type() == IndexDescriptor::Type::TIMESTAMP &&
stream_descriptor.index().field_count() >= 1 &&
stream_descriptor.field(0).type() == make_scalar_type(DataType::NANOSECONDS_UTC64),
"{}Clause can only be applied to timeseries",
clause_name
);
}

std::vector<EntityId> PassthroughClause::process(std::vector<EntityId>&& entity_ids) const {
return std::move(entity_ids);
}
@@ -133,6 +155,14 @@ std::vector<EntityId> FilterClause::process(std::vector<EntityId>&& entity_ids)
return output;
}

OutputSchema FilterClause::modify_schema(OutputSchema&& output_schema) const {
Reviewer comment (Collaborator): I'm curious what the rationale was behind taking an rvalue reference and returning a modified object vs. just taking a reference and modifying in place.

Author reply: Some clauses (e.g. projections) do something that looks like modifying the existing schema (adding one field). Others (e.g. resampling) completely throw away the existing schema and create a new one from scratch, so I think this approach makes it clearer that the returned value is not necessarily closely related to the input value.

check_column_presence(output_schema, *clause_info_.input_columns_, "Filter");
auto root_expr = expression_context_->expression_nodes_.get_value(expression_context_->root_node_name_.value);
std::variant<BitSetTag, DataType> return_type = root_expr->compute(*expression_context_, output_schema.column_types());
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(std::holds_alternative<BitSetTag>(return_type), "FilterClause AST would produce a column, not a bitset");
return output_schema;
}

std::string FilterClause::to_string() const {
return expression_context_ ? fmt::format("WHERE {}", expression_context_->root_node_name_.value) : "";
}
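
To illustrate the rvalue-reference design discussed in the review thread above, a hypothetical sketch (names invented, not the actual ArcticDB clause interface) of how a pipeline might thread the schema through its clauses when modify_schema takes an rvalue and returns by value:

#include <memory>
#include <utility>
#include <vector>

struct Schema {};  // stand-in for OutputSchema

struct Clause {
    virtual ~Clause() = default;
    // A clause may tweak the incoming schema or discard it and build a new one from scratch.
    virtual Schema modify_schema(Schema&& schema) const = 0;
};

// The pipeline threads the value through; each step explicitly hands over ownership.
Schema propagate_schema(Schema schema, const std::vector<std::unique_ptr<Clause>>& clauses) {
    for (const auto& clause : clauses) {
        schema = clause->modify_schema(std::move(schema));
    }
    return schema;
}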
@@ -170,6 +200,15 @@ std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids)
return output;
}

OutputSchema ProjectClause::modify_schema(OutputSchema&& output_schema) const {
check_column_presence(output_schema, *clause_info_.input_columns_, "Project");
auto root_expr = expression_context_->expression_nodes_.get_value(expression_context_->root_node_name_.value);
std::variant<BitSetTag, DataType> return_type = root_expr->compute(*expression_context_, output_schema.column_types());
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(std::holds_alternative<DataType>(return_type), "ProjectClause AST would produce a bitset, not a column");
output_schema.add_field(output_column_, std::get<DataType>(return_type));
return output_schema;
}

[[nodiscard]] std::string ProjectClause::to_string() const {
return expression_context_ ? fmt::format("PROJECT Column[\"{}\"] = {}", output_column_, expression_context_->root_node_name_.value) : "";
}
@@ -432,6 +471,31 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
return push_entities(*component_manager_, ProcessingUnit(std::move(seg)));
}

OutputSchema AggregationClause::modify_schema(OutputSchema&& output_schema) const {
check_column_presence(output_schema, *clause_info_.input_columns_, "Aggregation");
const auto& input_stream_desc = output_schema.stream_descriptor();
StreamDescriptor stream_desc(input_stream_desc.id());
stream_desc.add_field(input_stream_desc.field(*input_stream_desc.find_field(grouping_column_)));
stream_desc.set_index({0, IndexDescriptorImpl::Type::ROWCOUNT});

for (const auto& agg: aggregators_){
const auto& input_column_name = agg.get_input_column_name().value;
const auto& output_column_name = agg.get_output_column_name().value;
const auto& input_column_type = input_stream_desc.field(*input_stream_desc.find_field(input_column_name)).type().data_type();
auto agg_data = agg.get_aggregator_data();
agg_data.add_data_type(input_column_type);
const auto& output_column_type = agg_data.get_output_data_type();
stream_desc.add_scalar_field(output_column_type, output_column_name);
}

output_schema.set_stream_descriptor(std::move(stream_desc));
auto mutable_index = output_schema.norm_metadata_.mutable_df()->mutable_common()->mutable_index();
mutable_index->set_name(grouping_column_);
mutable_index->clear_fake_name();
mutable_index->set_is_physically_stored(true);
return output_schema;
}

[[nodiscard]] std::string AggregationClause::to_string() const {
return str_;
}
@@ -463,6 +527,45 @@ void ResampleClause<closed_boundary>::set_component_manager(std::shared_ptr<Comp
component_manager_ = std::move(component_manager);
}

template<ResampleBoundary closed_boundary>
OutputSchema ResampleClause<closed_boundary>::modify_schema(OutputSchema&& output_schema) const {
Reviewer comment (Collaborator): One thing that's not clear to me is when the type promotion happens for resampling. If we have multiple segments with different types, will we know the output type prior to reading all the segments?

Author reply: The StreamDescriptor in the input OutputSchema object is from the TimeSeriesDescriptor for the whole symbol-version pair. When dynamic schema is enabled and the types are changing for a given column, by definition, the type of the column in the TimeSeriesDescriptor is capable of representing all of the types of the individual row-slices of that column.

check_is_timeseries(output_schema.stream_descriptor(), "Resample");
check_column_presence(output_schema, *clause_info_.input_columns_, "Resample");
const auto& input_stream_desc = output_schema.stream_descriptor();
StreamDescriptor stream_desc(input_stream_desc.id());
stream_desc.add_field(input_stream_desc.field(0));
stream_desc.set_index(IndexDescriptorImpl(1, IndexDescriptor::Type::TIMESTAMP));

for (const auto& agg: aggregators_){
const auto& input_column_name = agg.get_input_column_name().value;
const auto& output_column_name = agg.get_output_column_name().value;
const auto& input_column_type = input_stream_desc.field(*input_stream_desc.find_field(input_column_name)).type().data_type();
agg.check_aggregator_supported_with_data_type(input_column_type);
auto output_column_type = agg.generate_output_data_type(input_column_type);
stream_desc.add_scalar_field(output_column_type, output_column_name);
}
output_schema.set_stream_descriptor(std::move(stream_desc));

if (output_schema.norm_metadata_.df().common().has_multi_index()) {
const auto& multi_index = output_schema.norm_metadata_.mutable_df()->mutable_common()->multi_index();
auto name = multi_index.name();
auto tz = multi_index.tz();
bool fake_name{false};
for (auto pos: multi_index.fake_field_pos()) {
if (pos == 0) {
fake_name = true;
break;
}
}
auto mutable_index = output_schema.norm_metadata_.mutable_df()->mutable_common()->mutable_index();
mutable_index->set_tz(tz);
mutable_index->set_is_physically_stored(true);
mutable_index->set_name(name);
mutable_index->set_fake_name(fake_name);
}
return output_schema;
}
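
Relating to the dynamic-schema point in the thread above, a hypothetical sketch (toy enum, not ArcticDB's actual type-promotion rules) of the "common type" idea: the symbol-level descriptor records a type wide enough for every row-slice, so the output type is known before any segment is read.

#include <cassert>

// Toy dtype lattice; the real promotion rules cover many more types.
enum class Dtype { INT32, INT64, FLOAT64 };

Dtype common_type(Dtype a, Dtype b) {
    if (a == Dtype::FLOAT64 || b == Dtype::FLOAT64) return Dtype::FLOAT64;
    if (a == Dtype::INT64 || b == Dtype::INT64) return Dtype::INT64;
    return Dtype::INT32;
}

int main() {
    // Row-slices stored as INT32 and FLOAT64 => the symbol-level descriptor records
    // FLOAT64, which is what modify_schema sees without reading any data.
    assert(common_type(Dtype::INT32, Dtype::FLOAT64) == Dtype::FLOAT64);
}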

template<ResampleBoundary closed_boundary>
std::string ResampleClause<closed_boundary>::rule() const {
return rule_;
@@ -903,6 +1006,11 @@ const ClauseInfo& MergeClause::clause_info() const {
return clause_info_;
}

OutputSchema MergeClause::modify_schema(OutputSchema&& output_schema) const {
check_is_timeseries(output_schema.stream_descriptor(), "Merge");
return output_schema;
}

std::vector<std::vector<EntityId>> MergeClause::structure_for_processing(std::vector<std::vector<EntityId>>&& entity_ids_vec) {

// TODO this is a hack because we don't currently have a way to
@@ -1213,6 +1321,11 @@ std::vector<EntityId> DateRangeClause::process(std::vector<EntityId> &&entity_id
return push_entities(*component_manager_, std::move(proc));
}

OutputSchema DateRangeClause::modify_schema(OutputSchema&& output_schema) const {
check_is_timeseries(output_schema.stream_descriptor(), "DateRange");
return output_schema;
}

std::string DateRangeClause::to_string() const {
return fmt::format("DATE RANGE {} - {}", start_, end_);
}