Commit d28b7a5

Rework C++ truncation for arrow
Arrow requires truncation to happen in the C++ layer. The previous logic had several issues:

- Wrong truncated range calculation
- Wrong offsets used when the first row offset != 0
- Wrong row range calculation

This PR fixes these issues by rewriting much of the column truncation logic and adding more tests.
1 parent 259a5e7 commit d28b7a5
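The "wrong offsets when the first row offset != 0" bullet comes down to translating the global row filter into column-local row indices before any truncation happens. A minimal, hedged sketch of that arithmetic with invented values and a stand-in struct (not ArcticDB's real RowRange/frame API):

#include <cassert>
#include <cstdint>

// GlobalRowFilter is a toy stand-in; values are illustrative only.
struct GlobalRowFilter { int64_t first; int64_t second; };   // half-open, in global frame rows

int main() {
    const int64_t first_row_offset = 1000;                   // suppose the frame starts at global row 1000
    const GlobalRowFilter row_filter{1003, 1076};            // caller asks for global rows [1003, 1076)

    // Truncation operates on column-local rows, so both bounds are shifted by the offset.
    const int64_t local_start = row_filter.first - first_row_offset;   // 3
    const int64_t local_end = row_filter.second - first_row_offset;    // 76
    assert(local_start == 3 && local_end == 76);
    return 0;
}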

File tree: 7 files changed (+91, -55 lines)

cpp/arcticdb/column_store/chunked_buffer.hpp

Lines changed: 3 additions & 3 deletions
@@ -475,16 +475,16 @@ class ChunkedBufferImpl {
         util::check(block == *blocks_.begin(), "Truncate first block position {} not within initial block", bytes);
         util::check(bytes < block->bytes(), "Can't truncate {} bytes from a {} byte block", bytes, block->bytes());
         auto remaining_bytes = block->bytes() - bytes;
-        auto new_block = create_block(bytes, 0);
+        auto new_block = create_block(remaining_bytes, block->offset_);
         new_block->copy_from(block->data() + bytes, remaining_bytes, 0);
         blocks_[0] = new_block;
         block->abandon();
         delete block;
     }

     void truncate_last_block(size_t bytes) {
-        auto [block, offset, ts] = block_and_offset(bytes);
-        util::check(block == *blocks_.rbegin(), "Truncate first block position {} not within initial block", bytes);
+        auto [block, offset, ts] = block_and_offset(bytes_ - bytes);
+        util::check(block == *blocks_.rbegin(), "Truncate last block position {} not within last block", bytes);
         util::check(bytes < block->bytes(), "Can't truncate {} bytes from a {} byte block", bytes, block->bytes());
         auto remaining_bytes = block->bytes() - bytes;
         auto new_block = create_block(remaining_bytes, block->offset_);
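The key change above is sizing the replacement block by the bytes that remain and carrying over the original block's logical offset (the old code used the truncated byte count and offset 0). A hedged, standalone sketch of that idea with toy types, not ArcticDB's actual Block/ChunkedBuffer API:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy stand-in for a buffer block: raw bytes plus the block's logical offset in the buffer.
struct ToyBlock {
    std::vector<uint8_t> data;
    size_t offset;
    size_t bytes() const { return data.size(); }
};

// Drop `bytes` from the front: the new block holds the *remaining* bytes and keeps the offset.
ToyBlock truncate_front(const ToyBlock& block, size_t bytes) {
    assert(bytes < block.bytes());
    const size_t remaining_bytes = block.bytes() - bytes;
    ToyBlock new_block{std::vector<uint8_t>(remaining_bytes), block.offset};
    std::memcpy(new_block.data.data(), block.data.data() + bytes, remaining_bytes);
    return new_block;
}

int main() {
    const ToyBlock block{{1, 2, 3, 4, 5, 6, 7, 8}, 64};
    const ToyBlock truncated = truncate_front(block, 3);
    assert(truncated.bytes() == 5 && truncated.offset == 64 && truncated.data.front() == 4);
    return 0;
}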

cpp/arcticdb/column_store/column.cpp

Lines changed: 4 additions & 7 deletions
@@ -590,20 +590,17 @@ std::vector<std::shared_ptr<Column>> Column::split(const std::shared_ptr<Column>
     return output;
 }

-void Column::truncate_first_block(size_t row) {
+void Column::truncate_first_block(size_t start_row) {
     if(!is_sparse()) {
-        auto bytes = data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL) * row;
+        auto bytes = start_row * data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL);
         data_.buffer().truncate_first_block(bytes);
     }
 }

-void Column::truncate_last_block(size_t row) {
+void Column::truncate_last_block(size_t end_row) {
     if(!is_sparse()) {
         const auto column_row_count = row_count();
-        if(row < static_cast<size_t>(column_row_count))
-            return;
-
-        auto bytes = data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL) * (column_row_count - row);
+        auto bytes = (column_row_count - end_row) * data_type_size(type_, OutputFormat::NATIVE, DataTypeMode::INTERNAL);
         data_.buffer().truncate_last_block(bytes);
     }
 }
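The dense-column case above is pure arithmetic: rows cut at the front cost start_row items, rows cut at the back cost row_count - end_row items, with end_row exclusive. A small hedged sketch with a worked example; the helper names are illustrative, not ArcticDB functions:

#include <cassert>
#include <cstddef>
#include <cstdint>

// Bytes to drop from the front of a dense fixed-width column: rows [0, start_row) go.
size_t bytes_to_drop_front(size_t start_row, size_t item_size) {
    return start_row * item_size;
}

// Bytes to drop from the back: rows [end_row, row_count) go (end_row is exclusive).
size_t bytes_to_drop_back(size_t row_count, size_t end_row, size_t item_size) {
    return (row_count - end_row) * item_size;
}

int main() {
    // 100 int64 rows, keep rows [3, 76): 3 * 8 = 24 bytes off the front, 24 * 8 = 192 off the back.
    assert(bytes_to_drop_front(3, sizeof(int64_t)) == 24);
    assert(bytes_to_drop_back(100, 76, sizeof(int64_t)) == 192);
    return 0;
}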

cpp/arcticdb/column_store/column.hpp

Lines changed: 1 addition & 0 deletions
@@ -742,6 +742,7 @@ class Column {
         // At least one value in the column exactly matches the input val
         // Search to the right/left for the last/first such value
         if (from_right) {
+            // TODO: Super inefficient
             while (++mid <= high && val == accessor.at(mid)) {}
             res = mid;
         } else {
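The "Super inefficient" TODO refers to the linear walk used to find the first/last element equal to the matched value. For reference, a hedged sketch of the O(log n) alternative on a plain sorted vector using the standard library; it does not use ArcticDB's column accessor:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// First index whose value equals `val`, assuming `sorted` is ascending and contains `val`.
int64_t first_equal_index(const std::vector<int64_t>& sorted, int64_t val) {
    auto it = std::lower_bound(sorted.begin(), sorted.end(), val);   // first element >= val
    return static_cast<int64_t>(it - sorted.begin());
}

// Last index whose value equals `val`, under the same assumptions.
int64_t last_equal_index(const std::vector<int64_t>& sorted, int64_t val) {
    auto it = std::upper_bound(sorted.begin(), sorted.end(), val);   // first element > val
    return static_cast<int64_t>(it - sorted.begin()) - 1;
}

int main() {
    const std::vector<int64_t> index{10, 20, 20, 20, 30};
    assert(first_equal_index(index, 20) == 1);
    assert(last_equal_index(index, 20) == 3);
    return 0;
}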

cpp/arcticdb/pipeline/frame_slice.hpp

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ struct RowRange : AxisRange {
 };

 inline bool contains(const RowRange& range, size_t row) {
-    return row >= range.first && row <= range.second;
+    return row >= range.first && row < range.second;
 }

 struct FrameSlice {
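A small hedged illustration of the semantics the fixed contains() now has: a RowRange behaves as a half-open interval [first, second), matching iloc-style row slicing. SimpleRange below is a stand-in, not the real RowRange:

#include <cassert>
#include <cstddef>

struct SimpleRange { size_t first; size_t second; };   // stand-in for a half-open row range

inline bool contains(const SimpleRange& range, size_t row) {
    return row >= range.first && row < range.second;   // end-exclusive; the old `<=` included the bound
}

int main() {
    const SimpleRange range{10, 20};
    assert(contains(range, 10));    // lower bound is included
    assert(contains(range, 19));
    assert(!contains(range, 20));   // upper bound is excluded
    return 0;
}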

cpp/arcticdb/pipeline/read_frame.cpp

Lines changed: 45 additions & 31 deletions
@@ -247,12 +247,15 @@ void decode_index_field(
 void handle_truncation(
         Column& dest_column,
         const ColumnTruncation& truncate) {
-    if(dest_column.num_blocks() == 1 && truncate.start_ && truncate.end_)
+    if(dest_column.num_blocks() == 1 && truncate.start_ && truncate.end_) {
         dest_column.truncate_single_block(*truncate.start_, *truncate.end_);
-    else if(truncate.start_)
-        dest_column.truncate_first_block(*truncate.start_);
-    else if(truncate.end_)
-        dest_column.truncate_last_block(*truncate.end_);
+    }
+    else {
+        if(truncate.start_)
+            dest_column.truncate_first_block(*truncate.start_);
+        if(truncate.end_)
+            dest_column.truncate_last_block(*truncate.end_);
+    }
 }

 void handle_truncation(
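The behavioural point of the change above: when a column spans several blocks, a filter can cut the first block and the last block at the same time, so the two truncations must not be mutually exclusive as they were in the old else-if chain. A hedged, standalone sketch of the dispatch with stand-in types, printing instead of truncating:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>

struct ToyTruncation {
    std::optional<int64_t> start_;   // first row to keep, if the front is cut
    std::optional<int64_t> end_;     // first row to drop, if the back is cut
};

void apply_truncation(size_t num_blocks, const ToyTruncation& truncate) {
    if(num_blocks == 1 && truncate.start_ && truncate.end_) {
        std::cout << "truncate_single_block(" << *truncate.start_ << ", " << *truncate.end_ << ")\n";
    } else {
        // Not mutually exclusive: a multi-block column may need both.
        if(truncate.start_)
            std::cout << "truncate_first_block(" << *truncate.start_ << ")\n";
        if(truncate.end_)
            std::cout << "truncate_last_block(" << *truncate.end_ << ")\n";
    }
}

int main() {
    apply_truncation(3, {2, 75});   // multi-block: front and back are both trimmed
    apply_truncation(1, {2, 75});   // single block: one combined call
    return 0;
}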
@@ -329,36 +332,40 @@ void decode_or_expand(
 template <typename IndexValueType>
 ColumnTruncation get_truncate_range_from_index(
         const Column& column,
-        const IndexValueType& start,
-        const IndexValueType& end,
-        std::optional<int64_t> start_offset = std::nullopt,
-        std::optional<int64_t> end_offset = std::nullopt) {
-    int64_t start_row = column.search_sorted<IndexValueType>(start, false, start_offset, end_offset);
-    int64_t end_row = column.search_sorted<IndexValueType>(end, true, start_offset, end_offset);
+        const IndexValueType& filter_start,
+        const IndexValueType& filter_end,
+        int64_t start_col_offset,
+        int64_t end_col_offset) {
+    // search_sorted expects inclusive end_col_offset
+    auto inclusive_end_col_offset = end_col_offset - 1;
+    int64_t start_row = column.search_sorted<IndexValueType>(filter_start, false, start_col_offset, inclusive_end_col_offset);
+    int64_t end_row = column.search_sorted<IndexValueType>(filter_end, true, start_col_offset, inclusive_end_col_offset);
+
     std::optional<int64_t> truncate_start;
     std::optional<int64_t> truncate_end;
-    if((start_offset && start_row != *start_offset) || (!start_offset && start_row > 0))
+    if(start_row != start_col_offset)
         truncate_start = start_row;

-    if((end_offset && end_row != *end_offset) || (!end_offset && end_row < column.row_count() - 1))
+    if(end_row != end_col_offset)
         truncate_end = end_row;

     return {truncate_start, truncate_end};
 }

-std::pair<std::optional<int64_t>, std::optional<int64_t>> get_truncate_range_from_rows(
-        const RowRange& row_range,
-        size_t start_offset,
-        size_t end_offset) {
+ColumnTruncation get_truncate_range_from_rows(
+        const RowRange& slice_range,
+        size_t row_filter_start,
+        size_t row_filter_end) {
     std::optional<int64_t> truncate_start;
     std::optional<int64_t> truncate_end;
-    if(contains(row_range, start_offset))
-        truncate_start = start_offset;
+    // TODO: Explain
+    if(contains(slice_range, row_filter_start) && row_filter_start != slice_range.start())
+        truncate_start = row_filter_start;

-    if(contains(row_range, end_offset))
-        truncate_end = end_offset;
+    if(contains(slice_range, row_filter_end) && row_filter_end != slice_range.start())
+        truncate_end = row_filter_end;

-    return std::make_pair(truncate_start, truncate_end);
+    return {truncate_start, truncate_end};
 }

 ColumnTruncation get_truncate_range(
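To make the row-based branch above concrete: for a slice covering column-local rows [first, second), a truncation bound is only recorded when the filter bound actually lands inside the slice and differs from the slice start. A hedged sketch with stand-in types (ToyRange is not the real RowRange):

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>

struct ToyRange { size_t first; size_t second; };   // half-open [first, second)

inline bool in_range(const ToyRange& range, size_t row) {
    return row >= range.first && row < range.second;
}

// Optional start/end truncation rows for one slice, given a column-local row filter.
std::pair<std::optional<int64_t>, std::optional<int64_t>>
truncate_bounds(const ToyRange& slice, size_t filter_start, size_t filter_end) {
    std::optional<int64_t> start, end;
    if(in_range(slice, filter_start) && filter_start != slice.first)
        start = static_cast<int64_t>(filter_start);   // rows before the filter start are cut
    if(in_range(slice, filter_end) && filter_end != slice.first)
        end = static_cast<int64_t>(filter_end);       // rows from the filter end onwards are cut
    return {start, end};
}

int main() {
    const ToyRange slice{100, 200};
    auto [s1, e1] = truncate_bounds(slice, 150, 300);   // filter starts inside: only the front is cut
    assert(s1 && *s1 == 150 && !e1);
    auto [s2, e2] = truncate_bounds(slice, 0, 170);     // filter ends inside: only the back is cut
    assert(!s2 && e2 && *e2 == 170);
    return 0;
}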
@@ -370,32 +377,39 @@ ColumnTruncation get_truncate_range(
         const EncodedFieldImpl& index_field,
         const uint8_t* index_field_offset) {
     ColumnTruncation truncate_rows;
+    const auto& slice_row_range = context.slice_and_key().slice().row_range;
+    const auto& first_row_offset = frame.offset();
+    auto column_slice_row_range = RowRange(slice_row_range.first - first_row_offset, slice_row_range.second - first_row_offset);
     if(read_options.output_format() == OutputFormat::ARROW) {
         util::variant_match(read_query.row_filter,
-            [&truncate_rows, &frame, &context, &index_field, index_field_offset, encoding_version] (const IndexRange& index_range) {
-                const auto& time_range = static_cast<const TimestampRange&>(index_range);
+            [&truncate_rows, &column_slice_row_range, &frame, &context, &index_field, index_field_offset, encoding_version] (const IndexRange& index_filter) {
+                const auto& time_filter = static_cast<const TimestampRange&>(index_filter);
                 const auto& slice_time_range = context.slice_and_key().key().time_range();
-                if(contains(slice_time_range, time_range.first) || contains(slice_time_range, time_range.second)) {
+                // The `get_truncate_range_from_index` is O(logn). The `contains` checks serves to avoid the expensive
+                // O(logn) check for blocks in the middle of the range
+                if(contains(slice_time_range, time_filter.first) || contains(slice_time_range, time_filter.second)) {
                     if(context.fetch_index()) {
                         const auto& index_column = frame.column(0);
-                        truncate_rows = get_truncate_range_from_index(index_column, time_range.first, time_range.second);
+                        truncate_rows = get_truncate_range_from_index(index_column, time_filter.first, time_filter.second, column_slice_row_range.first, column_slice_row_range.second);
                     } else {
                         const auto& frame_index_desc = frame.descriptor().fields(0UL);
                         Column sink{frame_index_desc.type(), encoding_sizes::field_uncompressed_size(index_field), AllocationType::PRESIZED, Sparsity::PERMITTED};
                         std::optional<util::BitMagic> bv;
                         (void)decode_field(frame_index_desc.type(), index_field, index_field_offset, sink, bv, encoding_version);
-                        truncate_rows = get_truncate_range_from_index(sink, time_range.first, time_range.second);
+                        truncate_rows = get_truncate_range_from_index(sink, time_filter.first, time_filter.second, column_slice_row_range.first, column_slice_row_range.second);
                     }
                 }
             },
-            [&context] (const RowRange& row_range) {
-                const auto& slice_row_range = context.slice_and_key().slice().row_range;
-                get_truncate_range_from_rows(row_range, slice_row_range.start(), slice_row_range.end());
+            [&truncate_rows, &column_slice_row_range, &first_row_offset] (const RowRange& row_filter) {
+                // The row_filter is with respect to global offset. Column truncation cares about column indices.
+                auto row_filter_start = row_filter.first - first_row_offset;
+                auto row_filter_end = row_filter.second - first_row_offset;
+                truncate_rows = get_truncate_range_from_rows(column_slice_row_range, row_filter_start, row_filter_end);
             },
             [] (const auto&) {
                 // Do nothing
             });
-        }
+    }
     return truncate_rows;
 };

python/arcticdb/version_store/_store.py

Lines changed: 7 additions & 6 deletions
@@ -824,7 +824,7 @@ def update(
             If a range is specified, it will clear/delete the data within the
             range and overwrite it with the data in `data`. This allows the user
             to update with data that might only be a subset of the
-            original data.
+            original data. Note date_range is end-inclusive.
         upsert: bool, default=False
             If True, will write the data even if the symbol does not exist.
         prune_previous_version
@@ -1932,18 +1932,19 @@ def read(
             `str` : snapshot name which contains the version
             `datetime.datetime` : the version of the data that existed as_of the requested point in time
         date_range: `Optional[DateRangeInput]`, default=None
-            DateRange to read data for. Applicable only for Pandas data with a DateTime index. Returns only the part
-            of the data that falls within the given range. The same effect can be achieved by using the date_range
-            clause of the QueryBuilder class, which will be slower, but return data with a smaller memory footprint.
-            See the QueryBuilder.date_range docstring for more details.
+            DateRange to read data for. Inclusive both for lower and upper bounds. Applicable only for dataframes with
+            a DateTime index. Returns only the part of the data that falls within the given range.
+            The same effect can be achieved by using the date_range clause of the QueryBuilder class, which will be
+            slower, but return data with a smaller memory footprint. See the QueryBuilder.date_range docstring for more
+            details.
             Only one of date_range or row_range can be provided.
         row_range: `Optional[Tuple[int, int]]`, default=None
             Row range to read data for. Inclusive of the lower bound, exclusive of the upper bound
             lib.read(symbol, row_range=(start, end)).data should behave the same as df.iloc[start:end], including in
             the handling of negative start/end values.
             Only one of date_range or row_range can be provided.
         columns: `Optional[List[str]]`, default=None
-            Applicable only for Pandas data. Determines which columns to return data for.
+            Applicable only for dataframes. Determines which columns to return data for.
         query_builder: 'Optional[QueryBuilder]', default=None
             A QueryBuilder object to apply to the dataframe before it is returned.
             For more information see the documentation for the QueryBuilder class.

python/tests/unit/arcticdb/version_store/test_arrow.py

Lines changed: 30 additions & 7 deletions
@@ -21,8 +21,10 @@ def test_basic(lmdb_version_store_v1):


 # TODO: Do this fix during normalization in frontend PR
-def fix_timeseries_index(df):
+def fix_timeseries_index(df, set_index=False):
     df["index"] = df["index"].apply(lambda x : pd.Timestamp(x))
+    if set_index:
+        df = df.set_index("index")
     return df


@@ -130,9 +132,10 @@ def test_all_types(lmdb_version_store_v1):
     assert_frame_equal(result, df)


+@pytest.mark.parametrize("segment_row_size", [1, 2, 10, 100])
 @pytest.mark.parametrize("start_offset,end_offset", [(2, 3), (3, 75), (4, 32), (0, 99), (7, 56)])
-def test_date_range(lmdb_version_store_v1, start_offset, end_offset):
-    lib = lmdb_version_store_v1
+def test_date_range(version_store_factory, segment_row_size, start_offset, end_offset):
+    lib = version_store_factory(segment_row_size=segment_row_size)
     initial_timestamp = pd.Timestamp("2019-01-01")
     df = pd.DataFrame(data=np.arange(100), index=pd.date_range(initial_timestamp, periods=100), columns=['x'])
     sym = "arrow_date_test"
@@ -143,14 +146,34 @@ def test_date_range(lmdb_version_store_v1, start_offset, end_offset):

     date_range = (query_start_ts, query_end_ts)
     data_closed_table = lib.read(sym, date_range=date_range, _output_format=OutputFormat.ARROW).data
-    df = data_closed_table.to_pandas()
-    df = df.set_index('index')
-    assert query_start_ts == pd.Timestamp(df.index[0])
-    assert query_end_ts == pd.Timestamp(df.index[-1])
+    df = fix_timeseries_index(data_closed_table.to_pandas(), set_index=True)
+    assert query_start_ts == df.index[0]
+    assert query_end_ts == df.index[-1]
     assert df['x'].iloc[0] == start_offset
     assert df['x'].iloc[-1] == end_offset


+@pytest.mark.parametrize("segment_row_size", [1, 2, 10, 100])
+@pytest.mark.parametrize("start_offset,end_offset", [(2, 4), (3, 76), (4, 33), (0, 100), (7, 57)])
+def test_row_range(version_store_factory, segment_row_size, start_offset, end_offset):
+    lib = version_store_factory(segment_row_size=segment_row_size)
+    initial_timestamp = pd.Timestamp("2019-01-01")
+    df = pd.DataFrame(data=np.arange(100), index=pd.date_range(initial_timestamp, periods=100), columns=['x'])
+    sym = "arrow_date_test"
+    lib.write(sym, df)
+
+    row_range = (start_offset, end_offset)
+    data_closed_table = lib.read(sym, row_range=row_range, _output_format=OutputFormat.ARROW).data
+    df = fix_timeseries_index(data_closed_table.to_pandas(), set_index=True)
+
+    start_ts = initial_timestamp + pd.DateOffset(start_offset)
+    end_ts = initial_timestamp + pd.DateOffset(end_offset-1)
+    assert start_ts == df.index[0]
+    assert end_ts == df.index[-1]
+    assert df['x'].iloc[0] == start_offset
+    assert df['x'].iloc[-1] == end_offset-1
+
+
 def test_with_querybuilder(lmdb_version_store_v1):
     lib = lmdb_version_store_v1
     df = pd.DataFrame({"x": np.arange(10), "y": np.arange(10.0, 20.0)})
