
Provide deterministic column types when reading libraries with dynamic schema. Implement resampling with dynamic schema. #2360


Open: wants to merge 44 commits into base v6.0.0-pre-release

Conversation

@vasil-pashov vasil-pashov commented May 17, 2025

User Facing Changes

Change of how dtypes are computed

This PR ensures that column dtypes are always deterministic when processing (filtering, aggregation, resampling, etc.) is performed. The previous behavior returned dtypes based on the segments that were read. For example, given a dataframe where each row is a separate segment:

index  column_a_dtype
0      int8
1      uint16
2      uint8
3      float32

reading row 0 will produce int8, row 1 uint16, and so on. Reading rows 0 and 1 will produce int32, reading rows 1 and 2 will produce uint16, and reading all rows will produce float32. In other words, the returned dtype was determined by examining the dtypes of the segments actually read and taking the narrowest type that can represent all of them. With this change the output dtype will always be the same: the dtype able to represent all dtypes in the dataframe.
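
The integer promotions described above match NumPy's `promote_types` and can be checked directly (note the float32 case differs from NumPy, which would widen int32 + float32 to float64, while the behavior described here keeps float32):

```python
import numpy as np

# int8 and uint16: the narrowest type representing both ranges is int32,
# matching the "rows 0 and 1" example above.
assert np.promote_types(np.int8, np.uint16) == np.int32

# uint16 and uint8: uint16 already covers both, matching "rows 1 and 2".
assert np.promote_types(np.uint16, np.uint8) == np.uint16
```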

Breaking change in unsorted aggregation (groupby) and resampling

Previously, min and max aggregations in groupby clauses were always promoted to float64 so that an empty group could be represented as NaN. This is no longer the case, as the dtype is now stable. If the dtype is a signed or unsigned integer, the value for an empty group will be zero (with the NumPy backend; in the future, when Arrow is used, it will come out as a missing value).
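
A minimal sketch of the value an empty group now receives, per dtype (a hypothetical helper for illustration, not ArcticDB code):

```python
import numpy as np

def empty_group_value(dtype):
    """Value an empty group receives under the stable-dtype rule above (sketch)."""
    dt = np.dtype(dtype)
    if np.issubdtype(dt, np.floating):
        return dt.type(np.nan)   # floats can represent a missing value as NaN
    if np.issubdtype(dt, np.integer):
        return dt.type(0)        # integer dtypes cannot hold NaN, so zero is used
    raise TypeError(f"unsupported dtype: {dt}")
```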

Implementation of resampling when dynamic schema is used.

Prior to this PR, libraries using dynamic schema could not use the resampling functionality of the query builder. This PR implements it fully.
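
ArcticDB's resampling broadly mirrors pandas resample semantics, so what the feature computes can be sketched in pandas (column name and frequency are assumed for illustration; in ArcticDB itself this would be expressed through the QueryBuilder's resample clause against a dynamic-schema library):

```python
import pandas as pd

idx = pd.date_range("2025-01-01", periods=4, freq="30s")
df = pd.DataFrame({"price": [1.0, 2.0, 3.0, 4.0]}, index=idx)

# Two 1-minute buckets: [00:00, 00:01) and [00:01, 00:02)
out = df.resample("1min").agg({"price": "mean"})
```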

Sum aggregation dtype

When sum aggregation is performed (unsorted or resampling), the result dtype is the largest type of the respective category (int -> int64, uint -> uint64, float -> float64). This is done to avoid overflows.

When the dataframe contains both int and uint columns, this used to lead to an exception and the query could not be executed: the int segment would be promoted to int64, the uint segment to uint64, and no supported type can represent both. This PR fixes that; the query no longer throws, and the common type in this case is int64. This works because ArcticDB does not allow mixing uint64 with any signed int type (append/update will throw), which means the largest possible types are uint32 and int64, whose common type is int64. This aligns exactly with our type promotion algorithm.
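
The sum-output rule described above can be sketched as a small helper (hypothetical, not ArcticDB's actual code):

```python
import numpy as np

def sum_output_dtype(*dtypes):
    """Sketch of the sum-aggregation output dtype rule described above."""
    dts = [np.dtype(d) for d in dtypes]
    if any(np.issubdtype(d, np.floating) for d in dts):
        return np.dtype(np.float64)   # float -> float64
    if all(np.issubdtype(d, np.unsignedinteger) for d in dts):
        return np.dtype(np.uint64)    # uint-only -> uint64
    # Any signed int present -> int64, including the mixed int/uint case,
    # since uint64 is never mixed with signed ints.
    return np.dtype(np.int64)
```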

Filtering libraries with dynamic schema

Previously, using the QueryBuilder on a symbol from a library with dynamic schema returned an empty DataFrame when the filtered column was not present in any segment. After this change, ArcticDB will throw if the column being filtered on is not present in any segment. The column must exist in at least one segment, but it may be missing from some segments.
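
The new rule can be sketched outside ArcticDB with hypothetical in-memory "segments" (a real symbol would be queried with the QueryBuilder):

```python
import pandas as pd

# Two "segments": column "b" exists in one of them, column "c" in none.
segments = [pd.DataFrame({"a": [1, 2]}),
            pd.DataFrame({"a": [3], "b": [10.0]})]

def filter_gt_zero(col):
    # New rule: the column must be present in at least one segment...
    if not any(col in seg.columns for seg in segments):
        raise KeyError(f"column {col!r} not present in any segment")
    # ...but may be missing from others (missing rows simply do not match).
    df = pd.concat(segments, ignore_index=True)
    return df[df[col] > 0]
```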

Implementation details

  1. Add option to initialize column using a custom default value.
  2. Make the columns for the min/max unsorted aggregation sparse so that proper backfilling can be done.
  3. Add a map of default values in the OutputSchema that's later stored in the PipelineContext so that CopyToBufferTask can do backfilling. This is used by the sum aggregations (sorted and unsorted), as both need to return 0.0 instead of NaN for float dtypes.
  4. Add a permissive mode of conversion from int to float. This new mode is used only in the processing pipeline after processed data is copied to output buffers. It allows casting int to float even when the float cannot exactly represent the int type, e.g. casting int64 to float32.
  5. Change how unsorted aggregations handle the sum column of empty type. They used to add the column and fill it with zero. Now the column is not added and is handled by the null value reducer.
  6. The column returned by sorted aggregation has changed:
    6.1. It is now sparse. With dynamic schema there can be many segments and different columns could fill different buckets in those segments.
    6.2. It is now optional because the aggregated column can be missing from all segments.
  7. Handle sparse columns when copying trivially compatible types in copy_frame_data_to_buffer. The previous implementation did not consider sparse columns and used memcpy to copy all blocks to the output buffer, which does not work when values are missing. The new implementation first checks whether the column is dense and, if so, uses the old logic; otherwise it handles the column differently to account for missing values.
  8. Improve the hypothesis test for unsorted aggregation with dynamic schema. It used only 3 segments with a predefined pattern that did not capture all possible issues. The new test generates many segments with different sets of columns.
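
The permissive int-to-float conversion from item 4 can be illustrated with NumPy (a semantic sketch, not the pipeline's C++ code): the cast is allowed even though precision is lost.

```python
import numpy as np

# 2**60 + 1 is not representable in float32 (24-bit mantissa),
# so the cast rounds it to 2**60. Permissive mode allows this anyway.
big = np.int64(2**60 + 1)
lossy = np.float32(big)            # allowed despite the precision loss
assert np.int64(lossy) == np.int64(2**60)
```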

Any other comments?

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch 2 times, most recently from ec87c70 to d28480c on May 17, 2025 14:14
@vasil-pashov vasil-pashov added the patch Small change, should increase patch version label May 17, 2025
@vasil-pashov vasil-pashov marked this pull request as draft May 17, 2025 14:18
@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch 3 times, most recently from ab7648f to 964bf13 on May 18, 2025 12:30
@alexowens90

We don't need to do it in this PR, but I think we should be able to eliminate the need for the pipeline context for everything that happens after read_process_and_collect. All of the necessary information should be available in the OutputSchema

@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch 3 times, most recently from 95a67a1 to 0f847fb on June 6, 2025 21:18
@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch 15 times, most recently from 5918c3a to d966a25 on June 8, 2025 23:18
@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch 3 times, most recently from 2a5b3a2 to 00f11b5 on June 10, 2025 06:00
@vasil-pashov vasil-pashov force-pushed the vasil.pashov/refactor-processing-pipeline-modify-schema branch from 00f11b5 to 3faf536 on June 10, 2025 06:47
@@ -1352,6 +1410,8 @@ struct CopyToBufferTask : async::BaseTask {
DecodePathData shared_data_;
std::any& handler_data_;
OutputFormat output_format_;
IntToFloatConversion int_to_float_conversion_;
Unused

@@ -1275,7 +1303,9 @@ void copy_frame_data_to_buffer(
const RowRange& row_range,
DecodePathData shared_data,
std::any& handler_data,
OutputFormat output_format) {
OutputFormat output_format,
IntToFloatConversion int_to_float_conversion,
Shouldn't be an argument as it is always set to permissive

@@ -24,10 +25,13 @@ struct IGroupingAggregatorData {
[[nodiscard]] SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values) {
return folly::poly_call<3>(*this, output_column_name, dynamic_schema, unique_values);
}
[[nodiscard]] std::optional<Value> get_default_value() {
#2360 (comment)
The meaning of returning nullopt needs a comment here. Or we could make it explicit and always return Value, might be clearer


namespace arcticdb::util {

std::string format_timestamp(const entity::timestamp ts) {
nit: const not in header version

// std::chrono::time_point<std::chrono::system_clock> does not handle nanoseconds on Windows and Mac

const timestamp seconds = ts / 1'000'000'000;
const timestamp ns_remainder = ts % 1'000'000'000;
nit: std::div

if (bucket_has_values) {
*output_it++ = finalize_aggregator<output_type_info::data_type>(bucket_aggregator, string_pool);
res.set_scalar(row_to_write++, finalize_aggregator<output_type_info::data_type>(bucket_aggregator, string_pool));
This is going to tank the performance in the dense case. It should be possible to precompute the sparsity structure of the output column from input_index_columns and input_agg_columns (which will also allow us to presize res exactly when some input_agg_columns are std::nullopt) and then use the column data iterators like before to keep the performance high. Probably needs a discussion in person on what this looks like.

@@ -120,11 +121,46 @@ Column SortedAggregator<aggregation_operator, closed_boundary>::aggregate(const
}
}
);
} else {
// The column does not contain the aggregation column. However, one of the columns in
Whut?

while (next_output_index_row < output_index_column.row_count() &&
*output_index_column.scalar_at<timestamp>(next_output_index_row) < last_index_value + (closed_boundary == ResampleBoundary::LEFT)) {
if (bucket_has_values) {
res.set_scalar(row_to_write, finalize_aggregator<output_type_info::data_type>(bucket_aggregator, string_pool));
I don't think this is correct in the case where a bucket spans 3 segments, and the column is missing from the middle segment?


[[nodiscard]]timestamp end() const {
return end_;
}
These new methods seem to be unused

}
const std::span<const MaybeValueType> group_values{reinterpret_cast<const MaybeValueType*>(aggregated.data()), aggregated.size() / sizeof(MaybeValueType)};
Column::for_each_enumerated<typename col_type_info::TDT>(*col, [&](auto row) {
const bool has_value = group_values[row.idx()].written_;
Won't this always be true? The written_ field is always set to true at the same time as the bit in the sparse map is set (they are really duplicated information, so one should be eliminated, probably written_ since we need the sparse map)

Labels
major API breaking change, should increase major version
2 participants