Skip to content

Commit a26f7f2

Browse files
committed
Make resampling work using dynamic schema
1 parent f11f98f commit a26f7f2

File tree

4 files changed

+18
-23
lines changed

4 files changed

+18
-23
lines changed

cpp/arcticdb/processing/clause.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -671,8 +671,11 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
671671
}
672672
);
673673
}
674-
auto aggregated_column = std::make_shared<Column>(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool));
675-
seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column);
674+
std::optional<Column> aggregated_column = aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool);
675+
if (aggregated_column) {
676+
auto aggregated_column_ptr = std::make_shared<Column>(std::move(*aggregated_column));
677+
seg.add_column(scalar_field(aggregated_column_ptr->type().data_type(), aggregator.get_output_column_name().value), std::move(aggregated_column_ptr));
678+
}
676679
}
677680
seg.set_row_data(output_index_column->row_count() - 1);
678681
return push_entities(*component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)));

cpp/arcticdb/processing/sorted_aggregation.cpp

+9-14
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
namespace arcticdb {
1313

1414
template<AggregationOperator aggregation_operator, ResampleBoundary closed_boundary>
15-
Column SortedAggregator<aggregation_operator, closed_boundary>::aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
15+
std::optional<Column> SortedAggregator<aggregation_operator, closed_boundary>::aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
1616
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns,
1717
const std::vector<timestamp>& bucket_boundaries,
1818
const Column& output_index_column,
1919
StringPool& string_pool) const {
2020
using IndexTDT = ScalarTagType<DataTypeTag<DataType::NANOSECONDS_UTC64>>;
21-
auto common_input_type = generate_common_input_type(input_agg_columns);
22-
Column res(TypeDescriptor(generate_output_data_type(common_input_type), Dimension::Dim0), output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
21+
const std::optional<DataType> common_input_type = generate_common_input_type(input_agg_columns);
22+
if (!common_input_type.has_value()) {
23+
return std::nullopt;
24+
}
25+
Column res(TypeDescriptor(generate_output_data_type(*common_input_type), Dimension::Dim0), output_index_column.row_count(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
2326
details::visit_type(
2427
res.type().data_type(),
2528
[this,
@@ -128,11 +131,11 @@ Column SortedAggregator<aggregation_operator, closed_boundary>::aggregate(const
128131
}
129132
}
130133
);
131-
return res;
134+
return {std::move(res)};
132135
}
133136

134137
template<AggregationOperator aggregation_operator, ResampleBoundary closed_boundary>
135-
DataType SortedAggregator<aggregation_operator, closed_boundary>::generate_common_input_type(
138+
std::optional<DataType> SortedAggregator<aggregation_operator, closed_boundary>::generate_common_input_type(
136139
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns
137140
) const {
138141
std::optional<DataType> common_input_type;
@@ -141,17 +144,9 @@ DataType SortedAggregator<aggregation_operator, closed_boundary>::generate_commo
141144
auto input_data_type = opt_input_agg_column->column_->type().data_type();
142145
check_aggregator_supported_with_data_type(input_data_type);
143146
add_data_type_impl(input_data_type, common_input_type);
144-
} else {
145-
// Column is missing from this row-slice due to dynamic schema, currently unsupported
146-
schema::raise<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>("Resample: Cannot aggregate column '{}' as it is missing from some row slices",
147-
get_input_column_name().value);
148147
}
149148
}
150-
// Column is missing from all row-slices due to dynamic schema, currently unsupported
151-
schema::check<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>(common_input_type.has_value(),
152-
"Resample: Cannot aggregate column '{}' as it is missing from some row slices",
153-
get_input_column_name().value);
154-
return *common_input_type;
149+
return common_input_type;
155150
}
156151

157152
template<AggregationOperator aggregation_operator, ResampleBoundary closed_boundary>

cpp/arcticdb/processing/sorted_aggregation.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct ISortedAggregator {
2828
struct Interface : Base {
2929
[[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); };
3030
[[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); };
31-
[[nodiscard]] Column aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
31+
[[nodiscard]] std::optional<Column> aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
3232
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns,
3333
const std::vector<timestamp>& bucket_boundaries,
3434
const Column& output_index_column,
@@ -354,13 +354,13 @@ class SortedAggregator
354354
[[nodiscard]] ColumnName get_input_column_name() const { return input_column_name_; }
355355
[[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; }
356356

357-
[[nodiscard]] Column aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
357+
[[nodiscard]] std::optional<Column> aggregate(const std::vector<std::shared_ptr<Column>>& input_index_columns,
358358
const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns,
359359
const std::vector<timestamp>& bucket_boundaries,
360360
const Column& output_index_column,
361361
StringPool& string_pool) const;
362362
private:
363-
[[nodiscard]] DataType generate_common_input_type(const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns) const;
363+
[[nodiscard]] std::optional<DataType> generate_common_input_type(const std::vector<std::optional<ColumnWithStrings>>& input_agg_columns) const;
364364
void check_aggregator_supported_with_data_type(DataType data_type) const;
365365
[[nodiscard]] DataType generate_output_data_type(DataType common_input_data_type) const;
366366
[[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const;

cpp/arcticdb/storage/s3/s3_client_wrapper.cpp

+1-4
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,7 @@ folly::Future<S3Result<std::monostate>> S3ClientTestWrapper::delete_object(
135135

136136
return actual_client_->delete_object(s3_object_name, bucket_name);
137137
}
138-
139-
// Using a fixed page size since it's only being used for simple tests.
140-
// If we ever need to configure it we should move it to the s3 proto config instead.
141-
constexpr auto page_size = 10;
138+
142139
S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
143140
const std::string& name_prefix,
144141
const std::string& bucket_name,

0 commit comments

Comments
 (0)