Skip to content

Conversation

@vasil-pashov
Copy link
Collaborator

@vasil-pashov vasil-pashov commented Nov 26, 2025

Reference Issues/PRs

Monday https://man312219.monday.com/boards/7852509418/views/168855452

What does this implement or fix?

This is the first part of the merge update functionality. This provides only updating timeseries containing only non-string columns.

The MergeUpdate is implemented as a clause in the processing pipeline which uses the read-modify-write codepath. This clause can only appear as a first clause in a processing pipeline.

Overview of the algorithm

  1. Filter the segments that have to be read from storage. This is different than the existing clauses (row/date rage, resampling) which load consecutive segments in a range. This clause can load any row slice. The clause receives a source input frame containing the new data and a strategy determining what to do with the new data update/insert. There are two cases depending on the index type:

1.1. Row range indexes require full table scan
1.2. In case of timestamp index this will filter out only the ranges and keys whose index span contains at least one
value from the source's index. This does not mean that there's a match only that a match is possible. A crucial
assumption is that the source is ordered. This means that after ranges_and_keys are ordered by row slice we can
perform only forward iteration over the source index to find matches.

  1. Decide which rows of should be updated and which rows from source should be inserted.

2.2. Filter by index. If the data is timestamp index binary search can be used to find the matching rows. If there's a timestamp index use MergeUpdateClause::filter_index_match this will produce a vector of size equal
to the number of rows from the source that fall into the processed slice. Each vector will contain a vector of
row-indexes in target that match the corresponding soruce index value. If the nested vector is empty this means that insertion must happen (not implemented)

2.3. (NOT IMPLEMENTED) If MergeUpdateClause::on_ is not empty, all columns that are listed in it must mach for a particular row for it to be marked as matching. For each column in MergeUpdateClause::on_ iterate over the vector of vectors produced in the previous step. Checking for match only the target rows that are in the inner vector. If there is no match for this particular column remove the target row index. This means that the ordering of the columns in MergeUpdateClause::on_ matters and it would be more efficient to start with the columns that have a lesser chance of matching.

Testing utilities
This PR also adds utility functions which can be used to create segments and input frames easily.

  1. create_dense_segment Takes in a stream descriptor and argument pack where the i-th argument in the pack corresponds to the i-th field in the descriptor. It then creates a pre-allocated dense segment out of the data in the packs. The data in the packs can be any sequence with a finite size (e.g. std::view, std::array, std::vector). With the slight optimization that if the range is contiguous it'll use memcpy otherwise it'll use std::copy. All ranges in the pack must have the same length.
  2. slice_data_into_segments takes in a descriptor rows per segment, cols per segment and pack of arguments. It operates similarly to create_dense_segment but will return a tuple of 3 vectors. The segments (with no more than rows_per_segment rows and cols_per_segment cols), the corresponding row slice and col slice for the segment. The returned segments are in column major order (first are all the segments for the first col slice, next for the second col slice, etc...)
  3. input_frame_from_tensors similar in concept to create_dense_segment but it will create an InputFrame of NativeTensors. The only gotcha is that the NativeTensors are non-owning. That is why this function will materialize all views. The second element in the return value is the all data that the NativeTensors point to. If it's a view it's materialized otherwise it's just forwarded.

Any other comments?

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

Allow all sized ranges for creation of native tensors

Parametrize tests

Unify code in testing

Add more unit tests

Change some signatures

Add read modify write index strategy
Fix bugs

Fix edge case where a index values spans multiple rows

Add unit test for the same value spanning several segments

Move utility functions in dedicated headers

Add comments

Bring back old create_dense_column

Fix spelling errors
@vasil-pashov vasil-pashov changed the title Vasil.pashov/merge update using write clause Implement merge update for timeseries containing non-string columns Nov 27, 2025
@vasil-pashov vasil-pashov force-pushed the vasil.pashov/merge-update-using-write-clause branch from 6447387 to 967b70c Compare November 27, 2025 22:47
Comment on lines +16 to +34
ankerl::unordered_dense::set<entity::position_t> unique_values_for_string_column(const Column& column) {
ankerl::unordered_dense::set<entity::position_t> output_set;
// Guessing that unique values is a third of the column length
// TODO would be useful to have actual unique count here from stats
static auto map_reserve_ratio = ConfigsMap::instance()->get_int("UniqueColumns.AllocationRatio", 3);
output_set.reserve(column.row_count() / map_reserve_ratio);

details::visit_type(column.type().data_type(), [&](auto col_desc_tag) {
using type_info = ScalarTypeInfo<decltype(col_desc_tag)>;
if constexpr (is_sequence_type(type_info::data_type)) {
arcticdb::for_each<typename type_info::TDT>(column, [&output_set](auto value) {
output_set.emplace(value);
});
} else {
util::raise_rte("Column {} is not a string type column");
}
});
return output_set;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved from cpp/arcticdb/column_store/segment_utils.hpp without any changes

Comment on lines -19 to -31
inline ankerl::unordered_dense::set<entity::position_t> unique_values_for_string_column(const Column& column) {
ankerl::unordered_dense::set<entity::position_t> output_set;
// Guessing that unique values is a third of the column length
// TODO would be useful to have actual unique count here from stats
static auto map_reserve_ratio = ConfigsMap::instance()->get_int("UniqueColumns.AllocationRatio", 3);
output_set.reserve(column.row_count() / map_reserve_ratio);

details::visit_type(column.type().data_type(), [&](auto col_desc_tag) {
using type_info = ScalarTypeInfo<decltype(col_desc_tag)>;
if constexpr (is_sequence_type(type_info::data_type)) {
arcticdb::for_each<typename type_info::TDT>(column, [&output_set](auto value) {
output_set.emplace(value);
});
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved inside cpp/arcticdb/column_store/segment_utils.cpp without any changes

@vasil-pashov vasil-pashov changed the title Implement merge update for timeseries containing non-string columns Implement merge update for timeseries matching on the index Dec 3, 2025
while (target_index_it != target_index_end && source_row < source_row_end) {
const timestamp source_ts = source_->index_value_at(source_row);
// TODO: Profile and compare to linear or adaptive (linear below some threshold) search
auto target_match_it = std::lower_bound(target_index_it, target_index_end, source_ts);
Copy link
Collaborator

@IvoDD IvoDD Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution is O(source_rows * log(target_rows)) which in the somewhat common case where all the rows are matches it will be O(N*log(N)).

We can do better with a search with a similar idea to binary lifting but for an array.

I.e. we can do a search similar to a binary search but we start with just powers of 2 away from target_index_it. E.g. we check:

  • target_index_it + 1
  • target_index_it + 2
  • target_index_it + 4
  • target_index_it + 8
  • target_index_it + 16 # E.g. here we finally get a larger number and continue with a regular binary search between 8 and 16

This approach is nice because complexity is O(source_rows * log(target_rows / source_rows)) or something similar which is guaranteed to be <=O(target_rows)


/// This clause will perform update values or insert values based on strategy_ in a segment. The source of new values is
/// the source_ member. Source and target must have the same index type. There are two actions
/// UPDATE: For a particular row in the segment if there's a row source_ for which all values in the columns listed in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a particular row in the segment if there's a row source_

Nit: "row in source_"

template<typename T>
void select(std::span<const size_t> indexes_to_keep, std::vector<T>& vec) {
arcticdb::debug::check<arcticdb::ErrorCode::E_ASSERTION_FAILURE>(
std::ranges::is_sorted(indexes_to_keep),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It is needed to be strictly sorted. E.g. {0, 0, 1} would not work.

Could use std::adjescent_find(std::greater_equals)==std::end but is less readable and probably not worth it for a debug check anyway.

new_entity_id++;
}
}
ranges_and_keys.erase(ranges_and_keys.begin() + new_entity_id, ranges_and_keys.end());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think instead of writing this loop again to instead use join_view like:

select(row_slices_to_keep, offsets)
auto ranges_to_keep = std::ranges::join_view(offsets);
return select(ranges_to_keep, ranges_and_keys);

debug::check<ErrorCode::E_ASSERTION_FAILURE>(
new_row >= source_data_row_, "Cannot move SourceColumnIterator backwards"
);
// TODO: Implement operator+= because std::advance is linear
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe would be good to open a tickets for some of the TODOs introduced in this PR?

// If the source frame is a list of tensors, the index frame is kept separately, so the first
// non-index column will be at index 0. If there's an index the first ColRange will start from
// 1 and the first column of each segment will contain the index
return col_range.first + index_in_range - 2 * index_field_count;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Come back to this

for (size_t segment_idx = 0; segment_idx < target_segments.size(); ++segment_idx) {
// TODO: Implement equivalent QueryBuilder.optimise_for_speed. Initial implementation will always drop the
// current string pool and recreate it from scratch taking account the new string data from source. This is
// equivalent to QueryBuilder.optimise_for_memory.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how useful optimise_for_speed will be for things we write back to storage. Yes the MergeClause will be faster because we will need to append to the existing StringPool however the encoding decoding and IO in WriteClause and while reading what was written will be slower because it has to process a needlessly long StringPool.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more: there are definetely use cases where optimise_for_speed would make sense. E.g. when updating a categorical column, string pool wouldn't change.

We just need to caveat optimise_for_speed that if used repeatedly it will gradually erode read performance by piling up unused strings.

const bool next_segment_starts_with_last_used_source_index =
next_segment_range.first == source_->index_value_at(last_value_first_occurrence);
if (index_value_spans_two_segments && next_segment_starts_with_last_used_source_index) {
source_row = last_value_first_occurrence;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this can end up O(num_target_row_slices * num_source_rows) when all index values are equal to each other.

This will probably rarely be a problem (unless with very fragmented target data). We can resolve this if instead we iterate over the unique index value ranges in the source.

Might just worth adding a small comment explaining this being quadratic in certain edge cases if we need to fix in the future.

/// Decide which rows of should be updated and which rows from source should be inserted.
/// 1. If there's a timestamp index use MergeUpdateClause::filter_index_match this will produce a vector of size equal
/// to the number of rows from the source that fall into the processed slice. Each vector will contain a vector of
/// row-indexes in target that match the corresponding soruce index value.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Typo soruce

} else {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
source_->has_tensors(), "Input frame does not contain neither a segment nor tensors"
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd maybe find it easier to read if written as:

if (has_segment){
    // do somethig with segment
} else if (has_tensors()) {
    // do something with tensors
} else {
    internal::raise<E_ASSERTION_FAILURE>
}

std::span<const size_t> rows_to_update_for_source_row =
rows_to_update[source_row - source_row_start];
if (rows_to_update_for_source_row.empty()) {
++source_row;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe use a for(; source_row < source_row_end; ++source_row) and not increment it in two places?

Also I think this if is not really needed. If rows_to_update_for_source_row.empty() The for loop below wouldn't do anything and just incrementing the iterator every time wouldn't hurt.

"Fixed string sequences are not supported for merge update"
);
} else if constexpr (is_dynamic_string_type(TDT::data_type())) {
std::optional<ScopedGILLock> gil_lock;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can copy paste the doc on the optional scoped gil from here. Took me a bit to figure out why we were initializing a nullopt gil_lock

arcticdb::ColumnData cd = segment.column_data(i);
for (auto it = cd.begin<TDT>(); it != cd.end<TDT>(); ++it) {
if constexpr (std::same_as<typename TDT::DataTypeTag::raw_type, int8_t>) {
out = fmt::format_to(out, "{} ", i, int(*it));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we convert the int8_t to int which is probably int32_t?

constexpr static bool is_sequence = is_sequence_type(col_type_info::data_type);
if constexpr (is_input_string_like && is_sequence) {
// Clang has a bug where it the for_each is just regular range-based for the constexpr if will not
// the body of the if even if the condition is false. This leads to compile time errors because it tries
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Typos: Maybe meant to say:

            // Clang has a bug where IF the for_each is just regular range-based for the constexpr if will COMPILE
            // the body of the if even if the condition is false. This leads to compile time errors because it tries
            // to call set_string with non-string values.
            // https://stackoverflow.com/questions/79817660/discarded-branch-of-c-constexpr-if-fails-compilation-because-it-calls-non-matc

std::is_same_v<typename col_type_info::RawType, InputValueType>,
"Type mismatch when setting data for Column[{}]. Column data type is {}.",
column_index,
col_type_info::data_type
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can also include the typeid(InputValueType).name() to the error message


template<
std::ranges::sized_range IndexCols, typename ColumnSlice, std::ranges::sized_range CurrentCol,
std::ranges::sized_range... RestCols>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: Does adding these heavily templatized methods impact build time?

I guess it should be fine as long as we don't use these utilities to do anything crazy like creating sliced segments with 100 columns.

E.g. this recursive method would need to get compiled N times for N columns segment.

if (descriptor.fields().size() <= cols_per_segment) {
return std::vector{descriptor};
}
const size_t num_segments = descriptor.fields().size() / cols_per_segment;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we round up? E.g. with 3 fields and 2 cols per segment we'll end up dropping the last field?

std::vector<SliceAndKey>&& new_slices, std::unique_ptr<proto::descriptors::UserDefinedMetadata>&& user_meta
) {
ranges::sort(new_slices, [](const SliceAndKey& a, const SliceAndKey& b) {
return std::tie(a.slice_.col_range, a.slice_.row_range) < std::tie(b.slice_.col_range, b.slice_.row_range);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't SliceAndKey already have an implemented ordering operator here

std::vector<SliceAndKey> merged_ranges_and_keys;
auto new_slice = new_slices.begin();
for (SliceAndKey& slice : pipeline_context.slice_and_keys_) {
if (new_slice != new_slices.end() && new_slice->slice_ == slice.slice_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a TODO here for when we implement insert would be good. insert would potentially produce new slices. Also depending on how we decide to handle defragmentation on merge we might need to also do something more special

With insert we would also need to adjust the row range on pre existing slices.

@@ -1 +1 @@
Subproject commit 2dc54dfc5af9fb973860a38e4245ae0063740988
Subproject commit 7220a4eebf20515cdd9c34721e15ca082bae9038
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was a vcpkg submodule update needed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants