Skip to content

Commit c6a0567

Browse files
committed
Fix symbol list bug on compaction (#1324)
- Only adds a symbol list entry on append if there is no previous version. - Fixes `finalize_staged_data` to write to the symbol list. - Adds a python test for the new behavior.
1 parent 3757599 commit c6a0567

File tree

4 files changed

+52
-7
lines changed

4 files changed

+52
-7
lines changed

cpp/arcticdb/version/local_versioned_engine.cpp

+14-4
Original file line numberDiff line numberDiff line change
@@ -993,19 +993,28 @@ void LocalVersionedEngine::write_parallel_frame(
993993
write_parallel(store_, stream_id, frame, validate_index);
994994
}
995995

996+
void LocalVersionedEngine::add_to_symbol_list_on_compaction(
997+
const StreamId& stream_id,
998+
const CompactIncompleteOptions& options,
999+
const UpdateInfo& update_info) {
1000+
if(cfg_.symbol_list()) {
1001+
if (!options.append_ || !update_info.previous_index_key_.has_value()) {
1002+
symbol_list().add_symbol(store_, stream_id, update_info.next_version_id_);
1003+
}
1004+
}
1005+
}
1006+
9961007
VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
9971008
const StreamId& stream_id,
9981009
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
9991010
const CompactIncompleteOptions& options) {
10001011
log::version().debug("Compacting incomplete symbol {}", stream_id);
10011012

10021013
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
1003-
auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());
1014+
auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());
10041015

10051016
write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
1006-
1007-
if(cfg_.symbol_list())
1008-
symbol_list().add_symbol(store_, stream_id, update_info.next_version_id_);
1017+
add_to_symbol_list_on_compaction(stream_id, options, update_info);
10091018

10101019
return versioned_item;
10111020
}
@@ -1654,6 +1663,7 @@ VersionedItem LocalVersionedEngine::sort_merge_internal(
16541663
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
16551664
auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, options);
16561665
write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
1666+
add_to_symbol_list_on_compaction(stream_id, options, update_info);
16571667
return versioned_item;
16581668
}
16591669

cpp/arcticdb/version/local_versioned_engine.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ class LocalVersionedEngine : public VersionedEngine {
441441

442442
private:
443443
void initialize(const std::shared_ptr<storage::Library>& library);
444+
void add_to_symbol_list_on_compaction(const StreamId& stream_id, const CompactIncompleteOptions& options, const UpdateInfo& update_info);
444445

445446
std::shared_ptr<Store> store_;
446447
arcticdb::proto::storage::VersionStoreConfig cfg_;

python/arcticdb/version_store/library.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ def finalize_staged_data(
11091109
mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE,
11101110
prune_previous_versions: bool = False,
11111111
metadata: Any = None,
1112-
validate_index=True,
1112+
validate_index = True,
11131113
) -> VersionedItem:
11141114
"""
11151115
Finalizes staged data, making it available for reads.
@@ -1157,7 +1157,7 @@ def sort_and_finalize_staged_data(
11571157
symbol: str,
11581158
mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE,
11591159
prune_previous_versions: bool = False,
1160-
metadata: Any = None
1160+
metadata: Any = None,
11611161
) -> VersionedItem:
11621162
"""
11631163
sort_merge will sort and finalize staged data. This differs from `finalize_staged_data` in that it

python/tests/unit/arcticdb/version_store/test_sort_merge.py

+35-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import numpy as np
33
from pandas.testing import assert_frame_equal
44
import pytest
5+
from arcticdb_ext.storage import KeyType
56
from arcticdb.version_store.library import StagedDataFinalizeMethod
67
from arcticdb.exceptions import UserInputException, SortingException
78

@@ -268,4 +269,37 @@ def test_append_to_missing_symbol(lmdb_library):
268269
df = pd.DataFrame({"col": [1]}, index=pd.DatetimeIndex([np.datetime64('2023-01-01')], dtype="datetime64[ns]"))
269270
lib.write("sym", df, staged=True)
270271
lib.sort_and_finalize_staged_data("sym", mode=StagedDataFinalizeMethod.APPEND)
271-
assert_frame_equal(lib.read("sym").data, df)
272+
assert_frame_equal(lib.read("sym").data, df)
273+
274+
275+
def test_update_symbol_list(lmdb_library):
276+
lib = lmdb_library
277+
lib_tool = lmdb_library._nvs.library_tool()
278+
sym = "sym"
279+
sym_2 = "sym_2"
280+
df = pd.DataFrame({"col": [1]}, index=pd.DatetimeIndex([np.datetime64('2023-01-01')], dtype="datetime64[ns]"))
281+
282+
# We always add to the symbol list on write
283+
lib.write(sym, df, staged=True)
284+
lib.sort_and_finalize_staged_data(sym, mode=StagedDataFinalizeMethod.WRITE)
285+
assert lib_tool.count_keys(KeyType.SYMBOL_LIST) == 1
286+
assert lib.list_symbols() == [sym]
287+
288+
# We don't add to the symbol on append when there is an existing version
289+
lib.write(sym, df, staged=True)
290+
lib.sort_and_finalize_staged_data(sym, mode=StagedDataFinalizeMethod.APPEND)
291+
assert lib_tool.count_keys(KeyType.SYMBOL_LIST) == 1
292+
assert lib.list_symbols() == [sym]
293+
294+
# We always add to the symbol list on write, even when there is an existing version
295+
lib.write(sym, df, staged=True)
296+
lib.sort_and_finalize_staged_data(sym, mode=StagedDataFinalizeMethod.WRITE)
297+
assert lib_tool.count_keys(KeyType.SYMBOL_LIST) == 2
298+
assert lib.list_symbols() == [sym]
299+
300+
# We add to the symbol list on append when there is no previous version
301+
lib.write(sym_2, df, staged=True)
302+
lib.sort_and_finalize_staged_data(sym_2, mode=StagedDataFinalizeMethod.APPEND)
303+
assert lib_tool.count_keys(KeyType.SYMBOL_LIST) == 3
304+
assert set(lib.list_symbols()) == set([sym, sym_2])
305+

0 commit comments

Comments
 (0)