[Do not Review] AlgoSpec + IOType serialization compatibility test #3119
Conversation
Introduce ArtifactSerializer ABC with priority-based dispatch, enabling custom serializers to be plugged in alongside the default pickle path. This is the foundation for the IOType system, which adds typed serialization for Metaflow artifacts.

New abstractions:
- ArtifactSerializer: base class with can_serialize/can_deserialize/serialize/deserialize interface
- SerializerStore: metaclass for auto-registration and deterministic priority-ordered dispatch
- SerializationMetadata: namedtuple for artifact metadata routing
- SerializedBlob: supports both new bytes and references to already-stored data
- PickleSerializer: universal fallback (PRIORITY=9999) wrapping existing pickle logic

No existing code is modified. These are inert until wired into TaskDataStore in a follow-up commit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
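A minimal sketch of the interface this commit describes, reusing the field and method names quoted elsewhere in this PR; exact signatures, the context argument, and the namedtuple field order are assumptions, not the actual implementation:

from abc import ABC, abstractmethod
from collections import namedtuple

# Field names mirror those used later in this PR (type/size/encoding/serializer_info,
# value/is_reference/compress_method); treat them as illustrative.
SerializationMetadata = namedtuple(
    "SerializationMetadata", ["type", "size", "encoding", "serializer_info"]
)
SerializedBlob = namedtuple(
    "SerializedBlob", ["value", "is_reference", "compress_method"]
)

class ArtifactSerializer(ABC):
    # Lower PRIORITY is consulted first; PickleSerializer sits at 9999 as the fallback.
    PRIORITY = 9999

    @classmethod
    @abstractmethod
    def can_serialize(cls, obj, context):
        """Return True if this serializer should claim obj on save."""

    @classmethod
    @abstractmethod
    def can_deserialize(cls, metadata, context):
        """Return True if this serializer understands metadata.encoding."""

    @classmethod
    @abstractmethod
    def serialize(cls, obj, context):
        """Return ([SerializedBlob, ...], SerializationMetadata)."""

    @classmethod
    @abstractmethod
    def deserialize(cls, blobs, metadata, context):
        """Reconstruct the original object from stored blobs."""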
Register PickleSerializer through the standard Metaflow plugin
mechanism so extensions (e.g., mli-metaflow-custom) can add their
own serializers via ARTIFACT_SERIALIZERS_DESC.
- Add artifact_serializer category to _plugin_categories
- Add ARTIFACT_SERIALIZERS_DESC with PickleSerializer in plugins/__init__.py
- Resolve via ARTIFACT_SERIALIZERS = resolve_plugins("artifact_serializer")
Importing metaflow.plugins now triggers PickleSerializer registration
in SerializerStore, ensuring the registry is populated before
TaskDataStore needs it (commit 3).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
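A hedged sketch of the registration path this commit describes. The descriptor tuple format follows Metaflow's usual (name, import path) convention for other _DESC lists and is an assumption; the module path is taken from the file list later in this PR:

# In metaflow/plugins/__init__.py (entry format assumed):
ARTIFACT_SERIALIZERS_DESC = [
    ("pickle", ".datastores.serializers.pickle_serializer.PickleSerializer"),
]

# Resolved alongside the other plugin categories, as quoted in the commit
# (resolve_plugins is the existing helper in that module):
ARTIFACT_SERIALIZERS = resolve_plugins("artifact_serializer")

# An extension such as mli-metaflow-custom would append its own
# (name, path) entries to ARTIFACT_SERIALIZERS_DESC before resolution,
# and importing metaflow.plugins then registers them in SerializerStore.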
save_artifacts now loops through priority-ordered serializers to find
one that can_serialize the object. load_artifacts routes deserialization
through metadata.encoding via can_deserialize. PickleSerializer handles
all existing Python objects as the universal fallback.
Behavioral changes:
- New artifacts get encoding "pickle-v4" (was "gzip+pickle-v4")
- _info[name] gains optional "serializer_info" dict for custom serializers
- Removed hardcoded pickle import from task_datastore.py
Backward compatible:
- Old artifacts with "gzip+pickle-v2" or "gzip+pickle-v4" encoding
load correctly (PickleSerializer.can_deserialize handles both)
- Missing encoding defaults to "gzip+pickle-v2"
- Missing serializer_info defaults to {}
- _objects[name] stays as single string (multi-blob deferred to PR 1.5)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
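A minimal sketch of the dispatch described above, not the actual task_datastore.py code; the helper names are hypothetical and the context argument is an assumption. It reuses the ArtifactSerializer interface sketched earlier:

def _pick_serializer(obj, serializers, context=None):
    # serializers are assumed pre-sorted by ascending PRIORITY, so
    # PickleSerializer (PRIORITY=9999) is tried last as the universal fallback.
    for serializer in serializers:
        if serializer.can_serialize(obj, context):
            return serializer
    raise ValueError("No serializer can handle %r" % type(obj))

def _pick_deserializer(metadata, serializers, context=None):
    # Loading routes on metadata.encoding via can_deserialize.
    for serializer in serializers:
        if serializer.can_deserialize(metadata, context):
            return serializer
    raise ValueError("No serializer for encoding %r" % metadata.encoding)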
- SerializerStore now extends ABCMeta (not type) so @abstractmethod is enforced — incomplete subclasses raise TypeError at definition
- Validate blobs non-empty before accessing blobs[0] in save_artifacts
- Add NOTE on compress_method not yet wired into save path

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
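A sketch of the general pattern this fix describes: a registering metaclass derived from ABCMeta so abstract-method bookkeeping still applies. Names and details are illustrative; in this sketch ABCMeta blocks instantiation of incomplete subclasses, whereas the PR's actual SerializerStore is described as failing at class definition:

from abc import ABCMeta

class SerializerStore(ABCMeta):
    _registry = []

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # Register only concrete subclasses; the abstract base (and any
        # subclass that still has unimplemented abstract methods) is skipped.
        if bases and not getattr(cls, "__abstractmethods__", None):
            SerializerStore._registry.append(cls)
            # Deterministic dispatch: lower PRIORITY is consulted first.
            SerializerStore._registry.sort(key=lambda c: c.PRIORITY)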
Two features in one commit:

1. @step(start=True, end=True) kwargs — explicit start/end marking.
   FlowGraph._identify_start_end resolves endpoints with a three-tier fallback: name "start"/"end" (backward compat) → @step(start/end) attributes → None (lint catches it). All hardcoded "start"/"end" strings are replaced with graph.start_step/end_step throughout: graph traversal, output_steps, lint, runtime, flowspec _graph_info, task foreach reset, and the client API (end_task, parent/child steps, trigger). Metadata registration persists endpoints for cheap client lookups.

2. AlgoSpec(FlowSpec) — single-computation construct.
   AlgoSpecMeta marks call() as @step(start=True, end=True) with the class name lowercased as the step name. __getattr__ resolves the step name to the call method so getattr(flow, "squaremodel") works.

Tested: 24 unit tests (standard flow, custom-named flow, single-step flow, AlgoSpec), local run, client API, Maestro deployment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
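An illustrative usage sketch of the two features described above. The flow bodies are hypothetical; only @step(start=True, end=True), AlgoSpec, and the lowercased-class-name step resolution come from this commit's description, and the AlgoSpec import location is an assumption:

from metaflow import FlowSpec, step
from metaflow import AlgoSpec  # import location assumed

class MyFlow(FlowSpec):
    @step(start=True)        # explicit start marking, no "start" name required
    def load(self):
        self.x = 3
        self.next(self.report)

    @step(end=True)          # explicit end marking
    def report(self):
        print(self.x)

class SquareModel(AlgoSpec):
    # AlgoSpecMeta marks call() as @step(start=True, end=True); the step is
    # exposed under the lowercased class name, so getattr(flow, "squaremodel")
    # resolves to this method.
    def call(self):
        self.result = self.x * self.x   # self.x assumed set via a Parameter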
IOType is a dual-purpose base class: type descriptor (for specs) and value wrapper (for typed serialization). The four-operation serde model is unified behind a format='wire'|'storage' parameter.

Types added (storage byte order: little-endian, aligned with AIS V2):
- Scalars: Text, Bool, Int32, Int64, Float32, Float64
- Structured: Json, Enum (with allowed-values validation), Struct (@dataclass integration with implicit scalar mapping)
- Collections: List, Map (recursive child type delegation)

All types support:
- Wire serde (string-based, for CLI/protobuf)
- Storage serde (blob-based, for S3/disk)
- to_spec() for AIS-compatible JSON schema generation

Tensor type deferred to next commit (numpy dependency). No existing code modified.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
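A hedged sketch of the dual-purpose usage described above. Class and method names come from the commit text; the exact signatures, return shapes, and package exports are assumptions:

from metaflow.io_types import Int32, Json  # exports assumed; module path per this PR

# Value wrapper: wire serde is string-based (for CLI / protobuf)...
wire = Int32(42).serialize(format="wire")
value = Int32.deserialize(wire, format="wire")

# ...while storage serde is blob-based (for S3 / disk). Return shape assumed.
blobs, info = Json({"lr": 0.01}).serialize(format="storage")

# Type descriptor: a bare instance describes the type for spec generation.
spec = Int32().to_spec()   # e.g. {"type": "int32"}; exact schema assumed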
Wire format: JSON header (dtype + shape) + base64-encoded raw bytes. Storage format: raw bytes blob + shape/dtype metadata dict. numpy is imported lazily — Tensor is importable but raises ImportError with a helpful message if numpy is not installed and serde is attempted. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
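An illustrative sketch of the Tensor wire format described above (JSON header with dtype and shape, followed by base64-encoded raw bytes). The exact header layout and separator are assumptions, and numpy is imported lazily in the real implementation:

import base64
import json
import numpy as np

def tensor_to_wire(arr):
    # Header carries dtype + shape; payload is the raw buffer, base64-encoded.
    header = json.dumps({"dtype": str(arr.dtype), "shape": list(arr.shape)})
    payload = base64.b64encode(arr.tobytes()).decode("ascii")
    return header + "\n" + payload

def tensor_from_wire(text):
    header, payload = text.split("\n", 1)
    meta = json.loads(header)
    raw = base64.b64decode(payload)
    return np.frombuffer(raw, dtype=meta["dtype"]).reshape(meta["shape"])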
IOTypeSerializer connects the IOType system to the pluggable serializer
framework from PR 1. Auto-detects IOType instances via isinstance check
(PRIORITY=50, before PickleSerializer).
On save: IOType.serialize(format='storage') produces blobs; metadata
records iotype_module + iotype_class for reconstruction on load.
On load: metadata.encoding prefix "iotype:" routes to IOTypeSerializer;
original IOType subclass is reconstructed from serializer_info.
With this bridge:
self.data = Json({"key": "value"}) # IOTypeSerializer claims it
self.data = {"key": "value"} # PickleSerializer claims it
Registered in ARTIFACT_SERIALIZERS_DESC before PickleSerializer.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
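A minimal sketch of how this bridge might claim IOType instances and record reconstruction metadata, reusing the ArtifactSerializer interface sketched earlier in this thread. The storage-serialize return shape and the exact encoding suffix are assumptions; the real IOTypeSerializer in this PR may differ:

class IOTypeSerializer(ArtifactSerializer):
    PRIORITY = 50  # consulted before PickleSerializer (PRIORITY=9999)

    @classmethod
    def can_serialize(cls, obj, context):
        # Auto-detect typed artifacts by instance check.
        return isinstance(obj, IOType)

    @classmethod
    def can_deserialize(cls, metadata, context):
        # Route anything stored with the "iotype:" encoding prefix here.
        return metadata.encoding.startswith("iotype:")

    @classmethod
    def serialize(cls, obj, context):
        blobs, info = obj.serialize(format="storage")  # return shape assumed
        # Record how to rebuild the original IOType subclass on load.
        info["iotype_module"] = type(obj).__module__
        info["iotype_class"] = type(obj).__name__
        return blobs, SerializationMetadata(
            type=str(type(obj)),
            size=sum(len(b.value) for b in blobs),
            encoding="iotype:" + type(obj).__name__.lower(),  # suffix assumed
            serializer_info=info,
        )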
Greptile Summary
This PR merges two feature branches: AlgoSpec (single-step flows via a metaclass that wraps call() as the flow's only step) and IOType (typed serialization for Metaflow artifacts).
Confidence Score: 3/5
Not safe to merge as-is — the _UNSET/None inconsistency in scalar IOTypes is a real round-trip bug that will surface when users call wire serialize/deserialize on uninitialized instances. The P1 finding in io_types/scalars.py (and the same pattern in Enum, List, Map, Struct) means the type-descriptor contract established by the IOType base class is broken: calling _wire_serialize() on a default-constructed Int32() produces 'None', which then fails deserialization. This is a correctness defect on the newly introduced code path. The remaining findings are P2 and do not block merge by themselves, but combined with the P1 they keep the score at 3.

Important Files Changed
metaflow/io_types/scalars.py (and the same _UNSET pattern in enum_type.py, collections.py, struct_type.py), metaflow/plugins/datastores/serializers/pickle_serializer.py
Reviews (1): Last reviewed commit: "Merge branch 'pr-3118' into feature/algo..."
            raise ValueError(
                "Int32 value %d out of range [%d, %d]" % (value, self._MIN, self._MAX)
            )
        super().__init__(value)

    def _wire_serialize(self):
None default breaks _UNSET type-descriptor contract
Int32, Int64, Float32, Float64 (and similarly Enum, List, Map, Struct) use value=None as their default, but IOType.__init__ uses the _UNSET sentinel to distinguish "no value / type descriptor" from "value was explicitly provided." As a result, Int32()._value is _UNSET evaluates to False, which silently breaks any code that checks for descriptor-only instances via _UNSET. Calling Int32()._wire_serialize() also returns "None", and Int32._wire_deserialize("None") raises ValueError: invalid literal for int().
Change value=None → value=_UNSET (importing _UNSET from .base) and guard serialization on self._value is not _UNSET.
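A sketch of the suggested fix: use the _UNSET sentinel as the default so a default-constructed scalar stays a pure type descriptor, and refuse to wire-serialize a descriptor-only instance. Illustrative only; the other scalar and container types would follow the same pattern, and whether IOType also lives in .base is an assumption:

from .base import IOType, _UNSET  # the review suggests importing _UNSET from .base

class Int32(IOType):
    _MIN, _MAX = -(2 ** 31), 2 ** 31 - 1

    def __init__(self, value=_UNSET):
        # Only range-check when a value was actually provided.
        if value is not _UNSET and not (self._MIN <= value <= self._MAX):
            raise ValueError(
                "Int32 value %d out of range [%d, %d]" % (value, self._MIN, self._MAX)
            )
        super().__init__(value)

    def _wire_serialize(self):
        if self._value is _UNSET:
            raise ValueError("Cannot wire-serialize a descriptor-only Int32")
        return str(self._value)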
            mod = importlib.import_module(dc_module)
            dc_type = getattr(mod, dc_class)
            # Security: only allow actual dataclasses, not arbitrary classes
            if not dataclasses.is_dataclass(dc_type):
                raise ValueError(
                    "Struct metadata references '%s.%s' which is not a dataclass"
                    % (dc_module, dc_class)
                )
            return cls(dc_type(**data), dataclass_type=dc_type)
        # Fallback: return as plain dict wrapped in Struct
        return cls(data)

    def to_spec(self):
        spec = {"type": self.type_name}
        if self._dataclass_type is not None and dataclasses.is_dataclass(
            self._dataclass_type
        ):
            # Use typing.get_type_hints() to resolve string annotations
Unhandled ImportError/AttributeError on deserialization
importlib.import_module(dc_module) raises ModuleNotFoundError and getattr(mod, dc_class) raises AttributeError if the stored class has been moved or renamed since serialization. Both propagate as bare exceptions with no context about what artifact failed or what module was expected. dc_type(**data) can similarly raise TypeError for mismatched fields. Wrapping these in a descriptive ValueError would make debugging much easier.
try:
    mod = importlib.import_module(dc_module)
    dc_type = getattr(mod, dc_class)
except (ImportError, AttributeError) as e:
    raise ValueError(
        "Cannot reconstruct Struct: could not import '%s.%s'. "
        "Has the class been moved or renamed? (%s)" % (dc_module, dc_class, e)
    )

        Returns (start_step_name, end_step_name) from _parameters metadata.
        Falls back to ("start", "end") for runs that predate this change.
        """
        if self._cached_endpoints is None:
            start, end = "start", "end"
            try:
                params_meta = self["_parameters"].task.metadata_dict
                start = params_meta.get("start_step", "start")
                end = params_meta.get("end_step", "end")
            except Exception:
                pass
            self._cached_endpoints = (start, end)
Bare except Exception silently swallows real failures
The except Exception: pass block treats every failure — including AttributeError, missing _parameters task, network errors, and data corruption — as "just an old run" and falls back to ("start", "end"). This means a corrupted metadata store or a genuine programming mistake silently produces incorrect graph endpoints rather than a visible error.
Consider at minimum logging a warning on the exception, or narrowing the except to KeyError for the missing-step case.
except KeyError:
    pass  # Pre-AlgoSpec run: no start_step/end_step metadata

        encoding = "pickle-v4"
        return (
            [SerializedBlob(blob, is_reference=False, compress_method="gzip")],
            SerializationMetadata(
                type=str(type(obj)),
                size=len(blob),
                encoding=encoding,
                serializer_info={},
            ),
        )

    @classmethod
    def deserialize(cls, blobs, metadata, context):
Encoding label "pickle-v4" does not reflect actual gzip storage
The previous implementation stored "gzip+pickle-v4" as the encoding to reflect that the ContentAddressedStore (CAS) applies gzip compression. This new code uses "pickle-v4", even though the CAS still always applies gzip (compress_method="gzip" is documented as "not yet wired into the save path"). The metadata therefore claims the blob is raw pickle when it is actually gzip-compressed pickle — any out-of-band tooling or future code that reads the encoding field to determine the storage format will be incorrect.
Consider keeping "gzip+pickle-v4" until compress_method is actually wired up so the encoding metadata accurately describes the stored bytes.
    def deserialize(cls, blobs, metadata, context):
        info = metadata.serializer_info
        mod = importlib.import_module(info["iotype_module"])
        iotype_cls = getattr(mod, info["iotype_class"])
        # Security: only allow actual IOType subclasses, not arbitrary classes
        if not (isinstance(iotype_cls, type) and issubclass(iotype_cls, IOType)):
            raise ValueError(
                "IOTypeSerializer metadata references '%s.%s' which is not an "
                "IOType subclass" % (info["iotype_module"], info["iotype_class"])
            )
        return iotype_cls.deserialize(blobs, format="storage", metadata=info)
Missing key guard before accessing serializer_info fields
info["iotype_module"] and info["iotype_class"] will raise an unhandled KeyError if serializer_info was persisted without these keys (e.g., a partially written artifact or a future schema change). A brief guard or descriptive error here would make debugging easier:
info = metadata.serializer_info or {}
try:
    iotype_module = info["iotype_module"]
    iotype_class = info["iotype_class"]
except KeyError as e:
    raise ValueError(
        "IOTypeSerializer: missing required key %s in serializer_info. "
        "Artifact metadata may be corrupt." % e
    )

        # For now, serializers produce a single blob per artifact.
        # Multi-blob support will be added when IOType lands.
        if not blobs:
            raise DataException(
Stale TODO comment — IOType has landed in this PR
The comment says "Multi-blob support will be added when IOType lands," but IOType is landing in this PR. The blobs[0].value truncation is now permanent until explicitly revisited. This comment should be updated to reflect the actual constraint (e.g. "Only the first blob is used; full multi-blob support is tracked as a follow-up.") so future maintainers understand it is a known limitation, not an in-flight TODO.
Summary
Merges #3114 (AlgoSpec) with #3118 (IOType serialization) to verify compatibility.
Both feature sets work together: no conflicts, no regressions.
Test plan
🤖 Generated with Claude Code