Skip to content

Commit 8614e5a

Browse files
committed
Fix and test arcticdb reading streaming data
Fixes: column filter in static schema; column ordering when introducing a new column with an incomplete segment. Tests: columns filter in static and dynamic schema; reading different-schema incompletes; compatibility test for reading incompletes from an old environment.
1 parent e55aa2f commit 8614e5a

File tree

4 files changed

+103
-8
lines changed

4 files changed

+103
-8
lines changed

cpp/arcticdb/pipeline/read_pipeline.hpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,24 @@ inline void generate_filtered_field_descriptors(std::shared_ptr<PipelineContext>
141141
generate_filtered_field_descriptors(*context, columns);
142142
}
143143

144+
inline void get_column_bitset_in_context(
145+
const ReadQuery& query,
146+
const std::shared_ptr<PipelineContext>& pipeline_context) {
147+
pipeline_context->set_selected_columns(query.columns);
148+
pipeline_context->overall_column_bitset_ = overall_column_bitset(pipeline_context->descriptor(),
149+
query.clauses_,
150+
pipeline_context->selected_columns_);
151+
}
152+
144153
template<class ContainerType>
145154
inline std::vector<FilterQuery<ContainerType>> get_column_bitset_and_query_functions(
146155
const ReadQuery& query,
147156
const std::shared_ptr<PipelineContext>& pipeline_context,
148157
bool dynamic_schema,
149158
bool column_groups) {
150159
using namespace arcticdb::pipelines::index;
151-
152160
if(!dynamic_schema || column_groups) {
153-
pipeline_context->set_selected_columns(query.columns);
154-
pipeline_context->overall_column_bitset_ = overall_column_bitset(pipeline_context->descriptor(),
155-
query.clauses_,
156-
pipeline_context->selected_columns_);
161+
get_column_bitset_in_context(query, pipeline_context);
157162
}
158163
return build_read_query_filters<ContainerType>(pipeline_context, query.row_filter, dynamic_schema, column_groups);
159164
}

cpp/arcticdb/version/version_core.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -1085,9 +1085,17 @@ bool read_incompletes_to_pipeline(
10851085
// Mark the start point of the incompletes, so we know that there is no column slicing after this point
10861086
pipeline_context->incompletes_after_ = pipeline_context->slice_and_keys_.size();
10871087

1088-
// If there are only incompletes we need to add the index here
10891088
if(pipeline_context->slice_and_keys_.empty()) {
1089+
// If there are only incompletes we need to do the following (typically done when reading the index key):
1090+
// - add the index columns to query
1091+
// - in case of static schema: populate the descriptor and column_bitset
10901092
add_index_columns_to_query(read_query, seg.index_descriptor());
1093+
if (!dynamic_schema) {
1094+
pipeline_context->desc_ = seg.descriptor();
1095+
get_column_bitset_in_context(
1096+
read_query,
1097+
pipeline_context);
1098+
}
10911099
}
10921100
pipeline_context->slice_and_keys_.insert(std::end(pipeline_context->slice_and_keys_), incomplete_segments.begin(), incomplete_segments.end());
10931101

@@ -1116,9 +1124,9 @@ bool read_incompletes_to_pipeline(
11161124
pipeline_context->staged_descriptor_ =
11171125
merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns);
11181126
if (pipeline_context->desc_) {
1119-
const std::array fields_ptr = {pipeline_context->desc_->fields_ptr()};
1127+
const std::array staged_fields_ptr = {pipeline_context->staged_descriptor_->fields_ptr()};
11201128
pipeline_context->desc_ =
1121-
merge_descriptors(*pipeline_context->staged_descriptor_, fields_ptr, read_query.columns);
1129+
merge_descriptors(*pipeline_context->desc_, staged_fields_ptr, read_query.columns);
11221130
} else {
11231131
pipeline_context->desc_ = pipeline_context->staged_descriptor_;
11241132
}

python/tests/compat/arcticdb/test_compatibility.py

+37
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
from packaging import version
33
import pandas as pd
4+
import numpy as np
45
from arcticdb.util.test import assert_frame_equal
56
from arcticdb.options import ModifiableEnterpriseLibraryOption
67
from arcticdb.toolbox.library_tool import LibraryTool
@@ -171,3 +172,39 @@ def test_compat_snapshot_metadata_read(old_venv_and_arctic_uri, lib_name):
171172
snaps = curr.lib.list_snapshots()
172173
meta = snaps["old_snap"]
173174
assert meta == {"old_key": "old_value"}
175+
176+
177+
def test_compat_read_incomplete(old_venv_and_arctic_uri, lib_name):
    """Write streaming incompletes with an old client, then read them back with
    the current version, both unfiltered and with a columns filter."""
    old_venv, arctic_uri = old_venv_and_arctic_uri
    sym = "sym"
    expected = pd.DataFrame(
        {"col": np.arange(10), "float_col": np.arange(10, dtype=np.float64)},
        pd.date_range("2024-01-01", periods=10),
    )
    df_1 = expected.iloc[:8]
    df_2 = expected.iloc[8:]

    old_ac = old_venv.create_arctic(arctic_uri)
    old_lib = old_ac.create_library(lib_name)

    # In version 5.1.0 (with commit a3b7545) the streaming incomplete python API
    # moved to the library tool, so pick the write API matching the old client.
    if version.Version(old_venv.version) >= version.Version("5.1.0"):
        script = """
lib_tool = lib.library_tool()
lib_tool.append_incomplete("sym", df_1)
lib_tool.append_incomplete("sym", df_2)
"""
    else:
        script = """
lib._nvs.append("sym", df_1, incomplete=True)
lib._nvs.append("sym", df_2, incomplete=True)
"""
    old_lib.execute([script], dfs={"df_1": df_1, "df_2": df_2})

    with CurrentVersion(arctic_uri, lib_name) as curr:
        read_df = curr.lib._nvs.read(sym, date_range=(None, None), incomplete=True).data
        assert_frame_equal(read_df, expected)

        read_df = curr.lib._nvs.read(sym, date_range=(None, None), incomplete=True, columns=["float_col"]).data
        assert_frame_equal(read_df, expected[["float_col"]])

python/tests/unit/arcticdb/version_store/test_incompletes.py

+45
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from arcticdb.exceptions import MissingDataException
1313
from arcticdb_ext.storage import KeyType
1414

15+
from arcticdb.util.venv import CurrentVersion
1516

1617
@pytest.mark.parametrize("batch", (True, False))
1718
def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch):
@@ -80,3 +81,47 @@ def test_read_incompletes_no_chunking(lmdb_version_store_tiny_segment):
8081

8182
ref_keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym)
8283
assert len(ref_keys) == 1
84+
85+
@pytest.mark.parametrize("dynamic_schema", [True, False])
def test_read_incompletes_columns_filter(version_store_factory, dynamic_schema):
    """Reading incompletes with a columns= filter returns exactly the requested
    columns, under both static and dynamic schema."""
    lib = version_store_factory(dynamic_schema=dynamic_schema)
    lib_tool = lib.library_tool()
    sym = "sym"
    df = pd.DataFrame(
        {"col": np.arange(10), "float_col": np.arange(10, dtype=np.float64)},
        pd.date_range("2024-01-01", periods=10),
    )
    lib_tool.append_incomplete(sym, df.iloc[:8])
    lib_tool.append_incomplete(sym, df.iloc[8:])

    # Each column can be selected on its own, regardless of its position.
    for column in ("col", "float_col"):
        filtered = lib.read(sym, date_range=(None, None), incomplete=True, columns=[column]).data
        assert_frame_equal(filtered, df[[column]])
100+
101+
def test_read_incompletes_dynamic(lmdb_version_store_dynamic_schema_v1):
    """Reading incompletes whose schemas differ (new columns appearing across
    segments) under dynamic schema, both before and after compaction."""
    lib = lmdb_version_store_dynamic_schema_v1
    lib_tool = lib.library_tool()
    sym = "sym"
    date_range = (None, None)

    # Seven consecutive days starting at the epoch, split 3/2/2 over segments
    # that each drop one column and introduce a new one.
    days = pd.date_range(pd.Timestamp(0), periods=7, freq="d")
    df_1 = pd.DataFrame({"col_1": [1., 2., 3.], "col_2": [1., 2., 3.]}, index=days[:3])
    df_2 = pd.DataFrame({"col_2": [4., 5.], "col_3": [1., 2.]}, index=days[3:5])
    df_3 = pd.DataFrame({"col_3": [3., 4.], "col_4": [1., 2.]}, index=days[5:])

    lib_tool.append_incomplete(sym, df_1)
    lib_tool.append_incomplete(sym, df_2)
    assert_frame_equal(lib.read(sym, date_range=date_range, incomplete=True).data, pd.concat([df_1, df_2]))

    # Compacting the staged segments must not change what a subsequent read sees.
    lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
    assert_frame_equal(lib.read(sym, date_range=date_range, incomplete=True).data, pd.concat([df_1, df_2]))

    lib_tool.append_incomplete(sym, df_3)
    assert_frame_equal(lib.read(sym, date_range=date_range, incomplete=True).data, pd.concat([df_1, df_2, df_3]))

0 commit comments

Comments
 (0)