From 34fc9ee550ecea54fc65a3303599402c759c4b63 Mon Sep 17 00:00:00 2001 From: phoebusm Date: Tue, 28 Oct 2025 14:43:46 +0000 Subject: [PATCH 1/6] Support recursive normalizer in write in v2 API --- build_tooling/format.py | 38 +++++++++++++------ .../adapters/arctic_library_adapter.py | 2 +- python/arcticdb/options.py | 12 +++++- python/arcticdb/version_store/_store.py | 21 +++++----- python/arcticdb/version_store/library.py | 23 ++++++++++- python/tests/conftest.py | 13 +++++++ .../tests/integration/arcticdb/test_arctic.py | 16 ++++++++ .../test_recursive_normalizers.py | 36 ++++++++++++++++++ 8 files changed, 137 insertions(+), 24 deletions(-) diff --git a/build_tooling/format.py b/build_tooling/format.py index 6506474a7b9..eb417248cb1 100644 --- a/build_tooling/format.py +++ b/build_tooling/format.py @@ -31,30 +31,38 @@ def install_tools(): return black or clang -def lint_python(in_place: bool): +def lint_python(in_place: bool, specific_file: str): try: import black assert black.__version__ == black_version except ImportError: raise RuntimeError("black not installed. Run this script with --install-tools then try again") + if specific_file: + path = specific_file + else: + path = "python/" + if in_place: - return subprocess.run(["black", "-l", "120", "python/"]).returncode + return subprocess.run(["black", "-l", "120", path]).returncode else: - return subprocess.run(["black", "-l", "120", "--check", "python/"]).returncode + return subprocess.run(["black", "-l", "120", "--check", path]).returncode -def lint_cpp(in_place: bool): +def lint_cpp(in_place: bool, specific_file: str): try: import clang_format except ImportError: raise RuntimeError("clang-format not installed. Run this script with --install-tools then try again") files = [] - root = pathlib.Path("cpp", "arcticdb") - for e in ("*.cpp", "*.hpp"): - for f in root.rglob(e): - files.append(str(f)) + if specific_file: + files.append(specific_file) + else: + root = pathlib.Path("cpp", "arcticdb") + for e in ("*.cpp", "*.hpp"): + for f in root.rglob(e): + files.append(str(f)) args = ["clang-format"] if in_place: @@ -69,11 +77,11 @@ def lint_cpp(in_place: bool): return subprocess.run(args).returncode -def main(type: str, in_place: bool): +def main(type: str, in_place: bool, specific_file: str): if type == "python": - return lint_python(in_place) + return lint_python(in_place, specific_file) elif type == "cpp": - return lint_cpp(in_place) + return lint_cpp(in_place, specific_file) else: return lint_python(in_place) or lint_cpp(in_place) @@ -105,6 +113,11 @@ def main(type: str, in_place: bool): action='store_true', help="Apply linting rules to your working copy. Changes files." ) + parser.add_argument( + "-f", + "--file", + help="Apply linting rules to a specific file." 
+ ) args = parser.parse_args() if args.install_tools: @@ -118,10 +131,13 @@ def main(type: str, in_place: bool): raise RuntimeError("Must specify --type") if args.type not in ("python", "cpp", "all"): raise RuntimeError("Invalid --type") + if args.type == "all" and args.file: + raise RuntimeError("Cannot specify file type when specifying a single file for formatting") return_code = main( type=args.type, in_place=args.in_place, + specific_file=args.file, ) sys.exit(return_code) diff --git a/python/arcticdb/adapters/arctic_library_adapter.py b/python/arcticdb/adapters/arctic_library_adapter.py index 51951c970a0..edbd497049f 100644 --- a/python/arcticdb/adapters/arctic_library_adapter.py +++ b/python/arcticdb/adapters/arctic_library_adapter.py @@ -24,7 +24,6 @@ def set_library_options( write_options = lib_desc.version.write_options write_options.dynamic_strings = True - write_options.recursive_normalizers = True write_options.use_tombstones = True write_options.fast_tombstone_all = True lib_desc.version.symbol_list = True @@ -38,6 +37,7 @@ def set_library_options( write_options.de_duplication = options.dedup write_options.segment_row_size = options.rows_per_segment write_options.column_group_size = options.columns_per_segment + write_options.recursive_normalizers = options.recursive_normalizers lib_desc.version.encoding_version = ( options.encoding_version if options.encoding_version is not None else DEFAULT_ENCODING_VERSION diff --git a/python/arcticdb/options.py b/python/arcticdb/options.py index 29b48066731..0e1e668d8e1 100644 --- a/python/arcticdb/options.py +++ b/python/arcticdb/options.py @@ -43,6 +43,7 @@ def __init__( rows_per_segment: int = 100_000, columns_per_segment: int = 127, encoding_version: Optional[EncodingVersion] = None, + recursive_normalizers: bool = True, ): """ Parameters @@ -124,12 +125,19 @@ def __init__( encoding_version: Optional[EncodingVersion], default None The encoding version to use when writing data to storage. v2 is faster, but still experimental, so use with caution. + + recursive_normalizers: bool, default True + Whether to recursively normalize nested data structures when writing sequence-like or dict-like data. + The data structure can be nested or a mix of lists and dictionaries. + Note: If the leaf nodes cannot be natively normalized and must be written using write_pickle, those leaf nodes + will be pickled, resulting in the overall data being only partially normalized and partially pickled. 
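+
+            Example (an illustrative sketch, in the style of the docstring examples added later
+            in this series; ``lib`` is assumed to be a v2 ``Library`` created with this option
+            enabled, and ``"symbol"`` is a placeholder name):
+                data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})}
+                lib.write("symbol", data)  # each leaf is written as its own natively normalized item
+                lib.read("symbol").data    # returns an equal nested structure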
""" self.dynamic_schema = dynamic_schema self.dedup = dedup self.rows_per_segment = rows_per_segment self.columns_per_segment = columns_per_segment self.encoding_version = encoding_version + self.recursive_normalizers = recursive_normalizers def __eq__(self, right): return ( @@ -138,13 +146,15 @@ def __eq__(self, right): and self.rows_per_segment == right.rows_per_segment and self.columns_per_segment == right.columns_per_segment and self.encoding_version == right.encoding_version + and self.recursive_normalizers == right.recursive_normalizers ) def __repr__(self): return ( f"LibraryOptions(dynamic_schema={self.dynamic_schema}, dedup={self.dedup}," f" rows_per_segment={self.rows_per_segment}, columns_per_segment={self.columns_per_segment}," - f" encoding_version={self.encoding_version if self.encoding_version is not None else 'Default'})" + f" encoding_version={self.encoding_version if self.encoding_version is not None else 'Default'}," + f" recursive_normalizers={self.recursive_normalizers})" ) diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 98144283524..9d05eacc8da 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -612,6 +612,15 @@ def stage( log.warning("The data could not be normalized to an ArcticDB format and has not been written") return None + def _is_recursive_normalizers_enabled(self, **kwargs): + return resolve_defaults( + "recursive_normalizers", + self._lib_cfg.lib_desc.version.write_options, + global_default=False, + uppercase=False, + **kwargs, + ) + def write( self, symbol: str, @@ -698,11 +707,9 @@ def write( prune_previous_version = resolve_defaults( "prune_previous_version", proto_cfg, global_default=False, existing_value=prune_previous_version, **kwargs ) - recursive_normalizers = resolve_defaults( - "recursive_normalizers", proto_cfg, global_default=False, uppercase=False, **kwargs - ) parallel = resolve_defaults("parallel", proto_cfg, global_default=False, uppercase=False, **kwargs) incomplete = resolve_defaults("incomplete", proto_cfg, global_default=False, uppercase=False, **kwargs) + recursive_normalizers = self._is_recursive_normalizers_enabled(**kwargs) # TODO remove me when dynamic strings is the default everywhere if parallel: @@ -3147,12 +3154,8 @@ def will_item_be_pickled(self, item, recursive_normalizers: Optional[bool] = Non ) if result and log_warning_message: proto_cfg = self._lib_cfg.lib_desc.version.write_options - resolved_recursive_normalizers = resolve_defaults( - "recursive_normalizers", - proto_cfg, - global_default=False, - uppercase=False, - **{"recursive_normalizers": recursive_normalizers}, + resolved_recursive_normalizers = self._is_recursive_normalizers_enabled( + **{"recursive_normalizers": recursive_normalizers} ) warning_msg = "" is_recursive_normalize_preferred, _, _ = self._try_flatten(item, "") diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index b3d5e8252df..39f6475bfd6 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -944,6 +944,7 @@ def write( staged=False, validate_index=True, index_column: Optional[str] = None, + recursive_normalizers: bool = None, ) -> VersionedItem: """ Write ``data`` to the specified ``symbol``. 
If ``symbol`` already exists then a new version will be created to @@ -992,6 +993,10 @@ def write( index_column: Optional[str], default=None Optional specification of timeseries index column if data is an Arrow table. Ignored if data is not an Arrow table. + recursive_normalizers: bool, default None + Whether to recursively normalize nested data structures when writing sequence-like or dict-like data. + If None, falls back to the corresponding setting in the library configuration. + The data structure can be nested or a mix of lists and dictionaries. Returns ------- @@ -1020,7 +1025,9 @@ def write( >>> w = adb.WritePayload("symbol", df, metadata={'the': 'metadata'}) >>> lib.write(*w, staged=True) """ - if not self._allowed_input_type(data): + if not self._nvs._is_recursive_normalizers_enabled( + **{"recursive_normalizers": recursive_normalizers} + ) and not self._allowed_input_type(data): raise ArcticUnsupportedDataTypeException( "data is of a type that cannot be normalized. Consider using " f"write_pickle instead. type(data)=[{type(data)}]" @@ -1037,10 +1044,17 @@ def write( index_column=index_column, norm_failure_options_msg="Using write_pickle will allow the object to be written. However, many operations " "(such as date_range filtering and column selection) will not work on pickled data.", + recursive_normalizers=recursive_normalizers, ) def write_pickle( - self, symbol: str, data: Any, metadata: Any = None, prune_previous_versions: bool = False, staged=False + self, + symbol: str, + data: Any, + metadata: Any = None, + prune_previous_versions: bool = False, + staged=False, + recursive_normalizers: bool = None, ) -> VersionedItem: """ See `write`. This method differs from `write` only in that ``data`` can be of any type that is serialisable via @@ -1062,6 +1076,10 @@ def write_pickle( See documentation on `write`. staged See documentation on `write`. + recursive_normalizers: bool, default None + See documentation on `write`. + If the leaf nodes cannot be natively normalized, they will be pickled, + resulting in the overall data being recursively normalized and partially pickled. 
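+
+            A minimal sketch of the partially pickled case (``CustomThing`` is a hypothetical
+            stand-in for any object without a native normalizer; names are placeholders):
+                data = {"frame": pd.DataFrame({"a": [1, 2, 3]}), "blob": CustomThing()}
+                lib.write_pickle("symbol", data, recursive_normalizers=True)
+                # "frame" is stored natively; only "blob" falls back to pickle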
Returns ------- @@ -1086,6 +1104,7 @@ def write_pickle( prune_previous_version=prune_previous_versions, pickle_on_failure=True, parallel=staged, + recursive_normalizers=recursive_normalizers, ) @staticmethod diff --git a/python/tests/conftest.py b/python/tests/conftest.py index d37d085761c..7085554fa73 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -655,12 +655,25 @@ def arctic_client_lmdb(request, encoding_version) -> Arctic: return ac +@pytest.fixture +def arctic_client_s3(request) -> Arctic: + storage_fixture: StorageFixture = request.getfixturevalue("s3_storage") + ac = storage_fixture.create_arctic() + return ac + + @pytest.fixture def arctic_library(arctic_client, lib_name) -> Generator[Library, None, None]: yield arctic_client.create_library(lib_name) arctic_client.delete_library(lib_name) +@pytest.fixture +def arctic_library_s3(arctic_client_s3, lib_name) -> Generator[Library, None, None]: + yield arctic_client_s3.create_library(lib_name) + arctic_client_s3.delete_library(lib_name) + + @pytest.fixture def arctic_library_dynamic(arctic_client, lib_name) -> Generator[Library, None, None]: lib_opts = LibraryOptions(dynamic_schema=True) diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index d06dc3978a2..f70130377fb 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -824,6 +824,22 @@ def test_s3_repr(s3_storage: S3Bucket, one_col_df, lib_name): assert written_vi.host == config +def test_default_library_config(mem_storage, lib_name): + ac = mem_storage.create_arctic() + lib = ac.create_library(lib_name) + assert lib._nvs.lib_cfg().lib_desc.version.symbol_list == True + + write_options = lib._nvs.lib_cfg().lib_desc.version.write_options + assert write_options.dynamic_strings == True + assert write_options.recursive_normalizers == True + assert write_options.use_tombstones == True + assert write_options.fast_tombstone_all == True + assert write_options.prune_previous_version == False + assert write_options.pickle_on_failure == False + assert write_options.snapshot_dedup == False + assert write_options.delayed_deletes == False + + class A: """A dummy user defined type that requires pickling to serialize.""" diff --git a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index 93ef7164057..7a5f724eedc 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -20,6 +20,8 @@ UnsupportedKeyInDictionary, ArcticException as ArcticNativeException, ) +from arcticdb.version_store.library import ArcticUnsupportedDataTypeException +import arcticdb.toolbox.query_stats as qs from arcticdb_ext.storage import KeyType from arcticdb_ext.version_store import NoSuchVersionException import arcticdb_ext.stream as adb_stream @@ -57,6 +59,40 @@ def assert_vit_equals_except_data(left, right): assert left.timestamp == right.timestamp +@pytest.mark.parametrize("recursive_normalizers", (True, False, None)) +def test_v2_api(arctic_library_s3, sym, recursive_normalizers, clear_query_stats): + lib = arctic_library_s3 + data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})} + if recursive_normalizers is None or recursive_normalizers is True: + with qs.query_stats(): + lib.write(sym, data, recursive_normalizers=recursive_normalizers) + stats = 
qs.get_query_stats() + assert "MULTI_KEY" in stats["storage_operations"]["S3_PutObject"].keys() + else: + with pytest.raises(ArcticUnsupportedDataTypeException) as e: + lib.write(sym, data, recursive_normalizers=recursive_normalizers) + + +@pytest.mark.parametrize("recursive_normalizers", (True, False, None)) +def test_v2_api_pickle(arctic_library_s3, sym, recursive_normalizers, clear_query_stats): + lib = arctic_library_s3 + data = ( + { + "a": [1, 2, 3], + "b": {"c": np.arange(24)}, + "d": [AlmostAListNormalizer()], # A random item that will be pickled + }, + ) + with qs.query_stats(): + lib.write_pickle(sym, data, recursive_normalizers=recursive_normalizers) + keys = qs.get_query_stats()["storage_operations"]["S3_PutObject"].keys() + if recursive_normalizers is None or recursive_normalizers is True: + assert "MULTI_KEY" in keys + else: + assert "MULTI_KEY" not in keys + assert lib._nvs.is_symbol_pickled(sym) == True + + @pytest.mark.parametrize("read", (lambda lib, sym: lib.batch_read([sym])[sym], lambda lib, sym: lib.read(sym))) @pytest.mark.storage def test_recursively_written_data(basic_store, read): From c5ecb03b55b17a4e4706a73c9b1d10320c6f84d6 Mon Sep 17 00:00:00 2001 From: phoebusm Date: Fri, 7 Nov 2025 16:44:20 +0000 Subject: [PATCH 2/6] Make recursive normalizer default to be false in library option --- python/arcticdb/options.py | 6 ++++-- python/arcticdb/version_store/library.py | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/arcticdb/options.py b/python/arcticdb/options.py index 0e1e668d8e1..c8d646588b6 100644 --- a/python/arcticdb/options.py +++ b/python/arcticdb/options.py @@ -33,6 +33,8 @@ class LibraryOptions: See `__init__` for details. columns_per_segment: int See `__init__` for details. + recursive_normalizers: bool + See `__init__` for details. """ def __init__( @@ -43,7 +45,7 @@ def __init__( rows_per_segment: int = 100_000, columns_per_segment: int = 127, encoding_version: Optional[EncodingVersion] = None, - recursive_normalizers: bool = True, + recursive_normalizers: bool = False, ): """ Parameters @@ -126,7 +128,7 @@ def __init__( The encoding version to use when writing data to storage. v2 is faster, but still experimental, so use with caution. - recursive_normalizers: bool, default True + recursive_normalizers: bool, default False Whether to recursively normalize nested data structures when writing sequence-like or dict-like data. The data structure can be nested or a mix of lists and dictionaries. Note: If the leaf nodes cannot be natively normalized and must be written using write_pickle, those leaf nodes diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 39f6475bfd6..745ea612f4b 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -995,7 +995,10 @@ def write( table. recursive_normalizers: bool, default None Whether to recursively normalize nested data structures when writing sequence-like or dict-like data. - If None, falls back to the corresponding setting in the library configuration. + If None, falls back to the corresponding setting in the library configuration. For libraries created with < v6.4.0, + the default library configuration is True, otherwise it is False. + The library configuration can be modified via Arctic.modify_library_option(). Please refer to + https://docs.arcticdb.io/latest/api/arctic/#arcticdb.Arctic.modify_library_option for more details. 
The data structure can be nested or a mix of lists and dictionaries. Returns From 0399be61b7bd4eb3bd8e5ee29f050c6770b61e97 Mon Sep 17 00:00:00 2001 From: phoebusm Date: Wed, 12 Nov 2025 15:34:33 +0000 Subject: [PATCH 3/6] Fix tests --- build_tooling/format.py | 4 ++-- python/tests/integration/arcticdb/test_arctic.py | 2 +- .../version_store/test_recursive_normalizers.py | 12 ++++++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/build_tooling/format.py b/build_tooling/format.py index eb417248cb1..c281e54fc10 100644 --- a/build_tooling/format.py +++ b/build_tooling/format.py @@ -31,7 +31,7 @@ def install_tools(): return black or clang -def lint_python(in_place: bool, specific_file: str): +def lint_python(in_place: bool, specific_file: str = None): try: import black assert black.__version__ == black_version @@ -49,7 +49,7 @@ def lint_python(in_place: bool, specific_file: str): return subprocess.run(["black", "-l", "120", "--check", path]).returncode -def lint_cpp(in_place: bool, specific_file: str): +def lint_cpp(in_place: bool, specific_file: str = None): try: import clang_format except ImportError: diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index f70130377fb..ac9f233b8c3 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -831,7 +831,7 @@ def test_default_library_config(mem_storage, lib_name): write_options = lib._nvs.lib_cfg().lib_desc.version.write_options assert write_options.dynamic_strings == True - assert write_options.recursive_normalizers == True + assert write_options.recursive_normalizers == False assert write_options.use_tombstones == True assert write_options.fast_tombstone_all == True assert write_options.prune_previous_version == False diff --git a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index 7a5f724eedc..a9d612f0592 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -7,7 +7,7 @@ import pytest import numpy as np import arcticdb -from arcticdb import QueryBuilder +from arcticdb import QueryBuilder, LibraryOptions from arcticdb.util.test import equals from arcticdb.flattener import Flattener from arcticdb.version_store._custom_normalizers import CustomNormalizer, register_normalizer @@ -59,11 +59,15 @@ def assert_vit_equals_except_data(left, right): assert left.timestamp == right.timestamp +@pytest.mark.parametrize("lib_option", (True, False, None)) @pytest.mark.parametrize("recursive_normalizers", (True, False, None)) -def test_v2_api(arctic_library_s3, sym, recursive_normalizers, clear_query_stats): - lib = arctic_library_s3 +def test_v2_api(arctic_client_s3, sym, recursive_normalizers, clear_query_stats, lib_name, lib_option): + if lib_option is None: + lib = arctic_client_s3.create_library(lib_name) + else: + lib = arctic_client_s3.create_library(lib_name, LibraryOptions(recursive_normalizers=lib_option)) data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})} - if recursive_normalizers is None or recursive_normalizers is True: + if (lib_option is True and recursive_normalizers is not False) or recursive_normalizers is True: with qs.query_stats(): lib.write(sym, data, recursive_normalizers=recursive_normalizers) stats = qs.get_query_stats() From 42eb0a2fc0469a5e94cb753a5f78200b3823bd2d Mon Sep 
17 00:00:00 2001 From: Phoebus Mak Date: Wed, 12 Nov 2025 19:17:47 +0000 Subject: [PATCH 4/6] Fix test --- .../unit/arcticdb/version_store/test_recursive_normalizers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index a9d612f0592..3988078e23b 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -90,7 +90,7 @@ def test_v2_api_pickle(arctic_library_s3, sym, recursive_normalizers, clear_quer with qs.query_stats(): lib.write_pickle(sym, data, recursive_normalizers=recursive_normalizers) keys = qs.get_query_stats()["storage_operations"]["S3_PutObject"].keys() - if recursive_normalizers is None or recursive_normalizers is True: + if recursive_normalizers is True: assert "MULTI_KEY" in keys else: assert "MULTI_KEY" not in keys From 8d2c35085bb67a617f9c1f32d25381ec042e4d93 Mon Sep 17 00:00:00 2001 From: Phoebus Mak Date: Mon, 17 Nov 2025 19:24:14 +0000 Subject: [PATCH 5/6] Address PR comments --- build_tooling/format.py | 2 - cpp/arcticdb/storage/library_manager.cpp | 3 + cpp/arcticdb/storage/library_manager.hpp | 9 ++- cpp/arcticdb/storage/python_bindings.cpp | 3 +- python/arcticdb/options.py | 6 ++ .../arcticdb/version_store/_normalization.py | 27 +++++--- python/arcticdb/version_store/_store.py | 33 +++++++++- python/arcticdb/version_store/library.py | 35 ++++++++-- python/tests/conftest.py | 20 +++--- .../test_arctic_library_management.py | 15 +++++ .../test_recursive_normalizers.py | 66 ++++++++++++------- 11 files changed, 163 insertions(+), 56 deletions(-) diff --git a/build_tooling/format.py b/build_tooling/format.py index c281e54fc10..f0cc97542ad 100644 --- a/build_tooling/format.py +++ b/build_tooling/format.py @@ -131,8 +131,6 @@ def main(type: str, in_place: bool, specific_file: str): raise RuntimeError("Must specify --type") if args.type not in ("python", "cpp", "all"): raise RuntimeError("Invalid --type") - if args.type == "all" and args.file: - raise RuntimeError("Cannot specify file type when specifying a single file for formatting") return_code = main( type=args.type, diff --git a/cpp/arcticdb/storage/library_manager.cpp b/cpp/arcticdb/storage/library_manager.cpp index 0732c26f93f..a37fce77d00 100644 --- a/cpp/arcticdb/storage/library_manager.cpp +++ b/cpp/arcticdb/storage/library_manager.cpp @@ -155,6 +155,9 @@ void LibraryManager::modify_library_option( case ModifiableLibraryOption::COLUMNS_PER_SEGMENT: mutable_write_options->set_column_group_size(get_positive_int(new_value)); break; + case ModifiableLibraryOption::RECURSIVE_NORMALIZERS: + mutable_write_options->set_recursive_normalizers(get_bool(new_value)); + break; default: throw UnsupportedLibraryOptionValue(fmt::format("Invalid library option: {}", option)); } diff --git a/cpp/arcticdb/storage/library_manager.hpp b/cpp/arcticdb/storage/library_manager.hpp index 1dd25b7d197..efaaa3fb246 100644 --- a/cpp/arcticdb/storage/library_manager.hpp +++ b/cpp/arcticdb/storage/library_manager.hpp @@ -16,7 +16,12 @@ #include namespace arcticdb::storage { -enum class ModifiableLibraryOption { DEDUP = 1, ROWS_PER_SEGMENT = 2, COLUMNS_PER_SEGMENT = 3 }; +enum class ModifiableLibraryOption { + DEDUP = 1, + ROWS_PER_SEGMENT = 2, + COLUMNS_PER_SEGMENT = 3, + RECURSIVE_NORMALIZERS = 4 +}; enum class ModifiableEnterpriseLibraryOption { REPLICATION = 1, BACKGROUND_DELETION 
= 2 }; using LibraryOptionValue = std::variant; @@ -101,6 +106,8 @@ struct formatter { return fmt::format_to(ctx.out(), "ROWS_PER_SEGMENT"); case arcticdb::storage::ModifiableLibraryOption::COLUMNS_PER_SEGMENT: return fmt::format_to(ctx.out(), "COLUMNS_PER_SEGMENT"); + case arcticdb::storage::ModifiableLibraryOption::RECURSIVE_NORMALIZERS: + return fmt::format_to(ctx.out(), "RECURSIVE_NORMALIZERS"); default: arcticdb::util::raise_rte("Unrecognized modifiable option {}", int(o)); } diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp index e92ffb46a99..ca242352c54 100644 --- a/cpp/arcticdb/storage/python_bindings.cpp +++ b/cpp/arcticdb/storage/python_bindings.cpp @@ -148,7 +148,8 @@ void register_bindings(py::module& storage, py::exception(storage, "ModifiableEnterpriseLibraryOption", R"pbdoc( Enterprise library options that can be modified after library creation. diff --git a/python/arcticdb/options.py b/python/arcticdb/options.py index c8d646588b6..9024751348b 100644 --- a/python/arcticdb/options.py +++ b/python/arcticdb/options.py @@ -133,6 +133,12 @@ def __init__( The data structure can be nested or a mix of lists and dictionaries. Note: If the leaf nodes cannot be natively normalized and must be written using write_pickle, those leaf nodes will be pickled, resulting in the overall data being only partially normalized and partially pickled. + Example: + data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})} + lib = ac.create_library(lib_name) + lib.write(symbol, data) # ArcticUnsupportedDataTypeException will be thrown by default + lib2 = ac.create_library(lib_name, LibraryOptions(recursive_normalizers=True)) + lib2.write(symbol, data) # data will be successfully written """ self.dynamic_schema = dynamic_schema self.dedup = dedup diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py index be7f99e63c3..3f51b994d69 100644 --- a/python/arcticdb/version_store/_normalization.py +++ b/python/arcticdb/version_store/_normalization.py @@ -14,6 +14,7 @@ if sys.version_info >= (3, 9): import zoneinfo from datetime import timedelta +from functools import partial import math import json @@ -1275,7 +1276,8 @@ def __init__(self, cfg=None): self.strict_mode = cfg.strict_mode if cfg is not None else False def normalize(self, obj, **kwargs): - packed, nbytes = self._msgpack_padded_packb(obj) + disallow_pickle = kwargs.get("disallow_pickle", None) + packed, nbytes = self._msgpack_padded_packb(obj, disallow_pickle=disallow_pickle) norm_meta = NormalizationMetadata() norm_meta.msg_pack_frame.version = 1 @@ -1309,7 +1311,7 @@ def denormalize(self, obj, meta): col_data = np_arr.view(np.uint8)[:sb] return self._msgpack_unpackb(memoryview(col_data)) - def _custom_pack(self, obj): + def _custom_pack(self, obj, disallow_pickle=None): if isinstance(obj, pd.Timestamp): tz = _ensure_str_timezone(get_timezone(obj.tz)) if obj.tz is not None else None return ExtType(MsgPackSerialization.PD_TIMESTAMP, packb([obj.value, tz])) @@ -1320,8 +1322,8 @@ def _custom_pack(self, obj): if isinstance(obj, datetime.timedelta): return ExtType(MsgPackSerialization.PY_TIMEDELTA, packb(pd.Timedelta(obj).value)) - if self.strict_mode: - raise TypeError("Normalisation is running in strict mode, writing pickled data is disabled.") + if disallow_pickle: + raise TypeError("Normalizing data by pickling has been disabled.") else: return ExtType(MsgPackSerialization.PY_PICKLE_3, packb(Pickler.write(obj))) @@ -1353,11 +1355,20 @@ def 
_ext_hook(self, code, data): return ExtType(code, data) - def _msgpack_packb(self, obj): - return packb(obj, default=self._custom_pack) + def _should_disallow_pickle(self, disallow_pickle): + # `disallow_pickle` set by function parameter, has priority + # Otherwise fallback to library option `strict_mode` + return self.strict_mode if disallow_pickle is None else disallow_pickle - def _msgpack_padded_packb(self, obj): - return padded_packb(obj, default=self._custom_pack) + def _msgpack_packb(self, obj, disallow_pickle=None): + return packb( + obj, default=partial(self._custom_pack, disallow_pickle=self._should_disallow_pickle(disallow_pickle)) + ) + + def _msgpack_padded_packb(self, obj, disallow_pickle=None): + return padded_packb( + obj, default=partial(self._custom_pack, disallow_pickle=self._should_disallow_pickle(disallow_pickle)) + ) def _msgpack_unpackb(self, buff, raw=False): return unpackb(buff, raw=raw, ext_hook=self._ext_hook) diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 9d05eacc8da..22411175df1 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -473,6 +473,7 @@ def _try_normalize( coerce_columns, norm_failure_options_msg="", index_column=None, + recursive_normalize_msgpack_no_pickle_fallback=None, **kwargs, ): dynamic_schema = resolve_defaults( @@ -496,6 +497,7 @@ def _try_normalize( empty_types=empty_types, index_column=index_column, allow_arrow_input=self._allow_arrow_input, + disallow_pickle=recursive_normalize_msgpack_no_pickle_fallback, **kwargs, ) except ArcticDbNotYetImplemented as ex: @@ -525,14 +527,29 @@ def _try_flatten(self, data, symbol): return FlattenResult(False, None, None) def _try_flatten_and_write_composite_object( - self, symbol, data, metadata, pickle_on_failure, dynamic_strings, prune_previous + self, + symbol, + data, + metadata, + pickle_on_failure, + dynamic_strings, + prune_previous, + recursive_normalize_msgpack_no_pickle_fallback, ): is_recursive_normalize_preferred, metastruct, to_write = self._try_flatten(data, symbol) if is_recursive_normalize_preferred: items = [] norm_metas = [] for k, v in to_write.items(): - _, item, norm_meta = self._try_normalize(k, v, None, pickle_on_failure, dynamic_strings, None) + _, item, norm_meta = self._try_normalize( + k, + v, + None, + pickle_on_failure, + dynamic_strings, + None, + recursive_normalize_msgpack_no_pickle_fallback=recursive_normalize_msgpack_no_pickle_fallback, + ) items.append(item) norm_metas.append(norm_meta) normalized_udm = normalize_metadata(metadata) @@ -710,6 +727,9 @@ def write( parallel = resolve_defaults("parallel", proto_cfg, global_default=False, uppercase=False, **kwargs) incomplete = resolve_defaults("incomplete", proto_cfg, global_default=False, uppercase=False, **kwargs) recursive_normalizers = self._is_recursive_normalizers_enabled(**kwargs) + recursive_normalize_msgpack_no_pickle_fallback = kwargs.get( + "recursive_normalize_msgpack_no_pickle_fallback", None + ) # TODO remove me when dynamic strings is the default everywhere if parallel: @@ -731,7 +751,13 @@ def write( # Do a multi_key write if the structured is nested and is not trivially normalizable via msgpack. 
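# (Each flattened leaf is written under its own data key, and a single MULTI_KEY index
# key ties them together - see the KeyType.MULTI_KEY assertions in
# test_recursive_normalizers.py.)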
if recursive_normalizers: vit = self._try_flatten_and_write_composite_object( - symbol, data, metadata, pickle_on_failure, dynamic_strings, prune_previous_version + symbol, + data, + metadata, + pickle_on_failure, + dynamic_strings, + prune_previous_version, + recursive_normalize_msgpack_no_pickle_fallback, ) if isinstance(vit, VersionedItem): return vit @@ -745,6 +771,7 @@ def write( coerce_columns, norm_failure_options_msg, index_column, + recursive_normalize_msgpack_no_pickle_fallback, ) if self._valid_item_type(item): if parallel or incomplete: diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 745ea612f4b..a575ceb5216 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -1000,6 +1000,12 @@ def write( The library configuration can be modified via Arctic.modify_library_option(). Please refer to https://docs.arcticdb.io/latest/api/arctic/#arcticdb.Arctic.modify_library_option for more details. The data structure can be nested or a mix of lists and dictionaries. + Example: + data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})} + lib.write(symbol, data, recursive_normalizers=False) # ArcticUnsupportedDataTypeException will be thrown + lib.write(symbol, data, recursive_normalizers=True) # The data will be successfully written + ac.modify_library_option(lib, ModifiableLibraryOption.RECURSIVE_NORMALIZERS, True) + lib.write(symbol, data) # The data will be successfully written Returns ------- @@ -1028,13 +1034,20 @@ def write( >>> w = adb.WritePayload("symbol", df, metadata={'the': 'metadata'}) >>> lib.write(*w, staged=True) """ - if not self._nvs._is_recursive_normalizers_enabled( + is_recursive_normalizers_enabled = self._nvs._is_recursive_normalizers_enabled( **{"recursive_normalizers": recursive_normalizers} - ) and not self._allowed_input_type(data): - raise ArcticUnsupportedDataTypeException( - "data is of a type that cannot be normalized. Consider using " - f"write_pickle instead. type(data)=[{type(data)}]" - ) + ) + if not self._allowed_input_type(data): + if is_recursive_normalizers_enabled: + if staged: + raise ArcticUnsupportedDataTypeException( + "Staged data must be of a type that can be natively normalized" + ) + else: + raise ArcticUnsupportedDataTypeException( + "Data is of a type that cannot be normalized. Consider using " + f"write_pickle instead. type(data)=[{type(data)}]" + ) return self._nvs.write( symbol=symbol, @@ -1048,6 +1061,7 @@ def write( norm_failure_options_msg="Using write_pickle will allow the object to be written. However, many operations " "(such as date_range filtering and column selection) will not work on pickled data.", recursive_normalizers=recursive_normalizers, + recursive_normalize_msgpack_no_pickle_fallback=True, ) def write_pickle( @@ -1083,6 +1097,14 @@ def write_pickle( See documentation on `write`. If the leaf nodes cannot be natively normalized, they will be pickled, resulting in the overall data being recursively normalized and partially pickled. 
+ Example: + data = {"a": np.arange(5), "b": ABC()} # ABC is some custom class that cannot be natively normalized + # Exception will be thrown, as the leaf node requires pickling to normalize + lib.write(symbol, data, recursive_normalizers=True) + # The data will be successfully written by partially pickling the leaf node + lib.write_pickle(symbol, data, recursive_normalizers=True) + # The data will be successfully written by pickling the whole object + lib.write_pickle(symbol, data) Returns ------- @@ -1108,6 +1130,7 @@ def write_pickle( pickle_on_failure=True, parallel=staged, recursive_normalizers=recursive_normalizers, + recursive_normalize_msgpack_no_pickle_fallback=False, ) @staticmethod diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 7085554fa73..05e97573250 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -655,10 +655,10 @@ def arctic_client_lmdb(request, encoding_version) -> Arctic: return ac -@pytest.fixture -def arctic_client_s3(request) -> Arctic: - storage_fixture: StorageFixture = request.getfixturevalue("s3_storage") - ac = storage_fixture.create_arctic() +@pytest.fixture(scope="function", params=["lmdb"]) +def arctic_client_lmdb_v1_only(request) -> Arctic: + storage_fixture: StorageFixture = request.getfixturevalue(request.param + "_storage") + ac = storage_fixture.create_arctic(encoding_version=EncodingVersion.V1) return ac @@ -668,12 +668,6 @@ def arctic_library(arctic_client, lib_name) -> Generator[Library, None, None]: arctic_client.delete_library(lib_name) -@pytest.fixture -def arctic_library_s3(arctic_client_s3, lib_name) -> Generator[Library, None, None]: - yield arctic_client_s3.create_library(lib_name) - arctic_client_s3.delete_library(lib_name) - - @pytest.fixture def arctic_library_dynamic(arctic_client, lib_name) -> Generator[Library, None, None]: lib_opts = LibraryOptions(dynamic_schema=True) @@ -693,6 +687,12 @@ def arctic_library_lmdb(arctic_client_lmdb, lib_name) -> Generator[Library, None arctic_client_lmdb.delete_library(lib_name) +@pytest.fixture +def arctic_library_lmdb_v1_only(arctic_client_lmdb_v1_only, lib_name) -> Generator[Library, None, None]: + yield arctic_client_lmdb_v1_only.create_library(lib_name) + arctic_client_lmdb_v1_only.delete_library(lib_name) + + @pytest.fixture( scope="function", params=[ diff --git a/python/tests/integration/arcticdb/test_arctic_library_management.py b/python/tests/integration/arcticdb/test_arctic_library_management.py index 28a214c6366..6f2873dd3ab 100644 --- a/python/tests/integration/arcticdb/test_arctic_library_management.py +++ b/python/tests/integration/arcticdb/test_arctic_library_management.py @@ -248,6 +248,21 @@ def test_modify_options_cols_per_segment(lmdb_storage, lib_name): assert proto_options.column_group_size == 200 +def test_modify_options_recursive_normalizers(lmdb_storage, lib_name): + ac = lmdb_storage.create_arctic() + lib = ac.create_library(lib_name) + + ac.modify_library_option(lib, ModifiableLibraryOption.RECURSIVE_NORMALIZERS, True) + + proto_options = lib._nvs.lib_cfg().lib_desc.version.write_options + assert proto_options.recursive_normalizers + + ac.modify_library_option(lib, ModifiableLibraryOption.RECURSIVE_NORMALIZERS, False) + + proto_options = lib._nvs.lib_cfg().lib_desc.version.write_options + assert not proto_options.recursive_normalizers + + def test_modify_options_replication(lmdb_storage, lib_name): ac = lmdb_storage.create_arctic() lib = ac.create_library(lib_name) diff --git 
a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index 3988078e23b..856bb6dd2a5 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -21,8 +21,7 @@ ArcticException as ArcticNativeException, ) from arcticdb.version_store.library import ArcticUnsupportedDataTypeException -import arcticdb.toolbox.query_stats as qs -from arcticdb_ext.storage import KeyType +from arcticdb_ext.storage import KeyType, ModifiableLibraryOption from arcticdb_ext.version_store import NoSuchVersionException import arcticdb_ext.stream as adb_stream import arcticdb_ext @@ -59,44 +58,61 @@ def assert_vit_equals_except_data(left, right): assert left.timestamp == right.timestamp +@pytest.mark.parametrize("staged", (True, False, None)) @pytest.mark.parametrize("lib_option", (True, False, None)) @pytest.mark.parametrize("recursive_normalizers", (True, False, None)) -def test_v2_api(arctic_client_s3, sym, recursive_normalizers, clear_query_stats, lib_name, lib_option): +def test_v2_api( + arctic_client_lmdb_v1_only, sym, recursive_normalizers, clear_query_stats, lib_name, lib_option, staged +): if lib_option is None: - lib = arctic_client_s3.create_library(lib_name) + lib = arctic_client_lmdb_v1_only.create_library(lib_name) else: - lib = arctic_client_s3.create_library(lib_name, LibraryOptions(recursive_normalizers=lib_option)) + lib = arctic_client_lmdb_v1_only.create_library(lib_name, LibraryOptions(recursive_normalizers=lib_option)) + lt = lib._nvs.library_tool() data = {"a": np.arange(5), "b": pd.DataFrame({"col": [1, 2, 3]})} - if (lib_option is True and recursive_normalizers is not False) or recursive_normalizers is True: - with qs.query_stats(): - lib.write(sym, data, recursive_normalizers=recursive_normalizers) - stats = qs.get_query_stats() - assert "MULTI_KEY" in stats["storage_operations"]["S3_PutObject"].keys() + if staged is not True and ( + (lib_option is True and recursive_normalizers is not False) or recursive_normalizers is True + ): + lib.write(sym, data, recursive_normalizers=recursive_normalizers, staged=staged) + assert len(lt.find_keys(KeyType.MULTI_KEY)) > 0 else: with pytest.raises(ArcticUnsupportedDataTypeException) as e: - lib.write(sym, data, recursive_normalizers=recursive_normalizers) + lib.write(sym, data, recursive_normalizers=recursive_normalizers, staged=staged) + + if lib_option is not True: + arctic_client_lmdb_v1_only.modify_library_option(lib, ModifiableLibraryOption.RECURSIVE_NORMALIZERS, True) + lib.write(sym, data) + + +partial_pickle_required_data = { + "a": [1, 2, 3], + "b": {"c": np.arange(24)}, + "d": [AlmostAListNormalizer()], # A random item that will be pickled +} @pytest.mark.parametrize("recursive_normalizers", (True, False, None)) -def test_v2_api_pickle(arctic_library_s3, sym, recursive_normalizers, clear_query_stats): - lib = arctic_library_s3 - data = ( - { - "a": [1, 2, 3], - "b": {"c": np.arange(24)}, - "d": [AlmostAListNormalizer()], # A random item that will be pickled - }, - ) - with qs.query_stats(): - lib.write_pickle(sym, data, recursive_normalizers=recursive_normalizers) - keys = qs.get_query_stats()["storage_operations"]["S3_PutObject"].keys() +def test_v2_api_pickle(arctic_library_lmdb_v1_only, sym, recursive_normalizers): + lib = arctic_library_lmdb_v1_only + lt = lib._nvs.library_tool() + lib.write_pickle(sym, partial_pickle_required_data, 
recursive_normalizers=recursive_normalizers) + keys = lt.find_keys(KeyType.MULTI_KEY) if recursive_normalizers is True: - assert "MULTI_KEY" in keys + assert len(keys) > 0 else: - assert "MULTI_KEY" not in keys + assert len(keys) == 0 assert lib._nvs.is_symbol_pickled(sym) == True +@pytest.mark.parametrize("recursive_normalizers", (True, False, None)) +def test_v2_api_write_partial_pickle(arctic_library_lmdb_v1_only, sym, recursive_normalizers): + lib = arctic_library_lmdb_v1_only + # Pending v7 release to have more exact exception + # https://man312219.monday.com/boards/7852509418/pulses/18038782559 + with pytest.raises(ArcticNativeException) as e: + lib.write(sym, partial_pickle_required_data, recursive_normalizers=recursive_normalizers) + + @pytest.mark.parametrize("read", (lambda lib, sym: lib.batch_read([sym])[sym], lambda lib, sym: lib.read(sym))) @pytest.mark.storage def test_recursively_written_data(basic_store, read): From 299cc54258fe3b2b5014f1910df938a49bca222e Mon Sep 17 00:00:00 2001 From: Phoebus Mak Date: Fri, 21 Nov 2025 13:18:02 +0000 Subject: [PATCH 6/6] Notebook for recursive normalizer --- .../ArcticDB_demo_recursive_normalizers.ipynb | 586 ++++++++++++++++++ docs/mkdocs/mkdocs.yml | 1 + 2 files changed, 587 insertions(+) create mode 100644 docs/mkdocs/docs/notebooks/ArcticDB_demo_recursive_normalizers.ipynb diff --git a/docs/mkdocs/docs/notebooks/ArcticDB_demo_recursive_normalizers.ipynb b/docs/mkdocs/docs/notebooks/ArcticDB_demo_recursive_normalizers.ipynb new file mode 100644 index 00000000000..a82d5dbd2e4 --- /dev/null +++ b/docs/mkdocs/docs/notebooks/ArcticDB_demo_recursive_normalizers.ipynb @@ -0,0 +1,586 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# ArcticDB Recursive Normalizers Demo\n", + "\n", + "This notebook demonstrates the **recursive normalizers** feature in ArcticDB, which allows you to write nested data structures without having to pickle the entire object.\n", + "\n", + "## What are Recursive Normalizers?\n", + "\n", + "Recursive normalizers enable ArcticDB to:\n", + "- Write and read nested data structures (dicts, lists, tuples) containing DataFrames and arrays\n", + "- Store each component efficiently without pickling the entire structure" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import pandas as pd\n", + "import arcticdb as adb\n", + "from arcticdb.util.test import equals" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Create Arctic instance with LMDB backend\n", + "arctic = adb.Arctic(\"lmdb://recursive_normalizers_demo\")\n", + "\n", + "lib = arctic.get_library(\"demo\", create_if_missing=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Basic Example: Writing Dict Data\n", + "\n", + "Let's start with a simple example of writing a dictionary containing DataFrames and arrays." 
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Original nested data:\n",
+ "{'dataframe1': col1 col2\n",
+ "0 1 a\n",
+ "1 2 b\n",
+ "2 3 c, 'dataframe2': value\n",
+ "0 10\n",
+ "1 20\n",
+ "2 30, 'array': array([0, 1, 2, 3, 4]), 'metadata': {'description': 'Sample nested data'}}\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Create sample data\n",
+ "df1 = pd.DataFrame({\"col1\": [1, 2, 3], \"col2\": [\"a\", \"b\", \"c\"]})\n",
+ "df2 = pd.DataFrame({\"value\": [10, 20, 30]})\n",
+ "array = np.arange(5)\n",
+ "\n",
+ "# Create nested structure\n",
+ "nested_data = {\n",
+ " \"dataframe1\": df1,\n",
+ " \"dataframe2\": df2,\n",
+ " \"array\": array,\n",
+ " \"metadata\": {\"description\": \"Sample nested data\"}\n",
+ "}\n",
+ "\n",
+ "print(\"Original nested data:\")\n",
+ "print(nested_data)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Writing Without Recursive Normalizers (Will Fail)\n",
+ "\n",
+ "By default, ArcticDB cannot write nested structures without pickling:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Exception thrown: Data is of a type that cannot be normalized. Consider using write_pickle instead. type(data)=[<class 'dict'>]\n"
+ ]
+ }
+ ],
+ "source": [
+ "# This will raise an exception since recursive_normalizers is False by default\n",
+ "try:\n",
+ " lib.write(\"nested_data_fail\", nested_data)\n",
+ "except Exception as e:\n",
+ " print(f\"Exception thrown: {e}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Writing With Recursive Normalizers (Success)\n",
+ "\n",
+ "Enable recursive normalizers to write the nested structure:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Successfully written!\n",
+ "\n",
+ "Assertion passed: Read data matches original nested_data!\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Write with recursive normalizers enabled\n",
+ "lib.write(\"nested_data_success\", nested_data, recursive_normalizers=True)\n",
+ "print(\"Successfully written!\")\n",
+ "\n",
+ "# Read it back\n",
+ "result = lib.read(\"nested_data_success\").data\n",
+ "\n",
+ "# Verify the data matches the original\n",
+ "equals(nested_data, result)\n",
+ "print(\"\\nAssertion passed: Read data matches original nested_data!\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Example: Lists and Tuples\n",
+ "\n",
+ "Recursive normalizers also work with lists and tuples:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "List data successfully written and verified!\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Create list of DataFrames\n",
+ "list_data = [\n",
+ " pd.DataFrame({\"a\": [1, 2, 3]}),\n",
+ " pd.DataFrame({\"b\": [4, 5, 6]}),\n",
+ " np.array([7, 8, 9])\n",
+ "]\n",
+ "\n",
+ "lib.write(\"list_data\", list_data, recursive_normalizers=True)\n",
+ "result = lib.read(\"list_data\").data\n",
+ "\n",
+ "# Verify the data matches\n",
+ "equals(list_data, result)\n",
+ "print(\"List data successfully written and verified!\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ 
"Tuple data successfully written and verified!\n" + ] + } + ], + "source": [ + "# Create tuple of mixed data\n", + "tuple_data = (\n", + " np.arange(5),\n", + " pd.DataFrame({\"col\": [1, 2, 3]}),\n", + " {\"nested\": np.arange(3)}\n", + ")\n", + "\n", + "lib.write(\"tuple_data\", tuple_data, recursive_normalizers=True)\n", + "result = lib.read(\"tuple_data\").data\n", + "\n", + "# Verify the data matches\n", + "equals(tuple_data, result)\n", + "print(\"Tuple data successfully written and verified!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example: Nested Structures\n", + "\n", + "Recursive normalizers can handle nested structures:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Nested structure successfully written and verified!\n" + ] + } + ], + "source": [ + "# Create nested structure\n", + "nested = {\n", + " \"level1\": {\n", + " \"level2\": {\n", + " \"level3\": {\n", + " \"dataframe\": pd.DataFrame({\"x\": [1, 2, 3]}),\n", + " \"array\": np.arange(10)\n", + " }\n", + " }\n", + " },\n", + " \"another_branch\": [\n", + " pd.DataFrame({\"y\": [4, 5, 6]}),\n", + " {\"nested_dict\": np.array([7, 8, 9])}\n", + " ]\n", + "}\n", + "\n", + "lib.write(\"nested\", nested, recursive_normalizers=True)\n", + "result = lib.read(\"nested\").data\n", + "\n", + "# Verify the data matches\n", + "equals(nested, result)\n", + "print(\"Nested structure successfully written and verified!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enable Recursive Normalizer in Library Configuration\n", + "You can configure recursive normalizers at the library configuration instead of specifying it for each write:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Modifying Library Options\n", + "\n", + "You can modify an existing library's configuration:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2025-11-21 13:07:07,890 INFO [arcticdb.arctic] Set option=[ModifiableLibraryOption.RECURSIVE_NORMALIZERS] to value=[True] for Arctic=[Arctic(config=LMDB(path=/recursive_normalizers_demo))] Library=[Library(Arctic(config=LMDB(path=/recursive_normalizers_demo)), path=demo, storage=lmdb_storage)]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Data written successfully after modifying library option and verified!\n" + ] + } + ], + "source": [ + "# Enable recursive normalizers for existing library\n", + "from arcticdb_ext.storage import ModifiableLibraryOption\n", + "arctic.modify_library_option(lib, ModifiableLibraryOption.RECURSIVE_NORMALIZERS, True)\n", + "\n", + "# Now writes will enable recursive normalizers by default\n", + "data = {\"df\": pd.DataFrame({\"b\": [4, 5, 6]}), \"arr\": np.arange(3)}\n", + "lib.write(\"modified_lib_write\", data)\n", + "\n", + "result = lib.read(\"modified_lib_write\").data\n", + "equals(data, result)\n", + "print(\"Data written successfully after modifying library option and verified!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### New Library\n", + "You can also create a new library with recursive normalizers enabled by default" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Data written 
successfully with library option enabled and verified!\n" + ] + } + ], + "source": [ + "# Create a new library with recursive normalizers enabled by default\n", + "lib_recursive = arctic.create_library(\"demo_recursive\", library_options=adb.LibraryOptions(recursive_normalizers=True))\n", + "\n", + "# Now you can write without specifying recursive_normalizers=True\n", + "data = {\"df\": pd.DataFrame({\"a\": [1, 2, 3]}), \"arr\": np.arange(5)}\n", + "lib_recursive.write(\"auto_recursive\", data)\n", + "\n", + "result = lib_recursive.read(\"auto_recursive\").data\n", + "equals(data, result)\n", + "print(\"Data written successfully with library option enabled and verified!\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using write_pickle with Recursive Normalizers\n", + "\n", + "The `write_pickle` method can also use recursive normalizers. This is useful when you have nested structures with some components that cannot be natively normalized:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Error with write: Error while normalizing symbol=mixed_fail__custom, Normalizing data by pickling has been disabled.\n", + "Data written with write_pickle and verified! Data has been partially-pickled\n" + ] + } + ], + "source": [ + "# Custom class that cannot be natively normalized\n", + "class CustomClass:\n", + " def __init__(self, value):\n", + " self.value = value\n", + " \n", + " def __eq__(self, other):\n", + " return isinstance(other, CustomClass) and self.value == other.value\n", + "\n", + "# Data with custom class\n", + "mixed_data = {\n", + " \"dataframe\": pd.DataFrame({\"a\": [1, 2, 3]}),\n", + " \"array\": np.arange(5),\n", + " \"custom\": CustomClass(42)\n", + "}\n", + "\n", + "# This will fail with regular write\n", + "try:\n", + " lib.write(\"mixed_fail\", mixed_data, recursive_normalizers=True)\n", + "except Exception as e:\n", + " print(f\"Error with write: {e}\")\n", + "\n", + "# But works with write_pickle\n", + "lib.write_pickle(\"mixed_success\", mixed_data, recursive_normalizers=True)\n", + "result = lib.read(\"mixed_success\").data\n", + "\n", + "# Verify the data matches\n", + "equals(mixed_data, result)\n", + "print(\"Data written with write_pickle and verified! Data has been partially-pickled\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Exception Handling Examples\n", + "\n", + "Let's explore various exceptions that can occur when using recursive normalizers:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1. ArcticUnsupportedDataTypeException\n", + "\n", + "This exception occurs when trying to write nested data without enabling recursive normalizers:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Caught ArcticUnsupportedDataTypeException:\n", + "Message: Data is of a type that cannot be normalized. Consider using write_pickle instead. 
type(data)=[<class 'dict'>]\n",
+ "\n",
+ "Solution: Use recursive_normalizers=True or write_pickle\n"
+ ]
+ }
+ ],
+ "source": [
+ "from arcticdb.version_store.library import ArcticUnsupportedDataTypeException\n",
+ "\n",
+ "data = {\"df\": pd.DataFrame({\"a\": [1, 2, 3]})}\n",
+ "\n",
+ "try:\n",
+ " lib.write(\"exception_test1\", data, recursive_normalizers=False)\n",
+ "except ArcticUnsupportedDataTypeException as e:\n",
+ " print(\"Caught ArcticUnsupportedDataTypeException:\")\n",
+ " print(f\"Message: {str(e)}\")\n",
+ " print(\"\\nSolution: Use recursive_normalizers=True or write_pickle\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 2. DataTooNestedException\n",
+ "\n",
+ "There's a limit to how deeply nested structures can be (255 levels):"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Caught DataTooNestedException:\n",
+ "Message: Symbol exception_test3 cannot be recursively normalized as it contains more than 255 levels of nested dictionaries. This is a limitation of the msgpack serializer.\n",
+ "\n",
+ "Solution: Reduce nesting depth to 255 levels or fewer\n"
+ ]
+ }
+ ],
+ "source": [
+ "from arcticdb.exceptions import DataTooNestedException\n",
+ "\n",
+ "# Create a structure that's too deeply nested (256 levels)\n",
+ "def create_deep_nest(depth):\n",
+ " if depth == 0:\n",
+ " return pd.DataFrame({\"a\": [1, 2, 3]})\n",
+ " return {\"nested\": create_deep_nest(depth - 1)}\n",
+ "\n",
+ "try:\n",
+ " too_deep = create_deep_nest(256)\n",
+ " lib.write(\"exception_test3\", too_deep, recursive_normalizers=True)\n",
+ "except DataTooNestedException as e:\n",
+ " print(\"Caught DataTooNestedException:\")\n",
+ " print(f\"Message: {str(e)}\")\n",
+ " print(\"\\nSolution: Reduce nesting depth to 255 levels or fewer\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 3. SchemaException\n",
+ "\n",
+ "Recursively normalized data cannot be filtered. Attempting to filter such data raises a SchemaException."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Caught SchemaException\n",
+ "Message: E_OPERATION_NOT_SUPPORTED_WITH_RECURSIVE_NORMALIZED_DATA Cannot filter recursively normalized data\n",
+ "\n",
+ "Solution: Recursively normalized data cannot be filtered. Read the full data without filtering.\n",
+ "Caught SchemaException again\n"
+ ]
+ }
+ ],
+ "source": [
+ "from arcticdb.exceptions import SchemaException\n",
+ "from arcticdb.version_store.processing import QueryBuilder\n",
+ "\n",
+ "# Try to filter recursively normalized data - this will fail\n",
+ "try:\n",
+ " q = QueryBuilder()\n",
+ " q = q[q[\"col\"] == 0]\n",
+ " lib.read(\"nested\", query_builder=q)\n",
+ "except SchemaException as e:\n",
+ " print(\"Caught SchemaException\")\n",
+ " print(f\"Message: {str(e)}\")\n",
+ " print(\"\\nSolution: Recursively normalized data cannot be filtered. Read the full data without filtering.\")\n",
+ "\n",
+ "try:\n",
+ " lib.head(\"nested\")\n",
+ "except SchemaException as e:\n",
+ " print(\"Caught SchemaException again\")\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Best Practices\n",
+ "\n",
+ "1. **Use recursive normalizers for nested structures**: When you have dictionaries or lists containing DataFrames and arrays\n",
+ "2. 
**Configure at library level**: If you frequently write nested data, enable it at the library level\n",
+ "3. **Limit nesting depth**: Keep nesting under 255 levels\n",
+ "4. **Use write_pickle for mixed data**: When you have custom objects that can't be normalized\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.8.0"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml
index 86c7e90fdaa..d4beadf42fc 100644
--- a/docs/mkdocs/mkdocs.yml
+++ b/docs/mkdocs/mkdocs.yml
@@ -131,6 +131,7 @@ nav:
- 1 Billion Row Challenge Notebook: 'notebooks/ArcticDB_billion_row_challenge.ipynb'
- Pythagorean Won Loss Formula Notebook: 'notebooks/ArcticDB_pythagorean_won_loss_formula_notebook.ipynb'
- Staged Data Notebook: 'notebooks/ArcticDB_staged_data_with_tokens.ipynb'
+ - Recursive Normalizers Notebook: 'notebooks/ArcticDB_demo_recursive_normalizers.ipynb'
- Python API Reference:
- Introduction: 'api/index.md'
- Arctic: 'api/arctic.md'