@@ -134,21 +134,14 @@ std::vector<EntityId> FilterClause::process(std::vector<EntityId>&& entity_ids)
134
134
}
135
135
136
136
OutputSchema FilterClause::modify_schema (OutputSchema&& output_schema) const {
137
- // TODO: Factor out checking against clause_info_.input_columns_ into separate function and call from all clauses
138
- // TODO: Consider adding (optional, lazily constructed?) unordered map from column names to data types in
139
- // output_schema to make this sort of operation more efficient
137
+ const auto & column_types = output_schema.column_types ();
140
138
for (const auto & input_column: *clause_info_.input_columns_ ) {
141
139
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
142
- output_schema. stream_descriptor_ . find_field (input_column). has_value ( ),
140
+ column_types. contains (input_column),
143
141
" FilterClause requires column '{}' to exist in input data" ,
144
142
input_column
145
143
);
146
144
}
147
- // TODO: Factor this out with same code in ProjectClause
148
- std::unordered_map<std::string, DataType> column_types;
149
- for (const auto & field: output_schema.stream_descriptor_ .fields ()) {
150
- column_types.emplace (field.name (), field.type ().data_type ());
151
- }
152
145
auto expr = expression_context_->expression_nodes_ .get_value (expression_context_->root_node_name_ .value );
153
146
auto opt_datatype = expr->compute (*expression_context_, column_types);
154
147
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(!opt_datatype.has_value (), " FilterClause AST produces a column, not a bitset" );
@@ -193,24 +186,19 @@ std::vector<EntityId> ProjectClause::process(std::vector<EntityId>&& entity_ids)
193
186
}
194
187
195
188
/**
 * Checks that this projection can be applied to `output_schema` and appends the projected column to it.
 *
 * Steps, in order:
 *  1. Every name in clause_info_.input_columns_ must be present in output_schema.column_types();
 *     otherwise schema::check raises ErrorCode::E_COLUMN_DOESNT_EXIST.
 *  2. The expression node named by expression_context_->root_node_name_ is evaluated against those
 *     column types. It must yield a data type (i.e. a column); an empty result is rejected with
 *     ErrorCode::E_INVALID_USER_ARGUMENT.
 *  3. The resulting type is added under output_column_ to both the stream descriptor and the
 *     column-type lookup, so the two stay in step.
 *
 * @param output_schema Schema produced by the preceding stage; modified in place.
 * @return The same schema with the projected column appended.
 */
OutputSchema ProjectClause::modify_schema(OutputSchema&& output_schema) const {
    // Deliberately a non-const reference: the projected column is registered in this lookup below.
    auto& column_types = output_schema.column_types();
    for (const auto& input_column: *clause_info_.input_columns_) {
        schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
            column_types.contains(input_column),
            // Name this clause (not FilterClause) so the user can tell which stage rejected the column.
            "ProjectClause requires column '{}' to exist in input data",
            input_column
        );
    }
    auto expr = expression_context_->expression_nodes_.get_value(expression_context_->root_node_name_.value);
    auto opt_datatype = expr->compute(*expression_context_, column_types);
    // An empty optional means the expression did not produce a column type, which a projection cannot use.
    user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(opt_datatype.has_value(), "ProjectClause AST produces a bitset, not a column");
    output_schema.stream_descriptor_.add_scalar_field(*opt_datatype, output_column_);
    // Mirror the new field in the column-type lookup so later lookups through column_types() see it.
    column_types.emplace(output_column_, *opt_datatype);
    return output_schema;
}
216
204
@@ -477,12 +465,10 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
477
465
}
478
466
479
467
OutputSchema AggregationClause::modify_schema (OutputSchema&& output_schema) const {
480
- // TODO: Factor out checking against clause_info_.input_columns_ into separate function and call from all clauses
481
- // TODO: Consider adding (optional, lazily constructed?) unordered map from column names to data types in
482
- // output_schema to make this sort of operation more efficient
468
+ const auto & column_types = output_schema.column_types ();
483
469
for (const auto & input_column: *clause_info_.input_columns_ ) {
484
470
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
485
- output_schema. stream_descriptor_ . find_field (input_column). has_value ( ),
471
+ column_types. contains (input_column),
486
472
" AggregationClause requires column '{}' to exist in input data" ,
487
473
input_column
488
474
);
@@ -491,7 +477,6 @@ OutputSchema AggregationClause::modify_schema(OutputSchema&& output_schema) cons
491
477
stream_desc.add_field (output_schema.stream_descriptor_ .field (*output_schema.stream_descriptor_ .find_field (grouping_column_)));
492
478
stream_desc.set_index ({0 , IndexDescriptorImpl::Type::ROWCOUNT});
493
479
494
- // TODO: Similar to process method, consider refactoring
495
480
for (const auto & agg: aggregators_){
496
481
const auto & input_column_name = agg.get_input_column_name ().value ;
497
482
const auto & output_column_name = agg.get_output_column_name ().value ;
@@ -503,6 +488,7 @@ OutputSchema AggregationClause::modify_schema(OutputSchema&& output_schema) cons
503
488
}
504
489
505
490
output_schema.stream_descriptor_ = std::move (stream_desc);
491
+ output_schema.clear_column_types ();
506
492
auto mutable_index = output_schema.norm_metadata_ .mutable_df ()->mutable_common ()->mutable_index ();
507
493
mutable_index->set_name (grouping_column_);
508
494
mutable_index->clear_fake_name ();
@@ -549,12 +535,10 @@ OutputSchema ResampleClause<closed_boundary>::modify_schema(OutputSchema&& outpu
549
535
output_schema.stream_descriptor_ .field (0 ).type () == make_scalar_type (DataType::NANOSECONDS_UTC64),
550
536
" ResampleClause can only be applied to timeseries"
551
537
);
552
- // TODO: Factor out checking against clause_info_.input_columns_ into separate function and call from all clauses
553
- // TODO: Consider adding (optional, lazily constructed?) unordered map from column names to data types in
554
- // output_schema to make this sort of operation more efficient
538
+ const auto & column_types = output_schema.column_types ();
555
539
for (const auto & input_column: *clause_info_.input_columns_ ) {
556
540
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
557
- output_schema. stream_descriptor_ . find_field (input_column). has_value ( ),
541
+ column_types. contains (input_column),
558
542
" ResampleClause requires column '{}' to exist in input data" ,
559
543
input_column
560
544
);
@@ -572,6 +556,7 @@ OutputSchema ResampleClause<closed_boundary>::modify_schema(OutputSchema&& outpu
572
556
stream_desc.add_scalar_field (output_column_type, output_column_name);
573
557
}
574
558
output_schema.stream_descriptor_ = std::move (stream_desc);
559
+ output_schema.clear_column_types ();
575
560
576
561
if (output_schema.norm_metadata_ .df ().common ().has_multi_index ()) {
577
562
const auto & multi_index = output_schema.norm_metadata_ .mutable_df ()->mutable_common ()->multi_index ();
0 commit comments