Commit 603a087

Saeid Barati
authored and committed
Add IOType ABC, Json, Struct, and the datastore bridge
On top of Netflix#3117, this ships the typed-artifact layer. Scope kept
deliberately small:

- ``IOType`` ABC — the contract extension authors target.
- ``Json`` and ``Struct`` — two concrete types with clear standalone value in
  core (wire format for CLI/IPC, cross-language JSON bytes on storage,
  self-describing schemas via ``to_spec()``, no pickle code-execution risk).
- ``IOTypeSerializer`` — the bridge that plugs any ``IOType`` instance into
  the ``ArtifactSerializer`` dispatch added by Netflix#3117 so save/load
  through the datastore just works.

What's intentionally *not* in this PR

- Primitive wrappers (Int32/Int64/Float32/Float64/Bool/Text). Standard Python
  numbers and strings flow through ``PickleSerializer`` unchanged. Wrapping is
  opt-in, for cases where you want constraints/metadata attached.
- ``Tensor``. Pulls in numpy + byte-order/dtype opinions; belongs in an
  extension that can own those choices.
- ``List`` / ``Map`` / ``Enum``. Thin wrappers whose value over plain JSON is
  mostly ``to_spec()`` — not enough on their own for core.

Contract

``serialize(format=...)`` / ``deserialize(data, format=..., **kw)`` mirror the
``ArtifactSerializer`` signature from Netflix#3117 and use the same ``WIRE`` /
``STORAGE`` constants, so one subclass owns both representations:

- ``STORAGE`` → ``(List[SerializedBlob], metadata_dict)`` for persisting
  through the datastore.
- ``WIRE`` → ``str`` for CLI args, protobuf payloads, and cross-process IPC.

Subclasses implement four hooks (``_wire_serialize``, ``_wire_deserialize``,
``_storage_serialize``, ``_storage_deserialize``). ``type_name`` +
``to_spec()`` support JSON schema generation. Instantiating without the hooks
raises ``TypeError``.

``IOTypeSerializer`` is registered via ``ARTIFACT_SERIALIZERS_DESC`` with
``PRIORITY=50`` — ahead of the default 100 so it catches ``IOType`` instances
before a generic catch-all, and always ahead of the ``PickleSerializer``
fallback (9999). It implements only ``STORAGE``; wire encoding is produced by
calling ``IOType.serialize(format=WIRE)`` directly.

Tests

- ``test/unit/io_types/test_base.py`` — abstract instantiation, WIRE/STORAGE
  dispatch, invalid format, equality/hash, spec.
- ``test/unit/io_types/test_json_type.py`` — round-trips.
- ``test/unit/io_types/test_struct_type.py`` — dataclass round-trip, dict
  round-trip, ``to_spec()`` with dataclass fields.
- ``test/unit/io_types/test_iotype_serializer.py`` — bridge
  ``can_serialize``/``can_deserialize``, round-trip through dataclass
  reconstruction, rejection of non-IOType classes in metadata (security),
  WIRE not supported on the bridge.
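To make the two-format contract concrete, here is a minimal self-contained sketch of what each format returns for a Json-like type. ``MiniJson`` and the ``WIRE``/``STORAGE`` string constants are stand-ins for illustration, not the shipped ``metaflow`` code.

```python
# Sketch (not the shipped code): what the two formats return.
# WIRE/STORAGE are stand-ins for the metaflow serializer constants.
import json

WIRE, STORAGE = "wire", "storage"  # stand-in constants


class MiniJson:
    """Hypothetical stand-in for metaflow.io_types.Json."""

    def __init__(self, value):
        self._value = value

    def serialize(self, format=STORAGE):
        s = json.dumps(self._value, separators=(",", ":"), sort_keys=True)
        if format == WIRE:
            return s                        # str: CLI args, protobuf, IPC
        if format == STORAGE:
            return [s.encode("utf-8")], {}  # (blobs, metadata_dict)
        raise ValueError("format must be %r or %r" % (STORAGE, WIRE))


wire = MiniJson({"b": 2, "a": 1}).serialize(format=WIRE)
blobs, meta = MiniJson({"b": 2, "a": 1}).serialize(format=STORAGE)
```

The same object thus yields a compact string on the wire path and a blob list plus metadata dict on the storage path, which is the split the real hooks implement.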
1 parent 0044f28 commit 603a087

11 files changed

Lines changed: 702 additions & 0 deletions

metaflow/io_types/__init__.py

Lines changed: 3 additions & 0 deletions
from .base import IOType
from .json_type import Json
from .struct_type import Struct

metaflow/io_types/base.py

Lines changed: 138 additions & 0 deletions
"""Typed-artifact contract for Metaflow.

This module defines the minimal :class:`IOType` abstract base class. OSS
Metaflow ships the contract; concrete types (scalars, tensors, enums,
dataclass-backed structs, etc.) live in extensions — they embody
deployment-specific opinions about encoding, byte order, and dataclass
inference that do not belong in core.

:class:`IOType` mirrors the ``format`` argument introduced on
:class:`metaflow.datastore.artifacts.serializer.ArtifactSerializer` so a
single subclass can own both representations:

- ``STORAGE`` — blob-based, persisted through the datastore.
- ``WIRE`` — string-based, for CLI args, protobuf payloads, and
  cross-process IPC.

Subclasses implement four hooks (``_wire_serialize``, ``_wire_deserialize``,
``_storage_serialize``, ``_storage_deserialize``); callers use the public
``serialize(format=...)`` / ``deserialize(data, format=...)`` methods.
"""

from abc import ABCMeta, abstractmethod

from metaflow.datastore.artifacts.serializer import STORAGE, WIRE


_UNSET = object()


class IOType(object, metaclass=ABCMeta):
    """
    Base class for typed Metaflow artifacts.

    An :class:`IOType` instance plays two roles:

    - **Descriptor** (no value): ``Int64`` in a spec describes an int64
      field.
    - **Wrapper** (with value): ``Int64(42)`` wraps a value for typed
      serialization.

    Subclasses implement four internal operations, dispatched by the
    ``format`` argument of the public :meth:`serialize` / :meth:`deserialize`
    methods.
    """

    type_name = None  # e.g. "text", "json", "int64" — set by subclasses.

    def __init__(self, value=_UNSET):
        self._value = value

    @property
    def value(self):
        """The wrapped Python value, or ``_UNSET`` if this is a pure descriptor."""
        return self._value

    # -- Public API --------------------------------------------------------

    def serialize(self, format=STORAGE):
        """
        Serialize the wrapped value. Must be side-effect-free.

        Parameters
        ----------
        format : str
            ``STORAGE`` (default) returns ``(List[SerializedBlob], dict)``.
            ``WIRE`` returns a ``str``.
        """
        if format == WIRE:
            return self._wire_serialize()
        if format == STORAGE:
            return self._storage_serialize()
        raise ValueError("format must be %r or %r, got %r" % (STORAGE, WIRE, format))

    @classmethod
    def deserialize(cls, data, format=STORAGE, **kwargs):
        """
        Reconstruct an :class:`IOType` from serialized data.

        Parameters
        ----------
        data : Union[str, List[bytes]]
            ``str`` when ``format=WIRE``; ``List[bytes]`` when ``format=STORAGE``.
        format : str
            ``STORAGE`` (default) or ``WIRE``.
        **kwargs
            Forwarded to the underlying ``_storage_deserialize`` hook
            (e.g. metadata the datastore produced at save time).
        """
        if format == WIRE:
            return cls._wire_deserialize(data)
        if format == STORAGE:
            return cls._storage_deserialize(data, **kwargs)
        raise ValueError("format must be %r or %r, got %r" % (STORAGE, WIRE, format))

    # -- Subclass hooks ----------------------------------------------------

    @abstractmethod
    def _wire_serialize(self):
        """Value -> string (for CLI args, protobuf, external APIs)."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def _wire_deserialize(cls, s):
        """String -> :class:`IOType` instance."""
        raise NotImplementedError

    @abstractmethod
    def _storage_serialize(self):
        """Value -> ``(List[SerializedBlob], metadata_dict)``. Side-effect-free."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def _storage_deserialize(cls, blobs, **kwargs):
        """``(List[bytes], metadata)`` -> :class:`IOType` instance."""
        raise NotImplementedError

    # -- Spec generation ---------------------------------------------------

    def to_spec(self):
        """JSON type spec. Works with or without a wrapped value."""
        return {"type": self.type_name}

    # -- Dunder ------------------------------------------------------------

    def __repr__(self):
        if self._value is _UNSET:
            return "%s()" % self.__class__.__name__
        return "%s(%r)" % (self.__class__.__name__, self._value)

    def __eq__(self, other):
        if type(self) is not type(other):
            return NotImplemented
        return self._value == other._value

    def __hash__(self):
        return hash((type(self), self._value))
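The ABC machinery is what enforces "instantiating without the hooks raises ``TypeError``". A stand-alone sketch of that guarantee, using a hypothetical ``IOTypeStub`` base rather than importing from metaflow:

```python
# Sketch of the ABC guarantee: a subclass that omits an abstract hook
# cannot be instantiated. IOTypeStub/Incomplete are illustrative names.
from abc import ABCMeta, abstractmethod


class IOTypeStub(metaclass=ABCMeta):
    @abstractmethod
    def _wire_serialize(self): ...

    @abstractmethod
    def _storage_serialize(self): ...


class Incomplete(IOTypeStub):
    def _wire_serialize(self):  # _storage_serialize is still missing
        return ""


try:
    Incomplete()
except TypeError as e:
    err = str(e)  # the message names the missing abstract method
```

No runtime dispatch check is needed for missing hooks: the failure happens at construction time, before any serialization path runs.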

metaflow/io_types/json_type.py

Lines changed: 27 additions & 0 deletions
import json

from ..datastore.artifacts.serializer import SerializedBlob
from .base import IOType


class Json(IOType):
    """JSON type (dict or list). Wire: JSON string. Storage: UTF-8 JSON bytes."""

    type_name = "json"

    def _wire_serialize(self):
        return json.dumps(self._value, separators=(",", ":"), sort_keys=True)

    @classmethod
    def _wire_deserialize(cls, s):
        return cls(json.loads(s))

    def _storage_serialize(self):
        blob = json.dumps(self._value, separators=(",", ":"), sort_keys=True).encode(
            "utf-8"
        )
        return [SerializedBlob(blob)], {}

    @classmethod
    def _storage_deserialize(cls, blobs, **kwargs):
        return cls(json.loads(blobs[0].decode("utf-8")))
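The ``separators=(",", ":")`` and ``sort_keys=True`` arguments give a canonical encoding: equal values always produce byte-identical wire strings, regardless of dict insertion order. A plain-stdlib illustration of that property:

```python
import json


def canon(v):
    # Same arguments Json uses: compact separators, deterministic key order.
    return json.dumps(v, separators=(",", ":"), sort_keys=True)


a = canon({"x": 1, "y": [2, 3]})
b = canon({"y": [2, 3], "x": 1})  # same content, different insertion order
```

Determinism matters here because the wire string may be compared, hashed, or embedded in protobuf payloads by callers.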

metaflow/io_types/struct_type.py

Lines changed: 119 additions & 0 deletions
import dataclasses
import json
import typing

from ..datastore.artifacts.serializer import SerializedBlob
from .base import IOType


def _iotype_for_annotation(annotation):
    """
    Resolve a Python type annotation to an IOType class. Only IOType
    subclasses are recognized; plain Python types return ``None`` and the
    caller falls back to a string representation in :meth:`Struct.to_spec`.
    """
    if isinstance(annotation, type) and issubclass(annotation, IOType):
        return annotation
    return None


class Struct(IOType):
    """
    Structured type mapping to a Python @dataclass.

    Wire: JSON string. Storage: UTF-8 JSON bytes.

    Wraps a @dataclass instance. Fields are inferred from dataclass annotations
    with implicit scalar mapping (str->Text, int->Int64, float->Float64, bool->Bool).

    Parameters
    ----------
    value : dataclass instance or dict, optional
        The wrapped value. Dataclass instances are serialized via dataclasses.asdict.
        Plain dicts are serialized directly as JSON.
    dataclass_type : type, optional
        The @dataclass class for type-descriptor use (no value).
    """

    type_name = "struct"

    def __init__(self, value=None, dataclass_type=None):
        if value is not None and dataclasses.is_dataclass(value):
            self._dataclass_type = type(value)
        elif dataclass_type is not None:
            self._dataclass_type = dataclass_type
        else:
            self._dataclass_type = None
        super().__init__(value)

    def _to_dict(self):
        """Convert value to dict, handling both dataclass and plain dict."""
        if dataclasses.is_dataclass(self._value):
            return dataclasses.asdict(self._value)
        if isinstance(self._value, dict):
            return self._value
        raise TypeError(
            "Struct value must be a dataclass instance or dict, got %s"
            % type(self._value).__name__
        )

    def _wire_serialize(self):
        return json.dumps(self._to_dict(), separators=(",", ":"), sort_keys=True)

    @classmethod
    def _wire_deserialize(cls, s):
        return cls(json.loads(s))

    def _storage_serialize(self):
        blob = json.dumps(
            self._to_dict(), separators=(",", ":"), sort_keys=True
        ).encode("utf-8")
        meta = {}
        if self._dataclass_type is not None:
            meta["dataclass_module"] = self._dataclass_type.__module__
            meta["dataclass_class"] = self._dataclass_type.__name__
        return [SerializedBlob(blob)], meta

    @classmethod
    def _storage_deserialize(cls, blobs, **kwargs):
        data = json.loads(blobs[0].decode("utf-8"))
        metadata = kwargs.get("metadata", {})
        dc_module = metadata.get("dataclass_module")
        dc_class = metadata.get("dataclass_class")
        if dc_module and dc_class:
            import importlib

            mod = importlib.import_module(dc_module)
            dc_type = getattr(mod, dc_class)
            # Security: only allow actual dataclasses, not arbitrary classes.
            if not dataclasses.is_dataclass(dc_type):
                raise ValueError(
                    "Struct metadata references '%s.%s' which is not a dataclass"
                    % (dc_module, dc_class)
                )
            return cls(dc_type(**data), dataclass_type=dc_type)
        # Fallback: return as a plain dict wrapped in Struct.
        return cls(data)

    def to_spec(self):
        spec = {"type": self.type_name}
        if self._dataclass_type is not None and dataclasses.is_dataclass(
            self._dataclass_type
        ):
            # Use typing.get_type_hints() to resolve string annotations
            # (handles `from __future__ import annotations`).
            try:
                hints = typing.get_type_hints(self._dataclass_type)
            except Exception:
                hints = {}
            fields = []
            for f in dataclasses.fields(self._dataclass_type):
                annotation = hints.get(f.name, f.type)
                field_iotype = _iotype_for_annotation(annotation)
                if field_iotype is not None:
                    field_spec = field_iotype().to_spec()
                else:
                    field_spec = {"type": str(annotation)}
                fields.append({"name": f.name, **field_spec})
            spec["fields"] = fields
        return spec
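The storage round-trip above reduces to stdlib pieces: flatten with ``dataclasses.asdict``, persist canonical JSON bytes, and on load rebuild with ``dc_type(**data)`` after the ``is_dataclass`` guard. A self-contained sketch (``Point`` is a hypothetical example class):

```python
# Sketch of the Struct storage round-trip using only the stdlib.
import dataclasses
import json


@dataclasses.dataclass
class Point:  # hypothetical user dataclass
    x: int
    y: int


value = Point(x=1, y=2)
# Save side: dataclass -> dict -> canonical UTF-8 JSON bytes.
blob = json.dumps(
    dataclasses.asdict(value), separators=(",", ":"), sort_keys=True
).encode("utf-8")

# Load side: the guard rejects arbitrary classes before calling dc_type(**data).
dc_type = Point
if not dataclasses.is_dataclass(dc_type):
    raise ValueError("not a dataclass")
restored = dc_type(**json.loads(blob.decode("utf-8")))
```

Because ``dc_type(**data)`` only ever runs on classes that pass the ``is_dataclass`` check, metadata pointing at an arbitrary importable class cannot trigger constructor side effects.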

metaflow/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -190,6 +190,7 @@
 # Add artifact serializers here. Ordering is by PRIORITY (lower = tried first).
 # PickleSerializer is the universal fallback (PRIORITY=9999).
 ARTIFACT_SERIALIZERS_DESC = [
+    ("iotype", ".datastores.serializers.iotype_serializer.IOTypeSerializer"),
     ("pickle", ".datastores.serializers.pickle_serializer.PickleSerializer"),
 ]

Lines changed: 81 additions & 0 deletions
import importlib

from metaflow.datastore.artifacts.serializer import (
    ArtifactSerializer,
    SerializationMetadata,
    STORAGE,
    WIRE,
)
from metaflow.io_types.base import IOType


class IOTypeSerializer(ArtifactSerializer):
    """
    Bridge between :class:`IOType` and the pluggable serializer framework.

    Claims any :class:`IOType` instance on save. On load, reconstructs the
    original subclass from the ``iotype_module`` / ``iotype_class`` hints
    that were written into ``serializer_info``.

    ``PRIORITY`` is 50 — ahead of the default (100) so this bridge catches
    :class:`IOType` artifacts before any generic catch-all, and always ahead
    of the :class:`PickleSerializer` fallback (9999).

    Only the ``STORAGE`` format is implemented on this bridge; ``WIRE`` is
    handled by callers that talk to :class:`IOType` directly (CLI parsing,
    protobuf payload construction), not through the datastore.
    """

    TYPE = "iotype"
    PRIORITY = 50

    _ENCODING_PREFIX = "iotype:"

    @classmethod
    def can_serialize(cls, obj):
        return isinstance(obj, IOType)

    @classmethod
    def can_deserialize(cls, metadata):
        return metadata.encoding.startswith(cls._ENCODING_PREFIX)

    @classmethod
    def serialize(cls, obj, format=STORAGE):
        if format == WIRE:
            raise NotImplementedError(
                "IOTypeSerializer only handles the STORAGE format; wire "
                "encoding is produced by calling IOType.serialize(format=WIRE) "
                "directly."
            )
        blobs, meta_dict = obj.serialize(format=STORAGE)
        size = sum(len(b.value) for b in blobs if isinstance(b.value, bytes))
        return (
            blobs,
            SerializationMetadata(
                obj_type=obj.type_name,
                size=size,
                encoding=cls._ENCODING_PREFIX + obj.type_name,
                serializer_info={
                    "iotype_module": obj.__class__.__module__,
                    "iotype_class": obj.__class__.__name__,
                    **meta_dict,
                },
            ),
        )

    @classmethod
    def deserialize(cls, data, metadata=None, context=None, format=STORAGE):
        if format == WIRE:
            raise NotImplementedError(
                "IOTypeSerializer only handles the STORAGE format."
            )
        info = metadata.serializer_info
        mod = importlib.import_module(info["iotype_module"])
        iotype_cls = getattr(mod, info["iotype_class"])
        # Only allow actual IOType subclasses — metadata is untrusted input.
        if not (isinstance(iotype_cls, type) and issubclass(iotype_cls, IOType)):
            raise ValueError(
                "IOTypeSerializer metadata references '%s.%s' which is not an "
                "IOType subclass" % (info["iotype_module"], info["iotype_class"])
            )
        return iotype_cls.deserialize(data, format=STORAGE, metadata=info)
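The PRIORITY numbers only matter relative to each other: serializers are tried in ascending PRIORITY and the first whose ``can_serialize`` accepts the object wins. A toy sketch of that dispatch (the classes and ``pick_serializer`` helper here are illustrative stand-ins, not the framework's real registry code):

```python
# Sketch of PRIORITY-ordered dispatch: lower number = tried first,
# first can_serialize() match wins. All names here are toy stand-ins.
class IOTypeLike:
    pass


class IOTypeBridge:
    PRIORITY = 50

    @staticmethod
    def can_serialize(obj):
        return isinstance(obj, IOTypeLike)


class PickleFallback:
    PRIORITY = 9999

    @staticmethod
    def can_serialize(obj):
        return True  # universal fallback


def pick_serializer(obj, registry):
    for s in sorted(registry, key=lambda s: s.PRIORITY):
        if s.can_serialize(obj):
            return s
    raise ValueError("no serializer accepts %r" % (obj,))


registry = [PickleFallback, IOTypeBridge]  # registration order is irrelevant
chosen_typed = pick_serializer(IOTypeLike(), registry)
chosen_plain = pick_serializer(42, registry)
```

This is why 50 versus 9999 is sufficient: a typed artifact is claimed by the bridge, while plain Python values fall through to the pickle fallback untouched.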

test/unit/io_types/__init__.py

Whitespace-only changes.
