Skip to content

Commit 4fa6bb9

Browse files
committed
Fix convert int to float
1 parent 2b7d252 commit 4fa6bb9

File tree

9 files changed

+183
-68
lines changed

9 files changed

+183
-68
lines changed

cpp/arcticdb/entity/merge_descriptors.cpp

+34-21
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111
#include <arcticdb/entity/type_utils.hpp>
1212

1313
namespace arcticdb {
14+
/// @param convert_int_to_float This will convert all integer types (both signed and unsigned) to FLOAT64 without
15+
/// performing any type checks. This is available only via the V1 Library API and is used by tick collectors. It can
16+
/// be true only if descriptors are merged during a compact_incomplete call. Otherwise, it must be false.
1417
StreamDescriptor merge_descriptors(
1518
const StreamDescriptor &original,
1619
std::span<const std::shared_ptr<FieldCollection>> entries,
17-
const std::unordered_set<std::string_view> &filtered_set,
18-
const std::optional<IndexDescriptorImpl>& default_index) {
20+
const std::unordered_set<std::string_view>& filtered_set,
21+
const std::optional<IndexDescriptorImpl>& default_index,
22+
bool convert_int_to_float) {
1923
using namespace arcticdb::stream;
2024
std::vector<std::string_view> merged_fields;
2125
std::unordered_map<std::string_view, TypeDescriptor> merged_fields_map;
@@ -55,10 +59,10 @@ StreamDescriptor merge_descriptors(
5559
);
5660
}
5761

58-
for (size_t idx = has_index ? 1u : 0u; idx < static_cast<size_t>(fields->size()); ++idx) {
62+
for (size_t idx = has_index ? 1u : 0u; idx < fields->size(); ++idx) {
5963
const auto& field = fields->at(idx);
6064
const auto& type_desc = field.type();
61-
if (filtered_set.empty() || (filtered_set.find(field.name()) != filtered_set.end())) {
65+
if (!filtered_set.contains(field.name())) {
6266
if(auto existing = merged_fields_map.find(field.name()); existing != merged_fields_map.end()) {
6367
auto existing_type_desc = existing->second;
6468
if(existing_type_desc != type_desc) {
@@ -68,17 +72,22 @@ StreamDescriptor merge_descriptors(
6872
"New type descriptor : {}",
6973
field.name(), existing_type_desc, type_desc
7074
);
71-
auto new_descriptor = has_valid_common_type(existing_type_desc, type_desc);
72-
if(new_descriptor) {
73-
merged_fields_map[field.name()] = *new_descriptor;
75+
if (convert_int_to_float && is_integer_type(existing_type_desc.data_type()) && is_integer_type(type_desc.data_type())) {
76+
merged_fields_map[field.name()] = TypeDescriptor{DataType::FLOAT64, existing_type_desc.dimension()};
7477
} else {
75-
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
76-
"No valid common type between {} and {} for column {}",
77-
existing_type_desc,
78-
type_desc,
79-
field.name()
80-
);
78+
auto new_descriptor = has_valid_common_type(existing_type_desc, type_desc);
79+
if(new_descriptor) {
80+
merged_fields_map[field.name()] = *new_descriptor;
81+
} else {
82+
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
83+
"No valid common type between {} and {} for column {}",
84+
existing_type_desc,
85+
type_desc,
86+
field.name()
87+
);
88+
}
8189
}
90+
8291
}
8392
} else {
8493
merged_fields.emplace_back(field.name());
@@ -98,46 +107,50 @@ StreamDescriptor merge_descriptors(
98107
const StreamDescriptor &original,
99108
const std::vector<std::shared_ptr<FieldCollection>> &entries,
100109
const std::optional<std::vector<std::string>> &filtered_columns,
101-
const std::optional<IndexDescriptorImpl>& default_index) {
110+
const std::optional<IndexDescriptorImpl>& default_index,
111+
bool convert_int_to_float) {
102112
std::unordered_set<std::string_view> filtered_set = filtered_columns.has_value()
103113
? std::unordered_set<std::string_view>(filtered_columns->begin(), filtered_columns->end())
104114
: std::unordered_set<std::string_view>{};
105-
return merge_descriptors(original, entries, filtered_set, default_index);
115+
return merge_descriptors(original, entries, filtered_set, default_index, convert_int_to_float);
106116
}
107117

108118
StreamDescriptor merge_descriptors(
109119
const StreamDescriptor& original,
110120
std::span<const std::shared_ptr<FieldCollection>> entries,
111121
const std::optional<std::vector<std::string>>& filtered_columns,
112-
const std::optional<IndexDescriptorImpl>& default_index) {
122+
const std::optional<IndexDescriptorImpl>& default_index,
123+
bool convert_int_to_float) {
113124
std::unordered_set<std::string_view> filtered_set = filtered_columns.has_value()
114125
? std::unordered_set<std::string_view>(filtered_columns->begin(), filtered_columns->end())
115126
: std::unordered_set<std::string_view>{};
116-
return merge_descriptors(original, entries, filtered_set, default_index);
127+
return merge_descriptors(original, entries, filtered_set, default_index, convert_int_to_float);
117128
}
118129

119130
StreamDescriptor merge_descriptors(
120131
const StreamDescriptor &original,
121132
const std::vector<pipelines::SliceAndKey> &entries,
122133
const std::optional<std::vector<std::string>> &filtered_columns,
123-
const std::optional<IndexDescriptorImpl>& default_index) {
134+
const std::optional<IndexDescriptorImpl>& default_index,
135+
bool convert_int_to_float) {
124136
std::vector<std::shared_ptr<FieldCollection>> fields;
125137
for (const auto &entry : entries) {
126138
fields.push_back(std::make_shared<FieldCollection>(entry.slice_.desc()->fields().clone()));
127139
}
128-
return merge_descriptors(original, fields, filtered_columns, default_index);
140+
return merge_descriptors(original, fields, filtered_columns, default_index, convert_int_to_float);
129141
}
130142

131143
StreamDescriptor merge_descriptors(
132144
const std::shared_ptr<Store>& store,
133145
const StreamDescriptor &original,
134146
const std::vector<pipelines::SliceAndKey> &entries,
135147
const std::unordered_set<std::string_view> &filtered_set,
136-
const std::optional<IndexDescriptorImpl>& default_index) {
148+
const std::optional<IndexDescriptorImpl>& default_index,
149+
bool convert_int_to_float) {
137150
std::vector<std::shared_ptr<FieldCollection>> fields;
138151
for (const auto &entry : entries) {
139152
fields.push_back(std::make_shared<FieldCollection>(entry.segment(store).descriptor().fields().clone()));
140153
}
141-
return merge_descriptors(original, fields, filtered_set, default_index);
154+
return merge_descriptors(original, fields, filtered_set, default_index, convert_int_to_float);
142155
}
143156
}

cpp/arcticdb/entity/merge_descriptors.hpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,35 @@ StreamDescriptor merge_descriptors(
1414
const StreamDescriptor &original,
1515
std::span<const std::shared_ptr<FieldCollection>> entries,
1616
const std::unordered_set<std::string_view> &filtered_set,
17-
const std::optional<IndexDescriptorImpl>& default_index);
17+
const std::optional<IndexDescriptorImpl>& default_index,
18+
bool convert_int_to_float=false);
1819

1920
entity::StreamDescriptor merge_descriptors(
2021
const entity::StreamDescriptor &original,
2122
const std::vector<std::shared_ptr<FieldCollection>> &entries,
2223
const std::optional<std::vector<std::string>> &filtered_columns,
23-
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);
24+
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt,
25+
bool convert_int_to_float=false);
2426

2527
entity::StreamDescriptor merge_descriptors(
2628
const entity::StreamDescriptor& original,
2729
std::span<const std::shared_ptr<FieldCollection>> entries,
2830
const std::optional<std::vector<std::string>>& filtered_columns,
29-
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);
31+
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt,
32+
bool convert_int_to_float=false);
3033

3134
entity::StreamDescriptor merge_descriptors(
3235
const entity::StreamDescriptor &original,
3336
const std::vector<pipelines::SliceAndKey> &entries,
3437
const std::optional<std::vector<std::string>> &filtered_columns,
35-
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);
38+
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt,
39+
bool convert_int_to_float=false);
3640

3741
entity::StreamDescriptor merge_descriptors(
3842
const std::shared_ptr<Store>& store,
3943
const entity::StreamDescriptor &original,
4044
const std::vector<pipelines::SliceAndKey> &entries,
4145
const std::unordered_set<std::string_view> &filtered_set,
42-
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);
46+
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt,
47+
bool convert_int_to_float=false);
4348
}

cpp/arcticdb/stream/incompletes.cpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,9 @@ std::vector<SliceAndKey> get_incomplete(
430430
util::raise_rte("Only timestamp based ranges supported for filtering.");
431431
},
432432
[&entries](const IndexRange &index_range) {
433-
entries.erase(
434-
std::remove_if(std::begin(entries), std::end(entries), [&](const auto &entry) {
435-
return !intersects(index_range, entry.slice_and_key_.key().index_range());
436-
}),
437-
std::end(entries));
433+
std::erase_if(entries, [&](const auto &entry) {
434+
return !intersects(index_range, entry.slice_and_key_.key().index_range());
435+
});
438436
},
439437
[](const auto &) {
440438
// Don't know what to do with this index

cpp/arcticdb/stream/segment_aggregator.hpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ inline void convert_column_types(SegmentInMemory& segment) {
3333
}
3434

3535
template<class Index, class Schema, class SegmentingPolicy = RowCountSegmentPolicy, class DensityPolicy = DenseColumnPolicy>
36-
class SegmentAggregator : public Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy> {
36+
class SegmentAggregator : public Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy> {
3737
public:
3838
using AggregatorType = Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy>;
3939
using SliceCallBack = folly::Function<void(pipelines::FrameSlice&&)>;
@@ -49,6 +49,11 @@ template<class Index, class Schema, class SegmentingPolicy = RowCountSegmentPoli
4949

5050
void add_segment(SegmentInMemory&& seg, const pipelines::FrameSlice& slice, bool convert_int_to_float) {
5151
auto segment = std::move(seg);
52+
// Very specific use-case, you probably don't want this. This is applied by design even to static schema. It is
53+
// part of an old API that is still used in some tick collectors.
54+
if(convert_int_to_float) {
55+
convert_column_types(segment);
56+
}
5257
if constexpr (std::is_same_v<Schema, FixedSchema>) {
5358
if (stream_descriptor_.has_value()) {
5459
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
@@ -60,9 +65,6 @@ template<class Index, class Schema, class SegmentingPolicy = RowCountSegmentPoli
6065
}
6166
segment.reset_timeseries_descriptor();
6267
AggregatorType::stats().update_many(segment.row_count(), segment.num_bytes());
63-
//TODO very specific use-case, you probably don't want this
64-
if(convert_int_to_float)
65-
convert_column_types(segment);
6668

6769
ARCTICDB_DEBUG(log::version(), "Adding segment with descriptor {} uncompressed_bytes {}", segment.descriptor(), segment.descriptor().uncompressed_bytes());
6870
segments_.push_back(segment);

cpp/arcticdb/version/schema_checks.cpp

+18-3
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,15 @@ bool index_names_match(
9696
return true;
9797
}
9898

99+
/// @param convert_int_to_float If this is true it will consider all pairs of integer types (both signed and unsigned)
100+
/// as identical. If a field in df_in_store_descriptor is FLOAT64 and the corresponding field in new_df_descriptor
101+
/// is of any integer type they will be considered identical. Note that this makes the function unsymmetrical. If a
102+
/// field in new_df_descriptor is FLOAT64 and the corresponding field in df_in_store_descriptor is of integer type
103+
/// the types won't be considered identical. This is supposed to be used only from compact_incomplete.B
99104
bool columns_match(
100105
const StreamDescriptor& df_in_store_descriptor,
101-
const StreamDescriptor& new_df_descriptor
106+
const StreamDescriptor& new_df_descriptor,
107+
const bool convert_int_to_float
102108
) {
103109
const int index_field_size =
104110
df_in_store_descriptor.index().type() == IndexDescriptor::Type::EMPTY ? new_df_descriptor.index().field_count() : 0;
@@ -117,8 +123,17 @@ bool columns_match(
117123
const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();
118124

119125
if (!trivially_compatible_types(left_type, right_type) &&
120-
!(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
121-
return false;
126+
!(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type()))) {
127+
if (convert_int_to_float) {
128+
const bool both_are_int = is_integer_type(left_type.data_type()) && is_integer_type(right_type.data_type());
129+
if (!(both_are_int || (left_type.data_type() == DataType::FLOAT64 && is_integer_type(right_type.data_type())))) {
130+
return false;
131+
}
132+
} else {
133+
return false;
134+
}
135+
}
136+
122137
}
123138
return true;
124139
}

cpp/arcticdb/version/schema_checks.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ bool index_names_match(
3939

4040
bool columns_match(
4141
const StreamDescriptor& df_in_store_descriptor,
42-
const StreamDescriptor& new_df_descriptor
42+
const StreamDescriptor& new_df_descriptor,
43+
const bool convert_int_to_float=false
4344
);
4445

4546
void fix_descriptor_mismatch_or_throw(

0 commit comments

Comments
 (0)