Skip to content

Commit 48fca48

Browse files
JustinASmith and claude committed
source-shopify-native: simplify state handling to follow stripe pattern
Remove complex state migration logic (ShopifyRequest, ShopifyOpen) and follow the pattern from source-stripe-native instead:

- New captures: always use dict-based state {"inc": {"store_id": {...}}}
- Legacy single-store captures: continue with flat state {"inc": {"cursor": ...}}
- Transitioning to multiple stores: requires backfill (already enforced)

This avoids hybrid state issues by never changing the state format at runtime. Legacy captures keep their flat state; new captures start with dict-based state.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 513e64d commit 48fca48

File tree

3 files changed

+82
-175
lines changed

3 files changed

+82
-175
lines changed

source-shopify-native/source_shopify_native/__init__.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from logging import Logger
2-
from typing import Callable, Awaitable, Union
2+
from typing import Callable, Awaitable
33

44
from estuary_cdk.flow import (
55
ConnectorSpec,
@@ -14,26 +14,16 @@
1414
)
1515
from estuary_cdk.capture.common import ResourceConfig
1616

17+
from estuary_cdk.capture import Request
18+
1719
from .resources import all_resources, validate_credentials
1820
from .models import (
1921
ConnectorState,
2022
EndpointConfig,
2123
OAUTH2_SPEC,
22-
ShopifyOpen,
2324
)
2425

2526

26-
# Custom Request type that uses ShopifyOpen for cleaning hybrid state during parsing.
27-
# This is necessary because Flow's checkpoint merging can create hybrid state when
28-
# migrating from flat to dict-based format, which Pydantic cannot parse.
29-
ShopifyRequest = Union[
30-
request.Spec,
31-
request.Discover[EndpointConfig],
32-
request.Validate[EndpointConfig, ResourceConfig],
33-
ShopifyOpen,
34-
]
35-
36-
3727
def _bindings_use_multi_store_keys(bindings: list | None) -> bool:
3828
"""Check if any binding uses the multi-store key structure (/_meta/store in key)."""
3929
if not bindings:
@@ -170,7 +160,7 @@ class Connector(
170160
BaseCaptureConnector[EndpointConfig, ResourceConfig, ConnectorState],
171161
):
172162
def request_class(self):
173-
return ShopifyRequest
163+
return Request[EndpointConfig, ResourceConfig, ConnectorState]
174164

175165
async def spec(self, log: Logger, _: request.Spec) -> ConnectorSpec:
176166
return ConnectorSpec(

source-shopify-native/source_shopify_native/models.py

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -556,118 +556,5 @@ def get_page_info(self) -> PageInfo:
556556
return DataModel
557557

558558

559-
def _is_legacy_flat_state(resource_state: dict[str, Any]) -> bool:
560-
"""Check if a resource state is in legacy flat format.
561-
562-
Legacy flat format has cursor/next_page directly in inc/backfill,
563-
not nested under a store_id key.
564-
"""
565-
inc = resource_state.get("inc")
566-
if isinstance(inc, dict):
567-
# Legacy flat: {"cursor": "..."} with no nested dicts containing cursor
568-
if "cursor" in inc:
569-
has_store_keys = any(
570-
isinstance(v, dict) and "cursor" in v
571-
for k, v in inc.items() if k != "cursor"
572-
)
573-
if not has_store_keys:
574-
return True
575-
return False
576-
577-
578-
def _migrate_resource_state_to_dict(resource_state: dict[str, Any], store_id: str) -> None:
579-
"""Migrate a resource state from legacy flat format to dict-based format.
580-
581-
Legacy format: {"inc": {"cursor": "..."}, "backfill": {"next_page": "...", "cutoff": "..."}}
582-
Dict format: {"inc": {"store_id": {"cursor": "..."}}, "backfill": {"store_id": {...}}}
583-
584-
This transforms the state IN PLACE before Pydantic validation, preventing
585-
hybrid state from ever being created via checkpoint merging.
586-
587-
Args:
588-
resource_state: The raw resource state dict to migrate (modified in place).
589-
store_id: The store ID to use as the dict key.
590-
"""
591-
inc = resource_state.get("inc")
592-
if isinstance(inc, dict) and "cursor" in inc:
593-
# Check this is pure legacy (no store keys already present)
594-
has_store_keys = any(
595-
isinstance(v, dict) and "cursor" in v
596-
for k, v in inc.items() if k != "cursor"
597-
)
598-
if not has_store_keys:
599-
# Pure legacy - wrap in store_id dict
600-
resource_state["inc"] = {store_id: inc}
601-
602-
backfill = resource_state.get("backfill")
603-
if isinstance(backfill, dict) and "next_page" in backfill:
604-
# Check this is pure legacy (no store keys already present)
605-
has_store_keys = any(
606-
isinstance(v, dict) and "next_page" in v
607-
for k, v in backfill.items() if k not in ("next_page", "cutoff")
608-
)
609-
if not has_store_keys:
610-
# Pure legacy - wrap in store_id dict
611-
resource_state["backfill"] = {store_id: backfill}
612-
613-
614-
def _get_store_id_from_config(config_data: dict[str, Any]) -> str | None:
615-
"""Extract the store ID from config data for state migration.
616-
617-
Legacy captures have exactly one store, so we use that store's ID
618-
as the key when migrating from flat to dict-based state.
619-
"""
620-
# New multi-store format: {"stores": [{"store": "store-id", ...}]}
621-
stores = config_data.get("stores", [])
622-
if stores and isinstance(stores, list) and len(stores) > 0:
623-
first_store = stores[0]
624-
if isinstance(first_store, dict):
625-
return first_store.get("store")
626-
627-
# Legacy single-store format: {"store": "store-id", ...}
628-
return config_data.get("store")
629-
630559

631-
class ShopifyOpen(request.Open["EndpointConfig", ResourceConfig, ConnectorState]):
632-
"""Custom Open request that migrates legacy state before Pydantic validation.
633-
634-
This transforms legacy flat state to dict-based format BEFORE Pydantic parsing,
635-
which prevents hybrid state from ever being created. Without this, the migration
636-
would happen at runtime via checkpoint, and Flow's checkpoint merging would
637-
create hybrid state that Pydantic cannot parse.
638-
"""
639-
640-
@model_validator(mode="before")
641-
@classmethod
642-
def migrate_legacy_state(cls, data: Any) -> Any:
643-
"""Migrate legacy flat state to dict-based format before Pydantic validation."""
644-
if not isinstance(data, dict):
645-
return data
646560

647-
# Get store_id from config
648-
capture = data.get("capture", {})
649-
if not isinstance(capture, dict):
650-
return data
651-
652-
config_data = capture.get("config", {})
653-
if not isinstance(config_data, dict):
654-
return data
655-
656-
store_id = _get_store_id_from_config(config_data)
657-
if not store_id:
658-
return data
659-
660-
# Migrate each binding's state
661-
state = data.get("state", {})
662-
if not isinstance(state, dict):
663-
return data
664-
665-
binding_states = state.get("bindingStateV1", {})
666-
if not isinstance(binding_states, dict):
667-
return data
668-
669-
for resource_state in binding_states.values():
670-
if isinstance(resource_state, dict):
671-
_migrate_resource_state_to_dict(resource_state, store_id)
672-
673-
return data

source-shopify-native/source_shopify_native/resources.py

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,39 @@ def _create_initial_state(
138138
store_ids: list[str],
139139
start_date: AwareDatetime,
140140
use_backfill: bool,
141+
use_multi_store_keys: bool,
141142
) -> ResourceState:
142-
"""Create initial state for a resource. State is always dict-based, keyed by store_id."""
143+
"""Create initial state for a resource.
144+
145+
State format depends on use_multi_store_keys:
146+
- False (legacy single-store captures): Flat state format {"inc": {"cursor": "..."}}
147+
- True (new captures and multi-store captures): Dict-based {"inc": {"store_id": {"cursor": "..."}}}
148+
149+
This follows the same pattern as source-stripe-native: flat state for single account,
150+
dict-based for multiple accounts. Transitioning requires backfill.
151+
"""
143152
cutoff = datetime.now(tz=UTC)
144153

145-
if use_backfill:
154+
if use_multi_store_keys:
155+
# Dict-based state for multiple stores
156+
if use_backfill:
157+
return ResourceState(
158+
inc={sid: ResourceState.Incremental(cursor=cutoff) for sid in store_ids}, # type: ignore[arg-type]
159+
backfill={sid: ResourceState.Backfill(next_page=dt_to_str(start_date), cutoff=cutoff) for sid in store_ids}, # type: ignore[arg-type]
160+
)
146161
return ResourceState(
147-
inc={sid: ResourceState.Incremental(cursor=cutoff) for sid in store_ids}, # type: ignore[arg-type]
148-
backfill={sid: ResourceState.Backfill(next_page=dt_to_str(start_date), cutoff=cutoff) for sid in store_ids}, # type: ignore[arg-type]
162+
inc={sid: ResourceState.Incremental(cursor=start_date) for sid in store_ids}, # type: ignore[arg-type]
163+
)
164+
else:
165+
# Flat state for single store (backward compatible with legacy)
166+
if use_backfill:
167+
return ResourceState(
168+
inc=ResourceState.Incremental(cursor=cutoff),
169+
backfill=ResourceState.Backfill(next_page=dt_to_str(start_date), cutoff=cutoff),
170+
)
171+
return ResourceState(
172+
inc=ResourceState.Incremental(cursor=start_date),
149173
)
150-
return ResourceState(
151-
inc={sid: ResourceState.Incremental(cursor=start_date) for sid in store_ids}, # type: ignore[arg-type]
152-
)
153174

154175

155176
def _reconcile_connector_state(
@@ -161,48 +182,53 @@ def _reconcile_connector_state(
161182
) -> None:
162183
"""Reconcile connector state to ensure all stores have proper state entries.
163184
164-
This handles adding new stores to existing dict-based state. Legacy flat state
165-
migration is handled earlier in ShopifyOpen.migrate_legacy_state() before
166-
Pydantic validation, which prevents hybrid state from being created.
185+
This follows the pattern from source-stripe-native: only add new stores to
186+
existing dict-based state. Flat state (single store) is left unchanged.
187+
188+
State format is determined by _create_initial_state based on use_multi_store_keys:
189+
- Flat state (legacy single-store captures): no reconciliation needed
190+
- Dict-based state (new and multi-store captures): add new stores as needed
167191
168192
Args:
169193
store_ids: List of store IDs that should have state entries.
170194
binding: The capture binding being processed.
171-
state: The current state (always dict-based after ShopifyOpen migration).
195+
state: The current state.
172196
initial_state: The initial state template for new entries.
173197
task: The task for logging and checkpointing.
174198
"""
175-
# State should always be dict-based at this point (migration happens in ShopifyOpen)
176-
if not isinstance(state.inc, dict) or not isinstance(initial_state.inc, dict):
177-
return
178-
179-
should_checkpoint = False
180-
181-
for store_id in store_ids:
182-
inc_state_exists = store_id in state.inc
183-
backfill_state_exists = (
184-
isinstance(state.backfill, dict) and store_id in state.backfill
185-
)
186-
187-
if not inc_state_exists and not backfill_state_exists:
188-
task.log.info(f"Initializing new state for store: {store_id}")
189-
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
190-
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
191-
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
192-
should_checkpoint = True
193-
elif not inc_state_exists and backfill_state_exists:
194-
# Edge case: backfill exists but incremental doesn't
195-
task.log.info(
196-
f"Reinitializing state for store {store_id} due to missing incremental state."
199+
# Only reconcile dict-based state (multiple stores)
200+
# Flat state (single store) doesn't need reconciliation
201+
if (
202+
isinstance(state.inc, dict)
203+
and isinstance(initial_state.inc, dict)
204+
):
205+
should_checkpoint = False
206+
207+
for store_id in store_ids:
208+
inc_state_exists = store_id in state.inc
209+
backfill_state_exists = (
210+
isinstance(state.backfill, dict) and store_id in state.backfill
197211
)
198-
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
199-
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
200-
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
201-
should_checkpoint = True
202212

203-
if should_checkpoint:
204-
task.log.info(f"Checkpointing reconciled state for {binding.stateKey}.")
205-
task.checkpoint(ConnectorState(bindingStateV1={binding.stateKey: state}))
213+
if not inc_state_exists and not backfill_state_exists:
214+
task.log.info(f"Initializing new state for store: {store_id}")
215+
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
216+
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
217+
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
218+
should_checkpoint = True
219+
elif not inc_state_exists and backfill_state_exists:
220+
# Edge case: backfill exists but incremental doesn't
221+
task.log.info(
222+
f"Reinitializing state for store {store_id} due to missing incremental state."
223+
)
224+
state.inc[store_id] = deepcopy(initial_state.inc[store_id])
225+
if isinstance(state.backfill, dict) and isinstance(initial_state.backfill, dict):
226+
state.backfill[store_id] = deepcopy(initial_state.backfill[store_id])
227+
should_checkpoint = True
228+
229+
if should_checkpoint:
230+
task.log.info(f"Checkpointing reconciled state for {binding.stateKey}.")
231+
task.checkpoint(ConnectorState(bindingStateV1={binding.stateKey: state}))
206232

207233

208234
async def _check_plan_allows_pii(http: HTTPMixin, url: str, log: Logger) -> bool:
@@ -277,15 +303,19 @@ async def all_resources(
277303
) -> list[Resource]:
278304
"""Discover all available resources across all configured stores.
279305
280-
State is always dict-based internally, keyed by store_id.
281-
Collection keys depend on use_multi_store_keys:
282-
- True: ["/_meta/store", "/id"] (new captures and multi-store)
283-
- False: ["/id"] (legacy single-store captures for backward compatibility)
306+
State and collection key format depends on use_multi_store_keys:
307+
- True: Dict-based state {"inc": {"store_id": {...}}}, keys ["/_meta/store", "/id"]
308+
- False: Flat state {"inc": {"cursor": "..."}}, keys ["/id"]
309+
310+
This follows the pattern from source-stripe-native: new captures always use
311+
dict-based state, legacy single-store captures continue with flat state,
312+
and transitioning requires backfill.
284313
285314
Args:
286-
use_multi_store_keys: Whether to include /_meta/store in collection keys.
287-
New captures should always pass True. Only pass False for legacy
288-
single-store captures that haven't added additional stores.
315+
use_multi_store_keys: Whether to use multi-store format.
316+
True: New captures, existing multi-store captures, or legacy captures
317+
transitioning to multiple stores (after backfill acknowledgment).
318+
False: Legacy single-store captures (backward compatibility).
289319
"""
290320
# Build store contexts
291321
store_contexts: dict[str, dict] = {}
@@ -330,7 +360,7 @@ async def all_resources(
330360
continue
331361

332362
use_backfill = not model.SHOULD_USE_BULK_QUERIES and model.SORT_KEY is not None
333-
initial_state = _create_initial_state(stores_with_access, config.start_date, use_backfill)
363+
initial_state = _create_initial_state(stores_with_access, config.start_date, use_backfill, use_multi_store_keys)
334364

335365
def create_open_fn(
336366
model: type[ShopifyGraphQLResource],

0 commit comments

Comments
 (0)