22import itertools
33from datetime import datetime
44from logging import Logger
5- from typing import Any , AsyncGenerator , TypeVar
5+ from typing import Any , AsyncGenerator , Awaitable , TypeVar
66
77from braintree import BraintreeGateway
88
9- from estuary_cdk .http import HTTPSession
9+ from estuary_cdk .http import HTTPError , HTTPSession
1010
1111from .common import (
1212 HEADERS ,
2727)
2828
2929
30+ def _should_retry (
31+ status : int ,
32+ headers : dict [str , Any ],
33+ body : bytes ,
34+ attempt : int ,
35+ ) -> bool :
36+ # If the response is a 500 status code, that could mean that too much data was requested
37+ # and the connector should make a new request for less data.
38+ return status != 500
39+
40+
3041async def fetch_searchable_resource_ids_by_field_between (
3142 http : HTTPSession ,
3243 base_url : str ,
@@ -93,7 +104,17 @@ async def determine_next_searchable_resource_window_end_by_field(
93104 return end
94105
95106
96- async def _fetch_chunk (
107+ async def _process_completed_fetches (
108+ fetch_coroutines : list [Awaitable [list [dict [str , Any ]]]],
109+ ) -> AsyncGenerator [dict [str , Any ], None ]:
110+ """Helper to process fetching multiple pages of resources and yield individual resources."""
111+ for coro in asyncio .as_completed (fetch_coroutines ):
112+ result = await coro
113+ for resource in result :
114+ yield resource
115+
116+
117+ async def _fetch_resource_page (
97118 http : HTTPSession ,
98119 base_url : str ,
99120 path : str ,
@@ -102,8 +123,6 @@ async def _fetch_chunk(
102123 semaphore : asyncio .Semaphore ,
103124 log : Logger ,
104125) -> list [dict [str , Any ]]:
105- assert len (ids ) <= SEARCH_PAGE_SIZE
106-
107126 url = f"{ base_url } /{ path } /advanced_search"
108127 body = {
109128 "search" : {
@@ -114,7 +133,7 @@ async def _fetch_chunk(
114133 async with semaphore :
115134 response = response_model .model_validate (
116135 braintree_xml_to_dict (
117- await http .request (log , url , "POST" , json = body , headers = HEADERS )
136+ await http .request (log , url , "POST" , json = body , headers = HEADERS , should_retry = _should_retry )
118137 )
119138 )
120139
@@ -128,6 +147,46 @@ async def _fetch_chunk(
128147 return resources
129148
130149
async def _fetch_resource_batch(
    http: HTTPSession,
    base_url: str,
    path: str,
    response_model: type[SearchResponse],
    ids: list[str],
    semaphore: asyncio.Semaphore,
    log: Logger,
) -> list[dict[str, Any]]:
    """Fetch a batch of resources by id, falling back to per-id requests.

    First attempts to fetch all of `ids` in a single advanced_search page.
    If Braintree's API responds with a 500 — which may mean the server had
    trouble sending all resources in one response — each id is re-fetched
    individually so the server can respond successfully.

    Raises:
        HTTPError: for any non-500 error on the single-page attempt, or any
            error raised by the per-id fallback requests.
    """
    # Callers chunk ids to at most SEARCH_PAGE_SIZE before calling.
    assert len(ids) <= SEARCH_PAGE_SIZE

    try:
        # Happy path: all resources fetched in a single page.
        return await _fetch_resource_page(
            http,
            base_url,
            path,
            response_model,
            ids,
            semaphore,
            log,
        )
    except HTTPError as err:
        # Anything other than a 500 is a genuine failure — re-raise bare so
        # the original traceback is propagated unchanged.
        if err.code != 500:
            raise

        log.info(f"Received status {err.code} response when fetching {len(ids)} resources. Attempting to fetch resources individually.")

        # Fallback: one request per id, processed concurrently as they finish.
        return [
            resource
            async for resource in _process_completed_fetches(
                [
                    _fetch_resource_page(http, base_url, path, response_model, [resource_id], semaphore, log)
                    for resource_id in ids
                ]
            )
        ]
189+
131190_IncrementalDocument = TypeVar ("_IncrementalDocument" , bound = IncrementalResource | Transaction )
132191
133192
@@ -144,19 +203,15 @@ async def fetch_by_ids(
144203) -> AsyncGenerator [_IncrementalDocument , None ]:
145204 semaphore = asyncio .Semaphore (SEMAPHORE_LIMIT )
146205
147- for coro in asyncio .as_completed (
148- [
149- _fetch_chunk (http , base_url , path , response_model , list (chunk ), semaphore , log )
150- for chunk in itertools .batched (ids , SEARCH_PAGE_SIZE )
151- ]
206+ async for resource in _process_completed_fetches (
207+ [_fetch_resource_batch (http , base_url , path , response_model , list (chunk ), semaphore , log )
208+ for chunk in itertools .batched (ids , SEARCH_PAGE_SIZE )]
152209 ):
153- result = await coro
154- for resource in result :
155- yield document_model .model_validate (
156- braintree_object_to_dict (
157- braintree_class (gateway , resource )
158- )
210+ yield document_model .model_validate (
211+ braintree_object_to_dict (
212+ braintree_class (gateway , resource )
159213 )
214+ )
160215
161216
162217async def fetch_searchable_resources_created_between (
0 commit comments