@@ -110,6 +110,16 @@ void check_column_presence(OutputSchema& output_schema, const std::unordered_set
110
110
}
111
111
}
112
112
113
+ void check_is_timeseries (const StreamDescriptor& stream_descriptor, std::string_view clause_name) {
114
+ schema::check<ErrorCode::E_UNSUPPORTED_INDEX_TYPE>(
115
+ stream_descriptor.index ().type () == IndexDescriptor::Type::TIMESTAMP &&
116
+ stream_descriptor.index ().field_count () >= 1 &&
117
+ stream_descriptor.field (0 ).type () == make_scalar_type (DataType::NANOSECONDS_UTC64),
118
+ " {}Clause can only be applied to timeseries" ,
119
+ clause_name
120
+ );
121
+ }
122
+
113
123
std::vector<EntityId> PassthroughClause::process (std::vector<EntityId>&& entity_ids) const {
114
124
return std::move (entity_ids);
115
125
}
@@ -520,12 +530,7 @@ void ResampleClause<closed_boundary>::set_component_manager(std::shared_ptr<Comp
520
530
521
531
template <ResampleBoundary closed_boundary>
522
532
OutputSchema ResampleClause<closed_boundary>::modify_schema(OutputSchema&& output_schema) const {
523
- schema::check<ErrorCode::E_UNSUPPORTED_INDEX_TYPE>(
524
- output_schema.stream_descriptor_ .index ().type () == IndexDescriptor::Type::TIMESTAMP &&
525
- output_schema.stream_descriptor_ .index ().field_count () >= 1 &&
526
- output_schema.stream_descriptor_ .field (0 ).type () == make_scalar_type (DataType::NANOSECONDS_UTC64),
527
- " ResampleClause can only be applied to timeseries"
528
- );
533
+ check_is_timeseries (output_schema.stream_descriptor_ , " Resample" );
529
534
check_column_presence (output_schema, *clause_info_.input_columns_ , " Resample" );
530
535
StreamDescriptor stream_desc (output_schema.stream_descriptor_ .id ());
531
536
stream_desc.add_field (output_schema.stream_descriptor_ .field (0 ));
@@ -1003,12 +1008,7 @@ const ClauseInfo& MergeClause::clause_info() const {
1003
1008
}
1004
1009
1005
1010
OutputSchema MergeClause::modify_schema (OutputSchema&& output_schema) const {
1006
- schema::check<ErrorCode::E_UNSUPPORTED_INDEX_TYPE>(
1007
- output_schema.stream_descriptor_ .index ().type () == IndexDescriptor::Type::TIMESTAMP &&
1008
- output_schema.stream_descriptor_ .index ().field_count () >= 1 &&
1009
- output_schema.stream_descriptor_ .field (0 ).type () == make_scalar_type (DataType::NANOSECONDS_UTC64),
1010
- " MergeClause can only be applied to timeseries"
1011
- );
1011
+ check_is_timeseries (output_schema.stream_descriptor_ , " Merge" );
1012
1012
return output_schema;
1013
1013
}
1014
1014
@@ -1323,12 +1323,7 @@ std::vector<EntityId> DateRangeClause::process(std::vector<EntityId> &&entity_id
1323
1323
}
1324
1324
1325
1325
OutputSchema DateRangeClause::modify_schema (OutputSchema&& output_schema) const {
1326
- schema::check<ErrorCode::E_UNSUPPORTED_INDEX_TYPE>(
1327
- output_schema.stream_descriptor_ .index ().type () == IndexDescriptor::Type::TIMESTAMP &&
1328
- output_schema.stream_descriptor_ .index ().field_count () >= 1 &&
1329
- output_schema.stream_descriptor_ .field (0 ).type () == make_scalar_type (DataType::NANOSECONDS_UTC64),
1330
- " DateRangeClause can only be applied to timeseries"
1331
- );
1326
+ check_is_timeseries (output_schema.stream_descriptor_ , " DateRange" );
1332
1327
return output_schema;
1333
1328
}
1334
1329
0 commit comments