3 changes: 2 additions & 1 deletion tap_salesforce/__init__.py
@@ -520,7 +520,8 @@ def main_impl():
is_sandbox=CONFIG.get('is_sandbox'),
select_fields_by_default=CONFIG.get('select_fields_by_default'),
default_start_date=CONFIG.get('start_date'),
api_type=CONFIG.get('api_type'))
api_type=CONFIG.get('api_type'),
pk_chunking=CONFIG.get('pk_chunking', {}))
sf.login()

if args.discover:
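For context, this change wires a new pk_chunking block from the tap's JSON config through to the Salesforce client. A minimal sketch of what that block might look like once parsed, assuming the key names read elsewhere in this diff (enabled, tables, chunk_size, polling_sleep_time) and purely illustrative stream names and values:

    # Hypothetical parsed config (CONFIG) with the new block:
    CONFIG = {
        "api_type": "BULK",
        "start_date": "2023-01-01T00:00:00Z",
        "pk_chunking": {
            "enabled": True,
            "tables": ["Account", "Contact"],  # hypothetical stream names to force-chunk
            "chunk_size": 100000,              # falls back to DEFAULT_CHUNK_SIZE when omitted
            "polling_sleep_time": 20,          # falls back to PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP
        },
    }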
5 changes: 3 additions & 2 deletions tap_salesforce/salesforce/__init__.py
@@ -206,7 +206,8 @@ def __init__(self,
is_sandbox=None,
select_fields_by_default=None,
default_start_date=None,
api_type=None):
api_type=None,
pk_chunking={}):
self.api_type = api_type.upper() if api_type else None
self.session = requests.Session()
if isinstance(quota_percent_per_run, str) and quota_percent_per_run.strip() == '':
@@ -222,7 +223,7 @@ def __init__(self,
self.rest_requests_attempted = 0
self.jobs_completed = 0
self.data_url = "{}/services/data/v53.0/{}"
self.pk_chunking = False
self.pk_chunking = pk_chunking

self.auth = SalesforceAuth.from_credentials(credentials, is_sandbox=self.is_sandbox)

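One consequence of defaulting pk_chunking to a dict rather than the previous False: every call site can use .get() without type checks. A quick sketch of the resulting behavior when the config omits the block (hypothetical client instance sf):

    # sf.pk_chunking is {} when the config has no "pk_chunking" entry
    sf.pk_chunking.get('enabled')      # None -> falsy, so the plain bulk query path runs
    sf.pk_chunking.get('tables', [])   # []   -> no stream is force-chunked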
80 changes: 52 additions & 28 deletions tap_salesforce/salesforce/bulk.py
@@ -48,9 +48,15 @@ def __init__(self, sf):

def query(self, catalog_entry, state):
self.check_bulk_quota_usage()
pk_chunking = self.sf.pk_chunking.get('enabled')
chunked_tables = self.sf.pk_chunking.get('tables', [])

for record in self._bulk_query(catalog_entry, state):
yield record
if pk_chunking and catalog_entry['stream'] in chunked_tables:
for record in self._bulk_query_with_pk_chunking(catalog_entry, state):
yield record
else:
for record in self._bulk_query(catalog_entry, state):
yield record

self.sf.jobs_completed += 1

@@ -99,34 +105,37 @@ def _bulk_query(self, catalog_entry, state):
batch_status = self._poll_on_batch_status(job_id, batch_id)

if batch_status['state'] == 'Failed':
if "QUERY_TIMEOUT" in batch_status['stateMessage']:
batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date)
job_id = batch_status['job_id']

message = batch_status['stateMessage']
if "QUERY_TIMEOUT" in message:
LOGGER.info("Retrying Bulk Query for %s with PK Chunking: %s", catalog_entry['stream'], message)
# Set pk_chunking to True to indicate that we should write a bookmark differently
self.sf.pk_chunking = True

# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

for completed_batch_id in batch_status['completed']:
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
yield result
# Remove the completed batch ID and write state
state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id)
LOGGER.info("Finished syncing batch %s. Removing batch from state.", completed_batch_id)
LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
singer.write_state(state)
self.sf.pk_chunking['enabled'] = True
for record in self._bulk_query_with_pk_chunking(catalog_entry, state):
    yield record
else:
raise TapSalesforceException(batch_status['stateMessage'])
raise TapSalesforceException(message)
else:
for result in self.get_batch_results(job_id, batch_id, catalog_entry):
yield result

def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
LOGGER.info("Retrying Bulk Query with PK Chunking")
def _bulk_query_with_pk_chunking(self, catalog_entry, state):
start_date = self.sf.get_start_date(state, catalog_entry)
batch_status = self._perform_pk_chunking_query(catalog_entry, start_date)
job_id = batch_status['job_id']

# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

for completed_batch_id in batch_status['completed']:
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
yield result
# Remove the completed batch ID and write state
self._complete_batch(state, tap_stream_id, completed_batch_id)
singer.write_state(state)

def _perform_pk_chunking_query(self, catalog_entry, start_date):
LOGGER.info("Making Bulk Query for %s with PK Chunking", catalog_entry['stream'])

# Create a new job
job_id = self._create_job(catalog_entry, True)
@@ -137,6 +146,8 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
batch_status['job_id'] = job_id

if batch_status['failed']:
for batch in [b for b in batch_status['all'] if b['state'] == "Failed"]:
LOGGER.info(batch['stateMessage'])
raise TapSalesforceException("One or more batches failed during PK chunked job")

# Close the job after all the batches are complete
@@ -152,9 +163,10 @@ def _create_job(self, catalog_entry, pk_chunking=False):
headers['Sforce-Disable-Batch-Retry'] = "true"

if pk_chunking:
LOGGER.info("ADDING PK CHUNKING HEADER")
chunk_size = self.sf.pk_chunking.get('chunk_size', DEFAULT_CHUNK_SIZE)
LOGGER.info("Adding PK chunking header w/chunk size: %d", chunk_size)

headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE)
headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(chunk_size)

# If the stream ends with 'CleanInfo' or 'History', we can PK Chunk on the object's parent
if any(catalog_entry['stream'].endswith(suffix) for suffix in ["CleanInfo", "History"]):
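For reference, the resulting header follows the Bulk API's PK chunking syntax. A sketch of what this code would send, assuming a configured chunk_size of 250000 and, per the comment above, that the collapsed lines below append a parent for CleanInfo/History objects (e.g. Account for AccountHistory):

    # Plain object:
    #   Sforce-Enable-PKChunking: true; chunkSize=250000
    # History/CleanInfo object, chunked on its parent (assumed continuation of this branch):
    #   Sforce-Enable-PKChunking: true; chunkSize=250000; parent=Account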
@@ -190,6 +202,17 @@ def _add_batch(self, catalog_entry, job_id, start_date, order_by_clause=True):

return batch['batchInfo']['id']

def _complete_batch(self, state, tap_stream_id, batch_id):
@sunild (author) commented on Feb 7, 2024:

I added this method because, when a PK chunked job completed, the bookmark still had the JobID and BatchIDs properties in it. The JobID needs to be cleared from the state; otherwise, on the next sync interval, the tap would incorrectly think it was resuming an interrupted job (even though the BatchIDs array was empty).

See:

if job_id:
    with metrics.record_counter(stream) as counter:
        LOGGER.info("Found JobID from previous Bulk Query. Resuming sync for job: %s", job_id)

stream = state['bookmarks'][tap_stream_id]
batch_ids = stream['BatchIDs']
batch_ids.remove(batch_id)
LOGGER.info("%s: finished syncing batch %s. Removing batch from state.", tap_stream_id, batch_id)
LOGGER.info("%s: batches to go %d", tap_stream_id, len(batch_ids))
if len(batch_ids) <= 0:
stream.pop('BatchIDs', None)
stream.pop('JobID', None)
stream.pop('JobHighestBookmarkSeen', None)

def _poll_on_pk_chunked_batch_status(self, job_id):
batches = self._get_batches(job_id)

@@ -200,9 +223,10 @@ def _poll_on_pk_chunked_batch_status(self, job_id):
if not queued_batches and not in_progress_batches:
completed_batches = [b['id'] for b in batches if b['state'] == "Completed"]
failed_batches = [b['id'] for b in batches if b['state'] == "Failed"]
return {'completed': completed_batches, 'failed': failed_batches}
return {'completed': completed_batches, 'failed': failed_batches, 'all': batches}
else:
time.sleep(PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP)
sleep_time = self.sf.pk_chunking.get('polling_sleep_time', PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP)
time.sleep(sleep_time)
batches = self._get_batches(job_id)

def _poll_on_batch_status(self, job_id, batch_id):
4 changes: 2 additions & 2 deletions tap_salesforce/sync.py
@@ -132,7 +132,7 @@ def sync_records(sf, catalog_entry, state, counter, state_msg_threshold):

replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key])

if sf.pk_chunking:
if sf.pk_chunking.get('enabled'):
if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark:
# Replace the highest seen bookmark and save the state in case we need to resume later
chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key])
@@ -164,7 +164,7 @@ def sync_records(sf, catalog_entry, state, counter, state_msg_threshold):
state, catalog_entry['tap_stream_id'], 'version', None)

# If pk_chunking is set, only write a bookmark at the end
if sf.pk_chunking:
if sf.pk_chunking.get('enabled'):
# Write a bookmark with the highest value we've seen
state = singer.write_bookmark(
state,