11import functools
2+ from copy import deepcopy
23from datetime import datetime , timedelta , UTC
34from logging import Logger
45from pydantic import AwareDatetime
1920
2021from .models import (
2122 OAUTH2_SPEC ,
22- ConnectorState ,
2323 AccessScopes ,
24+ ConnectorState ,
2425 EndpointConfig ,
2526 ShopifyGraphQLResource ,
2627 ShopDetails ,
@@ -151,41 +152,75 @@ def _create_initial_state(
151152 )
152153
153154
154- def _migrate_legacy_state (
155+ def _reconcile_connector_state (
156+ store_ids : list [str ],
157+ binding : CaptureBinding [ResourceConfig ],
155158 state : ResourceState ,
156- store_id : str ,
157159 initial_state : ResourceState ,
158160 task : Task ,
159- binding : CaptureBinding [ResourceConfig ],
160161) -> None :
161- """Migrate legacy flat state to dict-based state if needed.
162+ """Reconcile connector state to ensure all stores have proper state entries.
163+
164+ This handles:
165+ 1. Legacy flat state migration to dict-based state
166+ 2. Adding new stores to existing dict-based state
162167
163- Legacy state has ResourceState.Incremental directly in state.inc.
164- New state has {store_id: ResourceState.Incremental} in state.inc.
168+ Args:
169+ store_ids: List of store IDs that should have state entries.
170+ binding: The capture binding being processed.
171+ state: The current state (may be flat or dict-based).
172+ initial_state: The initial state template for new entries.
173+ task: The task for logging and checkpointing.
165174 """
166- # Check if state is already dict-based
167- if isinstance (state .inc , dict ):
168- # Already migrated, but check if this store needs to be added
169- if store_id not in state .inc :
170- task .log .info (f"Adding state for new store: { store_id } " )
171- assert isinstance (initial_state .inc , dict )
172- state .inc [store_id ] = initial_state .inc [store_id ]
173- if isinstance (state .backfill , dict ) and isinstance (initial_state .backfill , dict ):
174- state .backfill [store_id ] = initial_state .backfill [store_id ]
175- task .checkpoint (ConnectorState (bindingStateV1 = {binding .stateKey : state }))
176- return
175+ should_checkpoint = False
176+
177+ # Handle legacy flat state migration
178+ if isinstance (state .inc , ResourceState .Incremental ):
179+ # Legacy flat state - migrate to dict-based
180+ if len (store_ids ) != 1 :
181+ raise RuntimeError (
182+ f"Cannot migrate legacy flat state with multiple stores ({ len (store_ids )} ). "
183+ "Legacy captures should only have one store."
184+ )
185+ store_id = store_ids [0 ]
186+ task .log .info (f"Migrating legacy flat state to dict-based for store: { store_id } " )
177187
178- # Migrate legacy flat state to dict-based
179- task . log . info ( f"Migrating legacy state to dict-based for store: { store_id } " )
188+ legacy_inc = state . inc
189+ state . inc = {store_id : legacy_inc } # type: ignore[assignment]
180190
181- legacy_inc = state .inc
182- state .inc = {store_id : legacy_inc } # type: ignore[assignment]
191+ if state .backfill is not None and isinstance (state .backfill , ResourceState .Backfill ):
192+ legacy_backfill = state .backfill
193+ state .backfill = {store_id : legacy_backfill } # type: ignore[assignment]
183194
184- if state .backfill is not None :
185- legacy_backfill = state .backfill
186- state .backfill = {store_id : legacy_backfill } # type: ignore[assignment,dict-item]
195+ should_checkpoint = True
187196
188- task .checkpoint (ConnectorState (bindingStateV1 = {binding .stateKey : state }))
197+ # Handle adding new stores to existing dict-based state
198+ elif isinstance (state .inc , dict ) and isinstance (initial_state .inc , dict ):
199+ for store_id in store_ids :
200+ inc_state_exists = store_id in state .inc
201+ backfill_state_exists = (
202+ isinstance (state .backfill , dict ) and store_id in state .backfill
203+ )
204+
205+ if not inc_state_exists and not backfill_state_exists :
206+ task .log .info (f"Initializing new state for store: { store_id } " )
207+ state .inc [store_id ] = deepcopy (initial_state .inc [store_id ])
208+ if isinstance (state .backfill , dict ) and isinstance (initial_state .backfill , dict ):
209+ state .backfill [store_id ] = deepcopy (initial_state .backfill [store_id ])
210+ should_checkpoint = True
211+ elif not inc_state_exists and backfill_state_exists :
212+ # Edge case: backfill exists but incremental doesn't
213+ task .log .info (
214+ f"Reinitializing state for store { store_id } due to missing incremental state."
215+ )
216+ state .inc [store_id ] = deepcopy (initial_state .inc [store_id ])
217+ if isinstance (state .backfill , dict ) and isinstance (initial_state .backfill , dict ):
218+ state .backfill [store_id ] = deepcopy (initial_state .backfill [store_id ])
219+ should_checkpoint = True
220+
221+ if should_checkpoint :
222+ task .log .info (f"Checkpointing reconciled state for { binding .stateKey } ." )
223+ task .checkpoint (ConnectorState (bindingStateV1 = {binding .stateKey : state }))
189224
190225
191226async def _check_plan_allows_pii (http : HTTPMixin , url : str , log : Logger ) -> bool :
@@ -260,7 +295,7 @@ async def all_resources(
260295) -> list [Resource ]:
261296 """Discover all available resources across all configured stores.
262297
263- State is always dict-based internally ( keyed by store_id) .
298+ State is always dict-based internally, keyed by store_id.
264299 Collection keys depend on use_multi_store_keys:
265300 - True: ["/_meta/store", "/id"] (new captures and multi-store)
266301 - False: ["/id"] (legacy single-store captures for backward compatibility)
@@ -327,6 +362,11 @@ def open(
327362 task : Task ,
328363 _all_bindings = None ,
329364 ):
365+ # Reconcile state: migrate legacy flat state or add new stores
366+ _reconcile_connector_state (
367+ stores_with_access , binding , state , initial_state , task
368+ )
369+
330370 # Warn if FulfillmentOrders has partial scope coverage
331371 if model == gql .FulfillmentOrders :
332372 fo_scopes = gql .FulfillmentOrders .QUALIFYING_SCOPES
@@ -342,11 +382,7 @@ def open(
342382 if not model .SHOULD_USE_BULK_QUERIES and "edges" in model .QUERY .lower ():
343383 raise RuntimeError ("Non-bulk queries cannot contain nested connections." )
344384
345- # Migrate legacy state and/or add new stores
346- for store_id in stores_with_access :
347- _migrate_legacy_state (state , store_id , initial_state , task , binding )
348-
349- # Build fetch functions (always dict-based)
385+ # Build fetch functions (always dict-based, keyed by store_id)
350386 data_model = create_response_data_model (model )
351387 fetch_changes : dict [str , functools .partial ] = {}
352388 fetch_page : dict [str , functools .partial ] = {}
0 commit comments