Skip to content

Commit b6f6cf4

Browse files
committed
Unified artifact serializer registration via setup_imports()
Replace the two-path registration scheme (ARTIFACT_SERIALIZERS_DESC + register_serializer_for_type) with a single declarative mechanism. Extensions ship one serializer file per serializer; heavy imports live in a setup_imports() classmethod whose ModuleNotFoundError parks the serializer on an import hook and retries once the awaited module appears in sys.modules. Author-facing API ----------------- class TorchSerializer(ArtifactSerializer): TYPE = "torch" PRIORITY = 50 @classmethod def setup_imports(cls, context=None): cls.lazy_import("torch") cls.lazy_import("pyarrow", alias="pa") @classmethod def can_serialize(cls, obj): return isinstance(obj, cls.torch.Tensor) ... # mfextinit_myext.py ARTIFACT_SERIALIZERS_DESC = [ ("torch", "my_ext.serializers.torch_serializer.TorchSerializer"), ] Mechanism --------- - SerializerStore.bootstrap() walks every extension's ARTIFACT_SERIALIZERS_DESC directly (the serializer category owns its own lifecycle rather than flowing through resolve_plugins), applies ENABLED_ARTIFACT_SERIALIZER +/- toggles, and drives each entry through a state machine: known → importing → class_loaded → importing_deps → active with terminal states `broken` / `disabled` and the retry state `pending_on_imports`. - lazy_import(module_path, alias=None) imports the module, stashes it on cls, returns it. Rejects reserved names (TYPE, PRIORITY, dispatch methods, leading underscore) and double-assignment within one setup_imports() call. - A sys.meta_path interceptor watches the pending modules; when any imports, SerializerStore._on_module_imported re-runs the lifecycle for every parked entry waiting on that module. Loop guard: the same ImportError.name raising twice → broken. - Dispatch reads _active_serializers only. Exceptions raised by can_serialize / can_deserialize on a currently-active serializer are caught, counted in the record's dispatch_error_count, and the serializer is skipped for that artifact only — preserving the PickleSerializer fallback. - On PRIORITY ties, last-registered wins; a lexicographic tiebreak on class_path provides cross-environment reproducibility. Diagnostic API -------------- metaflow.datastore.artifacts.list_serializer_status() returns a list of per-entry dicts with name, class_path, state, awaiting_modules, last_error, priority, type, import_trigger, and dispatch_error_count. Primary use case: answering "why isn't my custom serializer active?" without reading source. Removed ------- - register_serializer_for_type() (public API) - SerializerConfig, register_serializer_config, iter_registered_configs, load_serializer_class, plus the backing declarative-config plumbing inside lazy_registry.py — nothing in the unified path uses them. - ARTIFACT_SERIALIZERS = resolve_plugins("artifact_serializer") in metaflow/plugins/__init__.py (no readers in the codebase). - "artifact_serializer" from _plugin_categories in metaflow/extension_support/plugins.py. Testing ------- - 102 unit tests across test_artifact_serializer, test_pickle_serializer, test_serializer_integration, test_serializer_lifecycle, and test_serializer_public_api. Coverage: state-machine transitions, retry and loop guard, disabled toggle, dispatch exception handling, lazy_import reserved-name + collision guards, setup_imports signature compatibility (both def setup_imports(cls) and def setup_imports(cls, context=None) are supported), subclass inheritance, and public API surface assertions. - SerializerStore._reset_for_tests() clears registry + interceptor state and walks MRO to delattr lazy-imported attributes for test isolation.
1 parent 3cdb08c commit b6f6cf4

13 files changed

Lines changed: 1910 additions & 489 deletions

metaflow/datastore/artifacts/__init__.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,4 @@
55
SerializedBlob,
66
SerializerStore,
77
)
8-
from .lazy_registry import (
9-
SerializerConfig,
10-
load_serializer_class,
11-
register_serializer_config,
12-
register_serializer_for_type,
13-
)
8+
from .diagnostic import list_serializer_status, SerializerState
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""Per-entry diagnostic records for the artifact-serializer lifecycle."""
2+
3+
from dataclasses import dataclass, field
4+
from enum import Enum
5+
from typing import List, Optional
6+
7+
8+
class SerializerState(str, Enum):
9+
KNOWN = "known"
10+
IMPORTING = "importing"
11+
CLASS_LOADED = "class_loaded"
12+
IMPORTING_DEPS = "importing_deps"
13+
ACTIVE = "active"
14+
PENDING_ON_IMPORTS = "pending_on_imports"
15+
BROKEN = "broken"
16+
DISABLED = "disabled"
17+
18+
19+
@dataclass
20+
class SerializerRecord:
21+
name: str
22+
class_path: str
23+
state: SerializerState = SerializerState.KNOWN
24+
awaiting_modules: List[str] = field(default_factory=list)
25+
last_error: Optional[str] = None
26+
priority: Optional[int] = None
27+
type: Optional[str] = None
28+
import_trigger: Optional[str] = None
29+
dispatch_error_count: int = 0
30+
31+
def as_dict(self):
32+
return {
33+
"name": self.name,
34+
"class_path": self.class_path,
35+
"state": self.state.value,
36+
"awaiting_modules": list(self.awaiting_modules),
37+
"last_error": self.last_error,
38+
"priority": self.priority,
39+
"type": self.type,
40+
"import_trigger": self.import_trigger,
41+
"dispatch_error_count": self.dispatch_error_count,
42+
}
43+
44+
45+
def list_serializer_status():
46+
"""Return a list of per-serializer diagnostic records as dicts.
47+
48+
One entry per tuple in ``ARTIFACT_SERIALIZERS_DESC`` (post-toggle),
49+
including entries in ``pending_on_imports``, ``broken``, and
50+
``disabled`` states. Used for debugging "why isn't my custom
51+
serializer active?".
52+
"""
53+
from .serializer import SerializerStore
54+
55+
return [rec.as_dict() for rec in SerializerStore._records.values()]
Lines changed: 41 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
"""
2-
Lazy serializer registry driven by an import hook.
2+
Import-hook plumbing that the serializer registry uses to retry a serializer's
3+
``setup_imports`` after one of its required modules becomes importable.
34
45
Extensions ship serializers whose implementation modules may import optional
56
heavy dependencies (``torch``, ``pyarrow``, ``fastavro``, ``protobuf``, ...).
6-
Loading those serializer modules unconditionally at ``metaflow`` import time
7-
would force every user to pay for dependencies they may not have installed.
8-
9-
This module defers both the serializer class import and its instantiation
10-
until one of two things happens:
11-
12-
1. The target type's module is already present in :data:`sys.modules` when
13-
registration is called — registration then happens immediately.
14-
2. The target type's module is imported later by the user's code. An
15-
``importlib`` hook watches for that event and performs registration the
16-
first time the module is loaded.
17-
18-
The hook is installed on :data:`sys.meta_path` and removes itself from the
19-
path during its own ``find_spec`` call to avoid recursion.
7+
Loading those modules unconditionally at ``metaflow`` import time would force
8+
every user to pay for dependencies they may not have installed. When
9+
``SerializerStore.bootstrap_entries`` encounters such a missing module, it
10+
parks the entry in ``pending_on_imports`` state and installs a watch here.
11+
The first time the awaited module is imported by the user's code, this
12+
interceptor fires ``SerializerStore._on_module_imported`` so the registry can
13+
retry activation.
14+
15+
The interceptor is installed on :data:`sys.meta_path` and removes itself from
16+
the path during its own ``find_spec`` call to avoid recursion.
17+
18+
This module has no public API — extensions declare serializers through
19+
``ARTIFACT_SERIALIZERS_DESC`` in their ``mfextinit_*`` file and interact with
20+
the registry via the state-machine public surface in
21+
:mod:`metaflow.datastore.artifacts.serializer`.
2022
"""
2123

2224
import importlib
@@ -25,84 +27,6 @@
2527
import importlib.util
2628
import sys
2729

28-
from dataclasses import dataclass, field
29-
30-
31-
@dataclass
32-
class SerializerConfig:
33-
"""
34-
Declarative entry recording *which* serializer handles *which* type,
35-
without actually importing the serializer class. The class is imported on
36-
first use by :func:`load_serializer_class`.
37-
38-
Parameters
39-
----------
40-
canonical_type : str
41-
``"module.ClassName"`` — e.g. ``"builtins.dict"``, ``"torch.Tensor"``.
42-
serializer : str
43-
Dotted import path to the serializer class, e.g.
44-
``"my_extension.serializers.TorchSerializer"``.
45-
priority : int
46-
Dispatch priority. Lower is tried first. Matches the existing
47-
``ArtifactSerializer.PRIORITY`` convention.
48-
extra_kwargs : dict
49-
Optional kwargs passed to the serializer class ``__init__``.
50-
"""
51-
52-
canonical_type: str
53-
serializer: str
54-
priority: int = 100
55-
extra_kwargs: dict = field(default_factory=dict)
56-
57-
def __post_init__(self):
58-
if not self.canonical_type:
59-
raise ValueError("canonical_type cannot be empty")
60-
if not self.serializer or "." not in self.serializer:
61-
raise ValueError("serializer must be in 'module.ClassName' format")
62-
63-
@property
64-
def serializer_module(self):
65-
return ".".join(self.serializer.split(".")[:-1])
66-
67-
@property
68-
def serializer_class(self):
69-
return self.serializer.split(".")[-1]
70-
71-
72-
# Module-level registry. Keyed by canonical_type -> SerializerConfig.
73-
# A separate dict caches instantiated classes so repeated lookups are O(1).
74-
_registered_configs = {}
75-
_loaded_serializers = {}
76-
77-
78-
def register_serializer_config(config):
79-
"""Store a config immediately. The serializer class is not imported yet."""
80-
_registered_configs[config.canonical_type] = config
81-
# Any previously cached class for this type is now stale.
82-
_loaded_serializers.pop(config.canonical_type, None)
83-
84-
85-
def load_serializer_class(canonical_type):
86-
"""
87-
Resolve and cache the serializer class for ``canonical_type``. Returns
88-
``None`` if no config is registered for that type.
89-
"""
90-
cached = _loaded_serializers.get(canonical_type)
91-
if cached is not None:
92-
return cached
93-
config = _registered_configs.get(canonical_type)
94-
if config is None:
95-
return None
96-
module = importlib.import_module(config.serializer_module)
97-
cls = getattr(module, config.serializer_class)
98-
_loaded_serializers[canonical_type] = cls
99-
return cls
100-
101-
102-
def iter_registered_configs():
103-
"""Iterate all registered configs. Deterministic order (insertion)."""
104-
return list(_registered_configs.values())
105-
10630

10731
class _WrappedLoader(importlib.abc.Loader):
10832
"""Delegating loader that fires a callback after ``exec_module``.
@@ -131,21 +55,26 @@ def __getattr__(self, name):
13155

13256
class _SerializerImportInterceptor(importlib.abc.MetaPathFinder):
13357
"""
134-
:class:`importlib.abc.MetaPathFinder` that watches for a fixed set of
135-
module names and fires :func:`_on_module_imported` once each has been
136-
fully executed.
58+
:class:`importlib.abc.MetaPathFinder` that watches a set of module names
59+
and notifies :class:`SerializerStore` once each has finished executing.
13760
"""
13861

13962
def __init__(self):
140-
# module_name -> list[SerializerConfig]
141-
self._pending = {}
63+
# Module names to watch on behalf of SerializerStore records parked
64+
# via _park_on_import_error. Firing calls
65+
# SerializerStore._on_module_imported.
66+
self._watched = set()
67+
# Modules we have already notified about, to avoid firing twice if
68+
# the same module gets imported through multiple paths.
14269
self._processed = set()
14370

144-
def watch(self, module_name, config):
145-
self._pending.setdefault(module_name, []).append(config)
71+
def watch(self, module_name):
72+
"""Watch ``module_name``. When it finishes executing,
73+
:meth:`SerializerStore._on_module_imported` is called."""
74+
self._watched.add(module_name)
14675

14776
def find_spec(self, fullname, path, target=None):
148-
if fullname not in self._pending:
77+
if fullname not in self._watched:
14978
return None
15079
# Remove ourselves from the path during the lookup below so Python's
15180
# normal finders (not us) can resolve the real spec. Reinstall after.
@@ -167,10 +96,16 @@ def _on_module_imported(self, module):
16796
if module_name in self._processed:
16897
return
16998
self._processed.add(module_name)
170-
for config in self._pending.get(module_name, ()):
171-
class_name = config.canonical_type.rsplit(".", 1)[-1]
172-
if hasattr(module, class_name):
173-
register_serializer_config(config)
99+
if module_name not in self._watched:
100+
return
101+
try:
102+
from .serializer import SerializerStore
103+
104+
SerializerStore._on_module_imported(module_name, module)
105+
except Exception:
106+
# A broken callback must not break the host's import. The record
107+
# itself will be marked BROKEN via _retry_bootstrap.
108+
pass
174109

175110

176111
_interceptor = _SerializerImportInterceptor()
@@ -182,35 +117,9 @@ def _ensure_interceptor_installed():
182117
sys.meta_path.insert(0, _interceptor)
183118

184119

185-
def register_serializer_for_type(canonical_type, serializer, **kwargs):
186-
"""
187-
Public entry point for extensions.
188-
189-
If the target type's module is already loaded, the config is stored
190-
immediately. Otherwise, an import hook is installed and registration is
191-
deferred to the first ``import`` of the target module.
192-
193-
``canonical_type`` is ``"module.ClassName"``. ``serializer`` is a dotted
194-
path to the serializer class. Additional ``priority`` / ``extra_kwargs``
195-
forwarded into :class:`SerializerConfig`.
196-
"""
197-
config = SerializerConfig(
198-
canonical_type=canonical_type, serializer=serializer, **kwargs
199-
)
200-
module_name, class_name = canonical_type.rsplit(".", 1)
201-
mod = sys.modules.get(module_name)
202-
if mod is not None and hasattr(mod, class_name):
203-
register_serializer_config(config)
204-
return
205-
_ensure_interceptor_installed()
206-
_interceptor.watch(module_name, config)
207-
208-
209120
def _reset_for_tests():
210121
"""Clear all module-level state. Intended for unit tests only."""
211-
_registered_configs.clear()
212-
_loaded_serializers.clear()
213-
_interceptor._pending.clear()
122+
_interceptor._watched.clear()
214123
_interceptor._processed.clear()
215124
if _interceptor in sys.meta_path:
216125
sys.meta_path.remove(_interceptor)

0 commit comments

Comments
 (0)