Skip to content

Commit 3e9a437

Browse files
Saeid BaratiSaeid Barati
authored andcommitted
Address review feedback: wire/storage formats + lazy registry
Wire vs storage format - Added WIRE and STORAGE constants in metaflow.datastore.artifacts.serializer. - ArtifactSerializer.serialize/deserialize now accept a ``format`` kwarg so a single class can own both the storage path (datastore blobs + metadata) and the wire path (string for CLI args, protobuf payloads, cross-process IPC). - PickleSerializer implements STORAGE only; WIRE raises NotImplementedError with an explanation (pickle bytes are not safe as a wire payload). - Serializers that want wire support implement it on the same class; there's no need for a second class per format. Lazy import-hook registry - New module metaflow/datastore/artifacts/lazy_registry.py with SerializerConfig, an importlib.abc.MetaPathFinder interceptor, and public register_serializer_for_type / load_serializer_class entry points. - If the target type's module is already in sys.modules, registration is immediate. Otherwise a hook is installed on sys.meta_path and registration fires the first time the user's code imports the target module. This defers the cost of serializer-module imports (torch, pyarrow, fastavro, ...) until those dependencies are actually in play. - find_spec temporarily removes the interceptor from sys.meta_path during its lookup to avoid recursion. Review nits - serializer.py: rename SerializationMetadata.type field to obj_type to avoid shadowing the type() builtin (dict key inside _info stays "type" for backward compatibility with existing datastores). - serializer.py: SerializerStore skips TYPE=None AND any subclass that is still abstract, via inspect.isabstract(). - serializer.py: get_ordered_serializers memoizes the sorted list and invalidates on new registration. Drops the O(n²) list.index tiebreaker — Python 3.7+ dicts preserve insertion order. - task_datastore.py: lift SerializerStore / SerializationMetadata imports to module top (were on hot paths: __init__ and load_artifacts). - task_datastore.py: the "no serializer claimed this artifact" branch now raises DataException with a message pointing at the PickleSerializer fallback invariant, instead of the misleading UnpicklableArtifactException. - task_datastore.py: "no deserializer claimed this artifact" now looks up ``serializer_info["source"]`` and hints at a missing extension. - SerializedBlob.compress_method: removed. It was documented as "not wired into the save path" — shipping an unwired knob invites extension authors to rely on it and discover it is a no-op. Will come back with its consuming code in a later change. - serialize() docstring rewritten: the side-effect-free contract is there to support retries, caching, and parallel dispatch, and to keep serializers testable; I/O belongs in hooks. Tests - New test/unit/test_lazy_serializer_registry.py (9 tests covering config validation, eager registration for already-imported types, deferred registration through the import hook, and the recursion guard). - test_artifact_serializer.py gains format-dispatch coverage (STORAGE and WIRE round-trip on a toy dual-format serializer; PickleSerializer raises on WIRE). - Removed tests for compress_method and adjusted tests that poked at _registration_order so they go through the public API only.
1 parent e68748f commit 3e9a437

9 files changed

Lines changed: 746 additions & 101 deletions

File tree

metaflow/datastore/artifacts/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,12 @@
33
SerializationMetadata,
44
SerializedBlob,
55
SerializerStore,
6+
STORAGE,
7+
WIRE,
8+
)
9+
from .lazy_registry import (
10+
SerializerConfig,
11+
load_serializer_class,
12+
register_serializer_config,
13+
register_serializer_for_type,
614
)
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
"""
2+
Lazy serializer registry driven by an import hook.
3+
4+
Extensions ship serializers whose implementation modules may import optional
5+
heavy dependencies (``torch``, ``pyarrow``, ``fastavro``, ``protobuf``, ...).
6+
Loading those serializer modules unconditionally at ``metaflow`` import time
7+
would force every user to pay for dependencies they may not have installed.
8+
9+
This module defers both the serializer class import and its instantiation
10+
until one of two things happens:
11+
12+
1. The target type's module is already present in :data:`sys.modules` when
13+
registration is called — registration then happens immediately.
14+
2. The target type's module is imported later by the user's code. An
15+
``importlib`` hook watches for that event and performs registration the
16+
first time the module is loaded.
17+
18+
The hook is installed on :data:`sys.meta_path` and removes itself from the
19+
path during its own ``find_spec`` call to avoid recursion.
20+
"""
21+
22+
import importlib
23+
import importlib.abc
24+
import importlib.machinery
25+
import importlib.util
26+
import sys
27+
28+
from dataclasses import dataclass, field
29+
30+
31+
@dataclass
32+
class SerializerConfig:
33+
"""
34+
Declarative entry recording *which* serializer handles *which* type,
35+
without actually importing the serializer class. The class is imported on
36+
first use by :func:`load_serializer_class`.
37+
38+
Parameters
39+
----------
40+
canonical_type : str
41+
``"module.ClassName"`` — e.g. ``"builtins.dict"``, ``"torch.Tensor"``.
42+
serializer : str
43+
Dotted import path to the serializer class, e.g.
44+
``"my_extension.serializers.TorchSerializer"``.
45+
priority : int
46+
Dispatch priority. Lower is tried first. Matches the existing
47+
``ArtifactSerializer.PRIORITY`` convention.
48+
extra_kwargs : dict
49+
Optional kwargs passed to the serializer class ``__init__``.
50+
"""
51+
52+
canonical_type: str
53+
serializer: str
54+
priority: int = 100
55+
extra_kwargs: dict = field(default_factory=dict)
56+
57+
def __post_init__(self):
58+
if not self.canonical_type:
59+
raise ValueError("canonical_type cannot be empty")
60+
if not self.serializer or "." not in self.serializer:
61+
raise ValueError("serializer must be in 'module.ClassName' format")
62+
63+
@property
64+
def serializer_module(self):
65+
return ".".join(self.serializer.split(".")[:-1])
66+
67+
@property
68+
def serializer_class(self):
69+
return self.serializer.split(".")[-1]
70+
71+
72+
# Module-level registry. Keyed by canonical_type -> SerializerConfig.
73+
# A separate dict caches instantiated classes so repeated lookups are O(1).
74+
_registered_configs = {}
75+
_loaded_serializers = {}
76+
77+
78+
def register_serializer_config(config):
79+
"""Store a config immediately. The serializer class is not imported yet."""
80+
_registered_configs[config.canonical_type] = config
81+
# Any previously cached class for this type is now stale.
82+
_loaded_serializers.pop(config.canonical_type, None)
83+
84+
85+
def load_serializer_class(canonical_type):
86+
"""
87+
Resolve and cache the serializer class for ``canonical_type``. Returns
88+
``None`` if no config is registered for that type.
89+
"""
90+
cached = _loaded_serializers.get(canonical_type)
91+
if cached is not None:
92+
return cached
93+
config = _registered_configs.get(canonical_type)
94+
if config is None:
95+
return None
96+
module = importlib.import_module(config.serializer_module)
97+
cls = getattr(module, config.serializer_class)
98+
_loaded_serializers[canonical_type] = cls
99+
return cls
100+
101+
102+
def iter_registered_configs():
103+
"""Iterate all registered configs. Deterministic order (insertion)."""
104+
return list(_registered_configs.values())
105+
106+
107+
class _WrappedLoader(importlib.abc.Loader):
108+
"""Delegating loader that fires a callback after ``exec_module``.
109+
110+
Only ``create_module`` and ``exec_module`` are overridden. Other loader
111+
attributes (``get_resource_reader``, ``get_filename``, ``get_data``,
112+
``is_package``, ``get_source``, ...) are forwarded to the wrapped loader
113+
via ``__getattr__`` so importers that poke at those interfaces continue
114+
to work transparently.
115+
"""
116+
117+
def __init__(self, original_loader, interceptor):
118+
self._original = original_loader
119+
self._interceptor = interceptor
120+
121+
def create_module(self, spec):
122+
return self._original.create_module(spec)
123+
124+
def exec_module(self, module):
125+
self._original.exec_module(module)
126+
self._interceptor._on_module_imported(module)
127+
128+
def __getattr__(self, name):
129+
return getattr(self._original, name)
130+
131+
132+
class _SerializerImportInterceptor(importlib.abc.MetaPathFinder):
133+
"""
134+
:class:`importlib.abc.MetaPathFinder` that watches for a fixed set of
135+
module names and fires :func:`_on_module_imported` once each has been
136+
fully executed.
137+
"""
138+
139+
def __init__(self):
140+
# module_name -> list[SerializerConfig]
141+
self._pending = {}
142+
self._processed = set()
143+
144+
def watch(self, module_name, config):
145+
self._pending.setdefault(module_name, []).append(config)
146+
147+
def find_spec(self, fullname, path, target=None):
148+
if fullname not in self._pending:
149+
return None
150+
# Remove ourselves from the path during the lookup below so Python's
151+
# normal finders (not us) can resolve the real spec. Reinstall after.
152+
was_installed = self in sys.meta_path
153+
if was_installed:
154+
sys.meta_path.remove(self)
155+
try:
156+
spec = importlib.util.find_spec(fullname)
157+
finally:
158+
if was_installed:
159+
sys.meta_path.insert(0, self)
160+
if spec is None or spec.loader is None:
161+
return None
162+
spec.loader = _WrappedLoader(spec.loader, self)
163+
return spec
164+
165+
def _on_module_imported(self, module):
166+
module_name = module.__name__
167+
if module_name in self._processed:
168+
return
169+
self._processed.add(module_name)
170+
for config in self._pending.get(module_name, ()):
171+
class_name = config.canonical_type.rsplit(".", 1)[-1]
172+
if hasattr(module, class_name):
173+
register_serializer_config(config)
174+
175+
176+
_interceptor = _SerializerImportInterceptor()
177+
178+
179+
def _ensure_interceptor_installed():
180+
if _interceptor in sys.meta_path:
181+
sys.meta_path.remove(_interceptor)
182+
sys.meta_path.insert(0, _interceptor)
183+
184+
185+
def register_serializer_for_type(canonical_type, serializer, **kwargs):
186+
"""
187+
Public entry point for extensions.
188+
189+
If the target type's module is already loaded, the config is stored
190+
immediately. Otherwise, an import hook is installed and registration is
191+
deferred to the first ``import`` of the target module.
192+
193+
``canonical_type`` is ``"module.ClassName"``. ``serializer`` is a dotted
194+
path to the serializer class. Additional ``priority`` / ``extra_kwargs``
195+
forwarded into :class:`SerializerConfig`.
196+
"""
197+
config = SerializerConfig(
198+
canonical_type=canonical_type, serializer=serializer, **kwargs
199+
)
200+
module_name, class_name = canonical_type.rsplit(".", 1)
201+
mod = sys.modules.get(module_name)
202+
if mod is not None and hasattr(mod, class_name):
203+
register_serializer_config(config)
204+
return
205+
_ensure_interceptor_installed()
206+
_interceptor.watch(module_name, config)
207+
208+
209+
def _reset_for_tests():
210+
"""Clear all module-level state. Intended for unit tests only."""
211+
_registered_configs.clear()
212+
_loaded_serializers.clear()
213+
_interceptor._pending.clear()
214+
_interceptor._processed.clear()
215+
if _interceptor in sys.meta_path:
216+
sys.meta_path.remove(_interceptor)

metaflow/datastore/artifacts/serializer.py

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
import inspect
12
from abc import ABCMeta, abstractmethod
23
from collections import namedtuple
34

45

6+
# Serialization formats. STORAGE produces (blobs, metadata) for the datastore;
7+
# WIRE produces a str for CLI args, protobuf payloads, and cross-process IPC.
8+
STORAGE = "storage"
9+
WIRE = "wire"
10+
11+
512
SerializationMetadata = namedtuple(
6-
"SerializationMetadata", ["type", "size", "encoding", "serializer_info"]
13+
"SerializationMetadata", ["obj_type", "size", "encoding", "serializer_info"]
714
)
815

916

@@ -21,20 +28,14 @@ class SerializedBlob(object):
2128
The blob data (bytes) or a reference key (str).
2229
is_reference : bool, optional
2330
If None, auto-detected from value type: str -> reference, bytes -> new data.
24-
compress_method : str
25-
Compression method for new blobs. Ignored for references. Default "gzip".
26-
NOTE: Not yet wired into the save path — ContentAddressedStore currently
27-
always applies gzip. This field is forward-looking for when per-blob
28-
compression control is needed (e.g., multi-blob IOType support).
2931
"""
3032

31-
def __init__(self, value, is_reference=None, compress_method="gzip"):
33+
def __init__(self, value, is_reference=None):
3234
if not isinstance(value, (str, bytes)):
3335
raise TypeError(
3436
"SerializedBlob value must be str or bytes, got %s" % type(value).__name__
3537
)
3638
self.value = value
37-
self.compress_method = compress_method
3839
if is_reference is None:
3940
self.is_reference = isinstance(value, str)
4041
else:
@@ -55,27 +56,57 @@ class SerializerStore(ABCMeta):
5556
"""
5657

5758
_all_serializers = {}
58-
_registration_order = []
59+
_ordered_cache = None
5960

6061
def __init__(cls, name, bases, namespace):
6162
super().__init__(name, bases, namespace)
62-
if cls.TYPE is not None:
63-
if cls.TYPE not in SerializerStore._all_serializers:
64-
SerializerStore._registration_order.append(cls.TYPE)
65-
SerializerStore._all_serializers[cls.TYPE] = cls
63+
# Skip the abstract base and any subclass that didn't implement all
64+
# abstract methods — registering a partially-abstract class would
65+
# blow up only at dispatch time.
66+
if cls.TYPE is None or inspect.isabstract(cls):
67+
return
68+
SerializerStore._all_serializers[cls.TYPE] = cls
69+
SerializerStore._ordered_cache = None
6670

6771
@staticmethod
6872
def get_ordered_serializers():
6973
"""
7074
Return serializer classes sorted by (PRIORITY, registration_order).
7175
72-
This ordering is deterministic for a given set of loaded serializers.
76+
Python 3.7+ dicts preserve insertion order, so enumerating
77+
``_all_serializers.values()`` yields registration order. A stable sort
78+
on PRIORITY preserves that tiebreaker.
79+
80+
Serializers registered via the lazy registry are materialized here
81+
too: each registered class is imported on demand and folded into the
82+
dispatch order. Without this step, a lazy
83+
``register_serializer_for_type`` call would be silently ignored
84+
at dispatch time.
7385
"""
74-
order = SerializerStore._registration_order
75-
return sorted(
76-
SerializerStore._all_serializers.values(),
77-
key=lambda s: (s.PRIORITY, order.index(s.TYPE)),
78-
)
86+
# Imported locally to avoid a circular import between this module and
87+
# ``lazy_registry`` (which depends on the ArtifactSerializer ABC).
88+
from .lazy_registry import iter_registered_configs, load_serializer_class
89+
90+
lazy_classes = []
91+
for cfg in iter_registered_configs():
92+
cls = load_serializer_class(cfg.canonical_type)
93+
if cls is not None:
94+
lazy_classes.append(cls)
95+
96+
if SerializerStore._ordered_cache is None or lazy_classes:
97+
# De-duplicate: lazy classes typically also self-register via the
98+
# metaclass, but when loaded outside normal import flow they may
99+
# not. ``dict.fromkeys`` preserves first-seen order while dropping
100+
# duplicates.
101+
combined = list(
102+
dict.fromkeys(
103+
list(SerializerStore._all_serializers.values()) + lazy_classes
104+
)
105+
)
106+
SerializerStore._ordered_cache = sorted(
107+
combined, key=lambda s: s.PRIORITY
108+
)
109+
return SerializerStore._ordered_cache
79110

80111

81112
class ArtifactSerializer(object, metaclass=SerializerStore):
@@ -135,36 +166,49 @@ def can_deserialize(cls, metadata):
135166

136167
@classmethod
137168
@abstractmethod
138-
def serialize(cls, obj):
169+
def serialize(cls, obj, format=STORAGE):
139170
"""
140-
Serialize obj to blobs and metadata. Must be side-effect-free.
171+
Serialize obj. Must be side-effect-free: this method may be invoked
172+
multiple times (caching, retries, parallel dispatch) and must not
173+
perform I/O, mutate global state, or register the object elsewhere.
174+
Side effects that need to happen at persist time belong in hooks,
175+
not in the serializer.
141176
142177
Parameters
143178
----------
144179
obj : Any
145180
The Python object to serialize.
181+
format : str
182+
Either ``STORAGE`` (default) or ``WIRE``.
183+
- ``STORAGE`` returns a tuple ``(List[SerializedBlob], SerializationMetadata)``
184+
for persisting through the datastore.
185+
- ``WIRE`` returns a ``str`` representation for CLI args, protobuf
186+
payloads, and cross-process IPC. Serializers that cannot provide
187+
a wire encoding should raise ``NotImplementedError``.
146188
147189
Returns
148190
-------
149-
tuple
150-
(List[SerializedBlob], SerializationMetadata)
191+
tuple or str
151192
"""
152193
raise NotImplementedError
153194

154195
@classmethod
155196
@abstractmethod
156-
def deserialize(cls, blobs, metadata, context):
197+
def deserialize(cls, data, metadata=None, context=None, format=STORAGE):
157198
"""
158-
Deserialize blobs back to a Python object.
199+
Deserialize back to a Python object.
159200
160201
Parameters
161202
----------
162-
blobs : List[bytes]
163-
The raw blob data.
164-
metadata : SerializationMetadata
165-
Metadata stored alongside the artifact.
166-
context : Any
203+
data : Union[List[bytes], str]
204+
``List[bytes]`` when ``format=STORAGE``; ``str`` when ``format=WIRE``.
205+
metadata : SerializationMetadata, optional
206+
Metadata stored alongside the artifact. Required for STORAGE,
207+
ignored for WIRE.
208+
context : Any, optional
167209
Optional context for deserialization (e.g., task vs client loading).
210+
format : str
211+
Either ``STORAGE`` (default) or ``WIRE``.
168212
169213
Returns
170214
-------

0 commit comments

Comments
 (0)