Skip to content

Commit 513e64d

Browse files
JustinASmith and claude committed
source-shopify-native: migrate legacy state before Pydantic validation
Add ShopifyOpen custom request class with model_validator that transforms legacy flat state to dict-based format BEFORE Pydantic parsing. This prevents hybrid state from being created when Flow's checkpoint merging combines old flat state with new dict-based state. Previously, state migration happened at runtime via checkpoint, and Flow's merge behavior would create hybrid state like: {"inc": {"cursor": "...", "store-id": {"cursor": "..."}}} which Pydantic cannot parse. Now the migration happens in the model_validator, so the state is already dict-based when Pydantic parses it, and no format-changing checkpoint is ever created. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 952dd62 commit 513e64d

File tree

3 files changed

+160
-48
lines changed

3 files changed

+160
-48
lines changed

source-shopify-native/source_shopify_native/__init__.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from logging import Logger
2-
from typing import Callable, Awaitable
2+
from typing import Callable, Awaitable, Union
33

44
from estuary_cdk.flow import (
55
ConnectorSpec,
66
ValidationError,
77
)
88
from estuary_cdk.capture import (
99
BaseCaptureConnector,
10-
Request,
1110
Task,
1211
common,
1312
request,
@@ -20,9 +19,21 @@
2019
ConnectorState,
2120
EndpointConfig,
2221
OAUTH2_SPEC,
22+
ShopifyOpen,
2323
)
2424

2525

26+
# Custom Request type whose Open variant (ShopifyOpen) migrates legacy flat state during parsing.
27+
# This is necessary because Flow's checkpoint merging can otherwise create hybrid state when
28+
# migrating from flat to dict-based format, which Pydantic cannot parse.
29+
ShopifyRequest = Union[
30+
request.Spec,
31+
request.Discover[EndpointConfig],
32+
request.Validate[EndpointConfig, ResourceConfig],
33+
ShopifyOpen,
34+
]
35+
36+
2637
def _bindings_use_multi_store_keys(bindings: list | None) -> bool:
2738
"""Check if any binding uses the multi-store key structure (/_meta/store in key)."""
2839
if not bindings:
@@ -159,7 +170,7 @@ class Connector(
159170
BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
160171
):
161172
def request_class(self):
162-
return Request[EndpointConfig, ResourceConfig, ConnectorState]
173+
return ShopifyRequest
163174

164175
async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
165176
return ConnectorSpec(

source-shopify-native/source_shopify_native/models.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
BaseDocument,
2020
LongLivedClientCredentialsOAuth2Credentials,
2121
OAuth2Spec,
22+
ResourceConfig,
2223
ResourceState,
2324
)
2425
from estuary_cdk.capture.common import (
2526
ConnectorState as GenericConnectorState,
2627
)
28+
from estuary_cdk.capture import request
2729
from .graphql.common import dt_to_str, str_to_dt
2830

2931

@@ -552,3 +554,120 @@ def get_page_info(self) -> PageInfo:
552554
)
553555

554556
return DataModel
557+
558+
559+
def _is_legacy_flat_state(resource_state: dict[str, Any]) -> bool:
560+
"""Check if a resource state is in legacy flat format.
561+
562+
Legacy flat format has cursor/next_page directly in inc/backfill,
563+
not nested under a store_id key.
564+
"""
565+
inc = resource_state.get("inc")
566+
if isinstance(inc, dict):
567+
# Legacy flat: {"cursor": "..."} with no nested dicts containing cursor
568+
if "cursor" in inc:
569+
has_store_keys = any(
570+
isinstance(v, dict) and "cursor" in v
571+
for k, v in inc.items() if k != "cursor"
572+
)
573+
if not has_store_keys:
574+
return True
575+
return False
576+
577+
578+
def _migrate_resource_state_to_dict(resource_state: dict[str, Any], store_id: str) -> None:
579+
"""Migrate a resource state from legacy flat format to dict-based format.
580+
581+
Legacy format: {"inc": {"cursor": "..."}, "backfill": {"next_page": "...", "cutoff": "..."}}
582+
Dict format: {"inc": {"store_id": {"cursor": "..."}}, "backfill": {"store_id": {...}}}
583+
584+
This transforms the state IN PLACE before Pydantic validation, preventing
585+
hybrid state from ever being created via checkpoint merging.
586+
587+
Args:
588+
resource_state: The raw resource state dict to migrate (modified in place).
589+
store_id: The store ID to use as the dict key.
590+
"""
591+
inc = resource_state.get("inc")
592+
if isinstance(inc, dict) and "cursor" in inc:
593+
# Check this is pure legacy (no store keys already present)
594+
has_store_keys = any(
595+
isinstance(v, dict) and "cursor" in v
596+
for k, v in inc.items() if k != "cursor"
597+
)
598+
if not has_store_keys:
599+
# Pure legacy - wrap in store_id dict
600+
resource_state["inc"] = {store_id: inc}
601+
602+
backfill = resource_state.get("backfill")
603+
if isinstance(backfill, dict) and "next_page" in backfill:
604+
# Check this is pure legacy (no store keys already present)
605+
has_store_keys = any(
606+
isinstance(v, dict) and "next_page" in v
607+
for k, v in backfill.items() if k not in ("next_page", "cutoff")
608+
)
609+
if not has_store_keys:
610+
# Pure legacy - wrap in store_id dict
611+
resource_state["backfill"] = {store_id: backfill}
612+
613+
614+
def _get_store_id_from_config(config_data: dict[str, Any]) -> str | None:
615+
"""Extract the store ID from config data for state migration.
616+
617+
Legacy captures have exactly one store, so we use that store's ID
618+
as the key when migrating from flat to dict-based state.
619+
"""
620+
# New multi-store format: {"stores": [{"store": "store-id", ...}]}
621+
stores = config_data.get("stores", [])
622+
if stores and isinstance(stores, list) and len(stores) > 0:
623+
first_store = stores[0]
624+
if isinstance(first_store, dict):
625+
return first_store.get("store")
626+
627+
# Legacy single-store format: {"store": "store-id", ...}
628+
return config_data.get("store")
629+
630+
631+
class ShopifyOpen(request.Open["EndpointConfig", ResourceConfig, ConnectorState]):
    """Open request that rewrites legacy flat state before Pydantic parses it.

    The ``before`` validator below converts each binding's legacy flat state
    into the dict-based (per-store) layout while the request is still raw
    data. The state is therefore already dict-based when Pydantic validates
    it, and no runtime checkpoint that changes the state format is needed.
    """

    @model_validator(mode="before")
    @classmethod
    def migrate_legacy_state(cls, data: Any) -> Any:
        """Wrap legacy flat binding states under the configured store ID.

        Reads the store ID from ``data["capture"]["config"]`` and migrates, in
        place, every dict found under ``data["state"]["bindingStateV1"]``.
        If any of those pieces is missing or has an unexpected type, or no
        store ID can be determined, the input is returned untouched.

        Returns:
            The same ``data`` object that was passed in.
        """
        if not isinstance(data, dict):
            return data

        # Walk down to the endpoint config; None marks an unusable shape.
        capture_spec = data.get("capture", {})
        endpoint_config = (
            capture_spec.get("config", {}) if isinstance(capture_spec, dict) else None
        )

        # Walk down to the per-binding state map in the same way.
        connector_state = data.get("state", {})
        per_binding = (
            connector_state.get("bindingStateV1", {})
            if isinstance(connector_state, dict)
            else None
        )

        if not isinstance(endpoint_config, dict) or not isinstance(per_binding, dict):
            return data

        store_id = _get_store_id_from_config(endpoint_config)
        if store_id:
            for binding_state in per_binding.values():
                if isinstance(binding_state, dict):
                    _migrate_resource_state_to_dict(binding_state, store_id)

        return data

source-shopify-native/source_shopify_native/resources.py

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -161,62 +161,44 @@ def _reconcile_connector_state(
161161
) -> None:
162162
"""Reconcile connector state to ensure all stores have proper state entries.
163163
164-
This handles:
165-
1. Legacy flat state migration to dict-based state
166-
2. Adding new stores to existing dict-based state
164+
This handles adding new stores to existing dict-based state. Legacy flat state
165+
migration is handled earlier in ShopifyOpen.migrate_legacy_state() before
166+
Pydantic validation, which prevents hybrid state from being created.
167167
168168
Args:
169169
store_ids: List of store IDs that should have state entries.
170170
binding: The capture binding being processed.
171-
state: The current state (may be flat or dict-based).
171+
state: The current state (always dict-based after ShopifyOpen migration).
172172
initial_state: The initial state template for new entries.
173173
task: The task for logging and checkpointing.
174174
"""
175-
should_checkpoint = False
176-
177-
# Handle legacy flat state migration
178-
if isinstance(state.inc, ResourceState.Incremental):
179-
# Legacy flat state - migrate to dict-based
180-
if len(store_ids) != 1:
181-
raise RuntimeError(
182-
f"Cannot migrate legacy flat state with multiple stores ({len(store_ids)}). "
183-
"Legacy captures should only have one store."
184-
)
185-
store_id = store_ids[0]
186-
task.log.info(f"Migrating legacy flat state to dict-based for store: {store_id}")
187-
188-
legacy_inc = state.inc
189-
state.inc = {store_id: legacy_inc} # type: ignore[assignment]
175+
# State should always be dict-based at this point (migration happens in ShopifyOpen)
176+
if not isinstance(state.inc, dict) or not isinstance(initial_state.inc, dict):
177+
return
190178

191-
if state.backfill is not None and isinstance(state.backfill, ResourceState.Backfill):
192-
legacy_backfill = state.backfill
193-
state.backfill = {store_id: legacy_backfill} # type: ignore[assignment]
179+
should_checkpoint = False
194180

195-
should_checkpoint = True
181+
for store_id in store_ids:
182+
inc_state_exists = store_id in state.inc
183+
backfill_state_exists = (
184+
isinstance(state.backfill, dict) and store_id in state.backfill
185+
)
196186

197-
# Handle adding new stores to existing dict-based state
198-
elif isinstance(state.inc, dict) and isinstance(initial_state.inc, dict):
199-
for store_id in store_ids:
200-
inc_state_exists = store_id in state.inc
201-
backfill_state_exists = (
202-
isinstance(state.backfill, dict) and store_id in state.backfill
187+
if not inc_state_exists and not backfill_state_exists:
188+
task.log.info(f"Initializing new state for store: {store_id}")
189+
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
190+
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
191+
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
192+
should_checkpoint = True
193+
elif not inc_state_exists and backfill_state_exists:
194+
# Edge case: backfill exists but incremental doesn't
195+
task.log.info(
196+
f"Reinitializing state for store {store_id} due to missing incremental state."
203197
)
204-
205-
if not inc_state_exists and not backfill_state_exists:
206-
task.log.info(f"Initializing new state for store: {store_id}")
207-
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
208-
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
209-
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
210-
should_checkpoint = True
211-
elif not inc_state_exists and backfill_state_exists:
212-
# Edge case: backfill exists but incremental doesn't
213-
task.log.info(
214-
f"Reinitializing state for store {store_id} due to missing incremental state."
215-
)
216-
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
217-
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
218-
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
219-
should_checkpoint = True
198+
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
199+
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
200+
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
201+
should_checkpoint = True
220202

221203
if should_checkpoint:
222204
task.log.info(f"Checkpointing reconciled state for {binding.stateKey}.")

0 commit comments

Comments
 (0)