Skip to content

Commit 0044f28

Browse files
author
Saeid Barati
committed
Address review feedback: wire/storage formats + lazy registry
Wire vs storage format - Added WIRE and STORAGE constants in metaflow.datastore.artifacts.serializer. - ArtifactSerializer.serialize/deserialize now accept a ``format`` kwarg so a single class can own both the storage path (datastore blobs + metadata) and the wire path (string for CLI args, protobuf payloads, cross-process IPC). - PickleSerializer implements STORAGE only; WIRE raises NotImplementedError with an explanation (pickle bytes are not safe as a wire payload). - Serializers that want wire support implement it on the same class; there's no need for a second class per format. Lazy import-hook registry - New module metaflow/datastore/artifacts/lazy_registry.py with SerializerConfig, an importlib.abc.MetaPathFinder interceptor, and public register_serializer_for_type / load_serializer_class entry points. - If the target type's module is already in sys.modules, registration is immediate. Otherwise a hook is installed on sys.meta_path and registration fires the first time the user's code imports the target module. This defers the cost of serializer-module imports (torch, pyarrow, fastavro, ...) until those dependencies are actually in play. - find_spec temporarily removes the interceptor from sys.meta_path during its lookup to avoid recursion. Review nits - serializer.py: rename SerializationMetadata.type field to obj_type to avoid shadowing the type() builtin (dict key inside _info stays "type" for backward compatibility with existing datastores). - serializer.py: SerializerStore skips TYPE=None AND any subclass that is still abstract, via inspect.isabstract(). - serializer.py: get_ordered_serializers memoizes the sorted list and invalidates on new registration. Drops the O(n²) list.index tiebreaker — Python 3.7+ dicts preserve insertion order. - task_datastore.py: lift SerializerStore / SerializationMetadata imports to module top (were on hot paths: __init__ and load_artifacts). - task_datastore.py: the "no serializer claimed this artifact" branch now raises DataException with a message pointing at the PickleSerializer fallback invariant, instead of the misleading UnpicklableArtifactException. - task_datastore.py: "no deserializer claimed this artifact" now looks up ``serializer_info["source"]`` and hints at a missing extension. - SerializedBlob.compress_method: removed. It was documented as "not wired into the save path" — shipping an unwired knob invites extension authors to rely on it and discover it is a no-op. Will come back with its consuming code in a later change. - serialize() docstring rewritten: the side-effect-free contract is there to support retries, caching, and parallel dispatch, and to keep serializers testable; I/O belongs in hooks. Tests - New test/unit/test_lazy_serializer_registry.py (9 tests covering config validation, eager registration for already-imported types, deferred registration through the import hook, and the recursion guard). - test_artifact_serializer.py gains format-dispatch coverage (STORAGE and WIRE round-trip on a toy dual-format serializer; PickleSerializer raises on WIRE). - Removed tests for compress_method and adjusted tests that poked at _registration_order so they go through the public API only.
1 parent e68748f commit 0044f28

9 files changed

Lines changed: 542 additions & 100 deletions

File tree

metaflow/datastore/artifacts/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,12 @@
33
SerializationMetadata,
44
SerializedBlob,
55
SerializerStore,
6+
STORAGE,
7+
WIRE,
8+
)
9+
from .lazy_registry import (
10+
SerializerConfig,
11+
load_serializer_class,
12+
register_serializer_config,
13+
register_serializer_for_type,
614
)
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
"""
2+
Lazy serializer registry driven by an import hook.
3+
4+
Extensions ship serializers whose implementation modules may import optional
5+
heavy dependencies (``torch``, ``pyarrow``, ``fastavro``, ``protobuf``, ...).
6+
Loading those serializer modules unconditionally at ``metaflow`` import time
7+
would force every user to pay for dependencies they may not have installed.
8+
9+
This module defers both the serializer class import and its instantiation
10+
until one of two things happens:
11+
12+
1. The target type's module is already present in :data:`sys.modules` when
13+
registration is called — registration then happens immediately.
14+
2. The target type's module is imported later by the user's code. An
15+
``importlib`` hook watches for that event and performs registration the
16+
first time the module is loaded.
17+
18+
The hook is installed on :data:`sys.meta_path` and removes itself from the
19+
path during its own ``find_spec`` call to avoid recursion.
20+
"""
21+
22+
import importlib
23+
import importlib.abc
24+
import importlib.machinery
25+
import importlib.util
26+
import sys
27+
28+
from dataclasses import dataclass, field
29+
30+
31+
@dataclass
32+
class SerializerConfig:
33+
"""
34+
Declarative entry recording *which* serializer handles *which* type,
35+
without actually importing the serializer class. The class is imported on
36+
first use by :func:`load_serializer_class`.
37+
38+
Parameters
39+
----------
40+
canonical_type : str
41+
``"module.ClassName"`` — e.g. ``"builtins.dict"``, ``"torch.Tensor"``.
42+
serializer : str
43+
Dotted import path to the serializer class, e.g.
44+
``"my_extension.serializers.TorchSerializer"``.
45+
priority : int
46+
Dispatch priority. Lower is tried first. Matches the existing
47+
``ArtifactSerializer.PRIORITY`` convention.
48+
extra_kwargs : dict
49+
Optional kwargs passed to the serializer class ``__init__``.
50+
"""
51+
52+
canonical_type: str
53+
serializer: str
54+
priority: int = 100
55+
extra_kwargs: dict = field(default_factory=dict)
56+
57+
def __post_init__(self):
58+
if not self.canonical_type:
59+
raise ValueError("canonical_type cannot be empty")
60+
if not self.serializer or "." not in self.serializer:
61+
raise ValueError("serializer must be in 'module.ClassName' format")
62+
63+
@property
64+
def serializer_module(self):
65+
return ".".join(self.serializer.split(".")[:-1])
66+
67+
@property
68+
def serializer_class(self):
69+
return self.serializer.split(".")[-1]
70+
71+
72+
# Module-level registry. Keyed by canonical_type -> SerializerConfig.
73+
# A separate dict caches instantiated classes so repeated lookups are O(1).
74+
_registered_configs = {}
75+
_loaded_serializers = {}
76+
77+
78+
def register_serializer_config(config):
79+
"""Store a config immediately. The serializer class is not imported yet."""
80+
_registered_configs[config.canonical_type] = config
81+
# Any previously cached class for this type is now stale.
82+
_loaded_serializers.pop(config.canonical_type, None)
83+
84+
85+
def load_serializer_class(canonical_type):
86+
"""
87+
Resolve and cache the serializer class for ``canonical_type``. Returns
88+
``None`` if no config is registered for that type.
89+
"""
90+
cached = _loaded_serializers.get(canonical_type)
91+
if cached is not None:
92+
return cached
93+
config = _registered_configs.get(canonical_type)
94+
if config is None:
95+
return None
96+
module = importlib.import_module(config.serializer_module)
97+
cls = getattr(module, config.serializer_class)
98+
_loaded_serializers[canonical_type] = cls
99+
return cls
100+
101+
102+
def iter_registered_configs():
103+
"""Iterate all registered configs. Deterministic order (insertion)."""
104+
return list(_registered_configs.values())
105+
106+
107+
class _WrappedLoader(importlib.abc.Loader):
108+
"""Delegating loader that fires a callback after ``exec_module``."""
109+
110+
def __init__(self, original_loader, interceptor):
111+
self._original = original_loader
112+
self._interceptor = interceptor
113+
114+
def create_module(self, spec):
115+
return self._original.create_module(spec)
116+
117+
def exec_module(self, module):
118+
self._original.exec_module(module)
119+
self._interceptor._on_module_imported(module)
120+
121+
122+
class _SerializerImportInterceptor(importlib.abc.MetaPathFinder):
123+
"""
124+
:class:`importlib.abc.MetaPathFinder` that watches for a fixed set of
125+
module names and fires :func:`_on_module_imported` once each has been
126+
fully executed.
127+
"""
128+
129+
def __init__(self):
130+
# module_name -> list[SerializerConfig]
131+
self._pending = {}
132+
self._processed = set()
133+
134+
def watch(self, module_name, config):
135+
self._pending.setdefault(module_name, []).append(config)
136+
137+
def find_spec(self, fullname, path, target=None):
138+
if fullname not in self._pending:
139+
return None
140+
# Remove ourselves from the path during the lookup below so Python's
141+
# normal finders (not us) can resolve the real spec. Reinstall after.
142+
was_installed = self in sys.meta_path
143+
if was_installed:
144+
sys.meta_path.remove(self)
145+
try:
146+
spec = importlib.util.find_spec(fullname)
147+
finally:
148+
if was_installed:
149+
sys.meta_path.insert(0, self)
150+
if spec is None or spec.loader is None:
151+
return None
152+
spec.loader = _WrappedLoader(spec.loader, self)
153+
return spec
154+
155+
def _on_module_imported(self, module):
156+
module_name = module.__name__
157+
if module_name in self._processed:
158+
return
159+
self._processed.add(module_name)
160+
for config in self._pending.get(module_name, ()):
161+
class_name = config.canonical_type.rsplit(".", 1)[-1]
162+
if hasattr(module, class_name):
163+
register_serializer_config(config)
164+
165+
166+
_interceptor = _SerializerImportInterceptor()
167+
168+
169+
def _ensure_interceptor_installed():
170+
if _interceptor in sys.meta_path:
171+
sys.meta_path.remove(_interceptor)
172+
sys.meta_path.insert(0, _interceptor)
173+
174+
175+
def register_serializer_for_type(canonical_type, serializer, **kwargs):
176+
"""
177+
Public entry point for extensions.
178+
179+
If the target type's module is already loaded, the config is stored
180+
immediately. Otherwise, an import hook is installed and registration is
181+
deferred to the first ``import`` of the target module.
182+
183+
``canonical_type`` is ``"module.ClassName"``. ``serializer`` is a dotted
184+
path to the serializer class. Additional ``priority`` / ``extra_kwargs``
185+
forwarded into :class:`SerializerConfig`.
186+
"""
187+
config = SerializerConfig(
188+
canonical_type=canonical_type, serializer=serializer, **kwargs
189+
)
190+
module_name, class_name = canonical_type.rsplit(".", 1)
191+
mod = sys.modules.get(module_name)
192+
if mod is not None and hasattr(mod, class_name):
193+
register_serializer_config(config)
194+
return
195+
_ensure_interceptor_installed()
196+
_interceptor.watch(module_name, config)
197+
198+
199+
def _reset_for_tests():
200+
"""Clear all module-level state. Intended for unit tests only."""
201+
_registered_configs.clear()
202+
_loaded_serializers.clear()
203+
_interceptor._pending.clear()
204+
_interceptor._processed.clear()
205+
if _interceptor in sys.meta_path:
206+
sys.meta_path.remove(_interceptor)

metaflow/datastore/artifacts/serializer.py

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
import inspect
12
from abc import ABCMeta, abstractmethod
23
from collections import namedtuple
34

45

6+
# Serialization formats. STORAGE produces (blobs, metadata) for the datastore;
7+
# WIRE produces a str for CLI args, protobuf payloads, and cross-process IPC.
8+
STORAGE = "storage"
9+
WIRE = "wire"
10+
11+
512
SerializationMetadata = namedtuple(
6-
"SerializationMetadata", ["type", "size", "encoding", "serializer_info"]
13+
"SerializationMetadata", ["obj_type", "size", "encoding", "serializer_info"]
714
)
815

916

@@ -21,20 +28,14 @@ class SerializedBlob(object):
2128
The blob data (bytes) or a reference key (str).
2229
is_reference : bool, optional
2330
If None, auto-detected from value type: str -> reference, bytes -> new data.
24-
compress_method : str
25-
Compression method for new blobs. Ignored for references. Default "gzip".
26-
NOTE: Not yet wired into the save path — ContentAddressedStore currently
27-
always applies gzip. This field is forward-looking for when per-blob
28-
compression control is needed (e.g., multi-blob IOType support).
2931
"""
3032

31-
def __init__(self, value, is_reference=None, compress_method="gzip"):
33+
def __init__(self, value, is_reference=None):
3234
if not isinstance(value, (str, bytes)):
3335
raise TypeError(
3436
"SerializedBlob value must be str or bytes, got %s" % type(value).__name__
3537
)
3638
self.value = value
37-
self.compress_method = compress_method
3839
if is_reference is None:
3940
self.is_reference = isinstance(value, str)
4041
else:
@@ -55,27 +56,33 @@ class SerializerStore(ABCMeta):
5556
"""
5657

5758
_all_serializers = {}
58-
_registration_order = []
59+
_ordered_cache = None
5960

6061
def __init__(cls, name, bases, namespace):
6162
super().__init__(name, bases, namespace)
62-
if cls.TYPE is not None:
63-
if cls.TYPE not in SerializerStore._all_serializers:
64-
SerializerStore._registration_order.append(cls.TYPE)
65-
SerializerStore._all_serializers[cls.TYPE] = cls
63+
# Skip the abstract base and any subclass that didn't implement all
64+
# abstract methods — registering a partially-abstract class would
65+
# blow up only at dispatch time.
66+
if cls.TYPE is None or inspect.isabstract(cls):
67+
return
68+
SerializerStore._all_serializers[cls.TYPE] = cls
69+
SerializerStore._ordered_cache = None
6670

6771
@staticmethod
6872
def get_ordered_serializers():
6973
"""
7074
Return serializer classes sorted by (PRIORITY, registration_order).
7175
72-
This ordering is deterministic for a given set of loaded serializers.
76+
Python 3.7+ dicts preserve insertion order, so enumerating
77+
``_all_serializers.values()`` yields registration order. A stable sort
78+
on PRIORITY preserves that tiebreaker. The result is cached and
79+
invalidated on any new registration.
7380
"""
74-
order = SerializerStore._registration_order
75-
return sorted(
76-
SerializerStore._all_serializers.values(),
77-
key=lambda s: (s.PRIORITY, order.index(s.TYPE)),
78-
)
81+
if SerializerStore._ordered_cache is None:
82+
SerializerStore._ordered_cache = sorted(
83+
SerializerStore._all_serializers.values(), key=lambda s: s.PRIORITY
84+
)
85+
return SerializerStore._ordered_cache
7986

8087

8188
class ArtifactSerializer(object, metaclass=SerializerStore):
@@ -135,36 +142,49 @@ def can_deserialize(cls, metadata):
135142

136143
@classmethod
137144
@abstractmethod
138-
def serialize(cls, obj):
145+
def serialize(cls, obj, format=STORAGE):
139146
"""
140-
Serialize obj to blobs and metadata. Must be side-effect-free.
147+
Serialize obj. Must be side-effect-free: this method may be invoked
148+
multiple times (caching, retries, parallel dispatch) and must not
149+
perform I/O, mutate global state, or register the object elsewhere.
150+
Side effects that need to happen at persist time belong in hooks,
151+
not in the serializer.
141152
142153
Parameters
143154
----------
144155
obj : Any
145156
The Python object to serialize.
157+
format : str
158+
Either ``STORAGE`` (default) or ``WIRE``.
159+
- ``STORAGE`` returns a tuple ``(List[SerializedBlob], SerializationMetadata)``
160+
for persisting through the datastore.
161+
- ``WIRE`` returns a ``str`` representation for CLI args, protobuf
162+
payloads, and cross-process IPC. Serializers that cannot provide
163+
a wire encoding should raise ``NotImplementedError``.
146164
147165
Returns
148166
-------
149-
tuple
150-
(List[SerializedBlob], SerializationMetadata)
167+
tuple or str
151168
"""
152169
raise NotImplementedError
153170

154171
@classmethod
155172
@abstractmethod
156-
def deserialize(cls, blobs, metadata, context):
173+
def deserialize(cls, data, metadata=None, context=None, format=STORAGE):
157174
"""
158-
Deserialize blobs back to a Python object.
175+
Deserialize back to a Python object.
159176
160177
Parameters
161178
----------
162-
blobs : List[bytes]
163-
The raw blob data.
164-
metadata : SerializationMetadata
165-
Metadata stored alongside the artifact.
166-
context : Any
179+
data : Union[List[bytes], str]
180+
``List[bytes]`` when ``format=STORAGE``; ``str`` when ``format=WIRE``.
181+
metadata : SerializationMetadata, optional
182+
Metadata stored alongside the artifact. Required for STORAGE,
183+
ignored for WIRE.
184+
context : Any, optional
167185
Optional context for deserialization (e.g., task vs client loading).
186+
format : str
187+
Either ``STORAGE`` (default) or ``WIRE``.
168188
169189
Returns
170190
-------

0 commit comments

Comments
 (0)