@@ -366,6 +366,7 @@ def create_open_fn(
366366 model : type [ShopifyGraphQLResource ],
367367 stores_with_access : list [str ],
368368 initial_state : ResourceState ,
369+ use_multi_store_keys : bool ,
369370 ):
370371 def open (
371372 binding : CaptureBinding [ResourceConfig ],
@@ -374,7 +375,7 @@ def open(
374375 task : Task ,
375376 _all_bindings = None ,
376377 ):
377- # Reconcile state: migrate legacy flat state or add new stores
378+ # Reconcile state (only for dict-based state with multiple stores)
378379 _reconcile_connector_state (
379380 stores_with_access , binding , state , initial_state , task
380381 )
@@ -394,42 +395,80 @@ def open(
394395 if not model .SHOULD_USE_BULK_QUERIES and "edges" in model .QUERY .lower ():
395396 raise RuntimeError ("Non-bulk queries cannot contain nested connections." )
396397
397- # Build fetch functions (always dict-based, keyed by store_id)
398398 data_model = create_response_data_model (model )
399- fetch_changes : dict [str , functools .partial ] = {}
400- fetch_page : dict [str , functools .partial ] = {}
401399
402- for store_id in stores_with_access :
400+ if use_multi_store_keys :
401+ # Dict-based fetch functions for multiple stores
402+ fetch_changes : dict [str , functools .partial ] = {}
403+ fetch_page : dict [str , functools .partial ] = {}
404+
405+ for store_id in stores_with_access :
406+ ctx = store_contexts [store_id ]
407+
408+ if model .SHOULD_USE_BULK_QUERIES :
409+ fetch_changes [store_id ] = functools .partial (
410+ bulk_fetch_incremental ,
411+ ctx ["http" ], config .advanced .window_size , ctx ["bulk_job_manager" ], model , store_id ,
412+ )
413+ elif model .SORT_KEY is None :
414+ fetch_changes [store_id ] = functools .partial (
415+ fetch_incremental_unsorted ,
416+ ctx ["client" ], model , data_model , store_id ,
417+ )
418+ else :
419+ fetch_changes [store_id ] = functools .partial (
420+ fetch_incremental ,
421+ ctx ["client" ], model , data_model , store_id ,
422+ )
423+ fetch_page [store_id ] = functools .partial (
424+ backfill_incremental ,
425+ ctx ["client" ], model , data_model , store_id ,
426+ )
427+
428+ open_binding (
429+ binding ,
430+ binding_index ,
431+ state ,
432+ task ,
433+ fetch_changes = fetch_changes , # type: ignore[arg-type]
434+ fetch_page = fetch_page if fetch_page else None , # type: ignore[arg-type]
435+ )
436+ else :
437+ # Flat fetch functions for single store (legacy backward compatibility)
438+ # For flat state, open_binding expects functools.partial, not dict
439+ store_id = stores_with_access [0 ]
403440 ctx = store_contexts [store_id ]
404441
405442 if model .SHOULD_USE_BULK_QUERIES :
406- fetch_changes [ store_id ] = functools .partial (
443+ fetch_changes_fn = functools .partial (
407444 bulk_fetch_incremental ,
408445 ctx ["http" ], config .advanced .window_size , ctx ["bulk_job_manager" ], model , store_id ,
409446 )
447+ fetch_page_fn = None
410448 elif model .SORT_KEY is None :
411- fetch_changes [ store_id ] = functools .partial (
449+ fetch_changes_fn = functools .partial (
412450 fetch_incremental_unsorted ,
413451 ctx ["client" ], model , data_model , store_id ,
414452 )
453+ fetch_page_fn = None
415454 else :
416- fetch_changes [ store_id ] = functools .partial (
455+ fetch_changes_fn = functools .partial (
417456 fetch_incremental ,
418457 ctx ["client" ], model , data_model , store_id ,
419458 )
420- fetch_page [ store_id ] = functools .partial (
459+ fetch_page_fn = functools .partial (
421460 backfill_incremental ,
422461 ctx ["client" ], model , data_model , store_id ,
423462 )
424463
425- open_binding (
426- binding ,
427- binding_index ,
428- state ,
429- task ,
430- fetch_changes = fetch_changes , # type: ignore[arg-type]
431- fetch_page = fetch_page if fetch_page else None , # type: ignore[arg-type]
432- )
464+ open_binding (
465+ binding ,
466+ binding_index ,
467+ state ,
468+ task ,
469+ fetch_changes = fetch_changes_fn ,
470+ fetch_page = fetch_page_fn ,
471+ )
433472
434473 return open
435474
@@ -438,7 +477,7 @@ def open(
438477 name = model .NAME ,
439478 key = key ,
440479 model = ShopifyGraphQLResource ,
441- open = create_open_fn (model , stores_with_access , initial_state ),
480+ open = create_open_fn (model , stores_with_access , initial_state , use_multi_store_keys ),
442481 initial_state = initial_state ,
443482 initial_config = ResourceConfig (name = model .NAME , interval = timedelta (minutes = 5 )),
444483 schema_inference = True ,
0 commit comments