Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions tap_bing_ads/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
def should_retry_httperror(exception):
""" Return true if exception is required to retry otherwise return false """
try:
if isinstance(exception, ConnectionError) or isinstance(exception, ssl.SSLError) or isinstance(exception, suds.transport.TransportError) or isinstance(exception, socket.timeout) or type(exception) == URLError: # pylint: disable=consider-merging-isinstance,no-else-return)
if isinstance(exception, ConnectionError) or isinstance(exception, ssl.SSLError) or isinstance(exception, suds.transport.TransportError) or isinstance(exception, socket.timeout) or type(exception) == URLError or isinstance(exception, InternalServerError): # pylint: disable=consider-merging-isinstance,no-else-return)
return True
elif (type(exception) == Exception and exception.args[0][0] == 408) or exception.code == 408:
# A 408 Request Timeout is an HTTP response status code that indicates the server didn't receive a complete
Expand Down Expand Up @@ -96,6 +96,9 @@ def get_user_agent():
class InvalidDateRangeEnd(Exception):
pass

class InternalServerError(Exception):
pass

def log_service_call(service_method, account_id):
def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements
log_args = list(map(lambda arg: str(arg).replace('\n', '\\n'), args)) + list(map(lambda kv: '{}={}'.format(*kv), kwargs.items()))
Expand All @@ -116,9 +119,9 @@ def wrapper(*args, **kwargs): # pylint: disable=inconsistent-return-statements
if any(invalid_date_range_end_errors):
raise InvalidDateRangeEnd(invalid_date_range_end_errors) from e
LOGGER.info('Caught exception for account: %s', account_id)
raise Exception(operation_errors) from e
raise InternalServerError(operation_errors) from e
if hasattr(e.fault.detail, 'AdApiFaultDetail'):
raise Exception(e.fault.detail.AdApiFaultDetail.Errors) from e
raise InternalServerError(e.fault.detail.AdApiFaultDetail.Errors) from e

return wrapper

Expand Down Expand Up @@ -1062,11 +1065,17 @@ async def do_sync_all_accounts(account_ids, catalog):
LOGGER.info('Syncing Accounts')
sync_accounts_stream(account_ids, selected_streams['accounts'])

sync_account_data_tasks = [
sync_account_data(account_id, catalog, selected_streams)
for account_id in account_ids
]
await asyncio.gather(*sync_account_data_tasks)
for stream, catalog_entry in selected_streams.items():
LOGGER.info('Syncing %s', stream)

sync_account_data_tasks = [
sync_account_data(account_id,
catalog,
{stream: catalog_entry})
for account_id in account_ids
]
await asyncio.gather(*sync_account_data_tasks)


async def main_impl():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
Expand Down