Skip to content

Commit 07d78a0

Browse files
author
Saeid Barati
committed
IOType: registry-based deserialization for Flow client round-trip
The metadata service stores only the artifact encoding string ("iotype:<type_name>"), not the full serializer_info dict. As a result, when the Flow client reconstructed an artifact, IOTypeSerializer.deserialize had no iotype_module/iotype_class to import and raised a KeyError. Populate a registry via IOType.__init_subclass__ keyed by type_name, and have IOTypeSerializer.deserialize resolve the class from the encoding suffix first. The iotype_module/iotype_class hints remain a secondary lookup for introspecting artifacts whose owning extension isn't loaded locally. Fixes round-trip of IOType artifacts through Flow(...).latest_run.end_task[...].data without touching the metadata service or client core.py.
1 parent 358f3cd commit 07d78a0

2 files changed

Lines changed: 50 additions & 8 deletions

File tree

metaflow/io_types/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@
2626

2727
_UNSET = object()
2828

29+
# Registry of concrete IOType subclasses keyed by their ``type_name``. Populated
30+
# by ``IOType.__init_subclass__``. The datastore encodes each artifact's
31+
# ``type_name`` in its ``SerializationMetadata.encoding`` (``iotype:<name>``),
32+
# so ``IOTypeSerializer.deserialize`` can recover the class without the
33+
# metadata service having to persist the Python module+class path.
34+
_TYPE_REGISTRY = {}
35+
36+
37+
def get_iotype_by_name(type_name):
    """Look up the class stored in ``_TYPE_REGISTRY`` under ``type_name``.

    Returns the registered class, or ``None`` when the registry has no
    entry for that name.
    """
    try:
        return _TYPE_REGISTRY[type_name]
    except KeyError:
        # Unknown names are not an error here; the caller decides what a
        # missing registration means.
        return None
40+
2941

3042
class IOType(object, metaclass=ABCMeta):
3143
"""
@@ -45,6 +57,17 @@ class IOType(object, metaclass=ABCMeta):
4557

4658
type_name = None # e.g. "text", "json", "int64" — set by subclasses.
4759

60+
def __init_subclass__(cls, **kwargs):
    """Record each newly defined subclass in ``_TYPE_REGISTRY``.

    The subclass is stored under its ``type_name`` so that a class can be
    looked up later from the name alone. Subclasses whose ``type_name`` is
    falsy (for example still ``None``) are not registered.

    Registration is last-write-wins: a later class with the same
    ``type_name`` replaces the earlier entry. Because ``type_name`` is
    read through normal attribute lookup, a subclass that merely inherits
    a non-empty ``type_name`` from its parent is also registered under
    that name and replaces the parent's entry.
    """
    super().__init_subclass__(**kwargs)
    registered_name = cls.type_name
    if not registered_name:
        # No usable name on this class: leave the registry untouched.
        return
    _TYPE_REGISTRY[registered_name] = cls
70+
4871
def __init__(self, value=_UNSET):
4972
self._value = value
5073

metaflow/plugins/datastores/serializers/iotype_serializer.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,20 @@
66
STORAGE,
77
WIRE,
88
)
9-
from metaflow.io_types.base import IOType
9+
from metaflow.io_types.base import IOType, get_iotype_by_name
1010

1111

1212
class IOTypeSerializer(ArtifactSerializer):
1313
"""
1414
Bridge between :class:`IOType` and the pluggable serializer framework.
1515
1616
Claims any :class:`IOType` instance on save. On load, reconstructs the
17-
original subclass from the ``iotype_module`` / ``iotype_class`` hints
18-
that were written into ``serializer_info``.
17+
original subclass by looking up its ``type_name`` in the global IOType
18+
registry (populated via :meth:`IOType.__init_subclass__`). The
19+
``iotype_module`` / ``iotype_class`` hints stored in ``serializer_info``
20+
are kept as a secondary lookup path — useful when a subclass isn't yet
21+
registered in the reader process, or when inspecting artifacts produced
22+
by extensions whose code isn't installed locally.
1923
2024
``PRIORITY`` is 50 — ahead of the default (100) so this bridge catches
2125
:class:`IOType` artifacts before any generic catch-all, and always ahead
@@ -74,13 +78,28 @@ def deserialize(cls, data, metadata=None, context=None, format=STORAGE):
7478
raise NotImplementedError(
7579
"IOTypeSerializer only handles the STORAGE format."
7680
)
77-
info = metadata.serializer_info
78-
mod = importlib.import_module(info["iotype_module"])
79-
iotype_cls = getattr(mod, info["iotype_class"])
81+
info = metadata.serializer_info or {}
82+
# Primary path: registry lookup by the type_name encoded in the
83+
# artifact's encoding. Works whether or not the metadata service
84+
# propagates ``serializer_info`` to the reader.
85+
type_name = metadata.encoding[len(cls._ENCODING_PREFIX):]
86+
iotype_cls = get_iotype_by_name(type_name)
87+
# Fallback: explicit module/class hints in serializer_info. Useful for
88+
# inspecting artifacts produced by extensions not loaded locally.
89+
if iotype_cls is None and "iotype_module" in info and "iotype_class" in info:
90+
mod = importlib.import_module(info["iotype_module"])
91+
iotype_cls = getattr(mod, info["iotype_class"])
92+
if iotype_cls is None:
93+
raise ValueError(
94+
"IOTypeSerializer could not resolve a class for encoding %r; "
95+
"no IOType subclass is registered under type_name %r and "
96+
"serializer_info lacks iotype_module/iotype_class hints."
97+
% (metadata.encoding, type_name)
98+
)
8099
# Only allow actual IOType subclasses — metadata is untrusted input.
81100
if not (isinstance(iotype_cls, type) and issubclass(iotype_cls, IOType)):
82101
raise ValueError(
83-
"IOTypeSerializer metadata references '%s.%s' which is not an "
84-
"IOType subclass" % (info["iotype_module"], info["iotype_class"])
102+
"IOTypeSerializer resolved %r for encoding %r, which is not "
103+
"an IOType subclass" % (iotype_cls, metadata.encoding)
85104
)
86105
return iotype_cls.deserialize(data, format=STORAGE, metadata=info)

0 commit comments

Comments
 (0)