Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions parsons/catalist/catalist.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,45 @@ def await_completion(
result = self.load_matches(id=id)
return result

def load_matches(self, id: str) -> Table:
def fetch_zip_with_retry(
self, remote_path: str, job_id: str, max_retries: int = 3, backoff_factor: int = 2
):
last_exception = None

for attempt in range(1, max_retries + 1):
try:
# 1. Attempt the download
temp_file_zip = self.sftp.get_file(
remote_path=remote_path, export_chunk_size=DEFAULT_EXPORT_CHUNK_SIZE
)
Comment on lines +350 to +352
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will solve your test failure: wrapping the result of `get_file` in `Path` ensures the later `temp_file_zip.stat()` call works.

Suggested change
temp_file_zip = self.sftp.get_file(
remote_path=remote_path, export_chunk_size=DEFAULT_EXPORT_CHUNK_SIZE
)
temp_file_zip = Path(
self.sftp.get_file(
remote_path=remote_path,
export_chunk_size=DEFAULT_EXPORT_CHUNK_SIZE
)
)


# 2. Validate the integrity of the file
if not is_zipfile(temp_file_zip):
size = temp_file_zip.stat().st_size
raise RuntimeError(f"Corrupt zip (Size: {size} bytes)")
Comment on lines +342 to +357
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why we don't just decorate this function with `@backoff`; the retry logic feels kind of manual.


# 3. Success: Return the file
return temp_file_zip

except RuntimeError as e:
last_exception = e
if attempt < max_retries:
sleep_time = backoff_factor**attempt
logger.warning(
f"Attempt {attempt} failed for job {job_id}: {e}. "
f"Retrying in {sleep_time}s..."
)
time.sleep(sleep_time)
else:
logger.error(f"Final attempt failed for job {job_id}.")

# If the loop finishes without returning, raise the final error
raise RuntimeError(
f"Failed to download valid zip for job {job_id} after {max_retries} attempts. "
f"Remote path: {remote_path}. Error: {last_exception}"
)

def load_matches(self, id: str, max_retries: int = 3, backoff_factor: int = 2) -> Table:
"""Take a completed job ID, download and open the match file as a Table.

Result will be a Table with all the original columns along with columns 'DWID',
Expand Down Expand Up @@ -372,17 +410,13 @@ def load_matches(self, id: str) -> Table:
remote_filename = [filename for filename in remote_filepaths if id in filename][0]
remote_filepath = "/myDownloads/" + remote_filename

temp_file_zip = self.sftp.get_file(
remote_path=remote_filepath, export_chunk_size=DEFAULT_EXPORT_CHUNK_SIZE
temp_file_zip = self.fetch_zip_with_retry(
job_id=id,
remote_path=remote_filepath,
max_retries=max_retries,
backoff_factor=backoff_factor,
)

if not is_zipfile(temp_file_zip):
raise RuntimeError(
f"Downloaded file for job {id} is not a valid zip file "
f"(size: {temp_file_zip.stat().st_size} bytes, remote path: {remote_filepath}). "
"The SFTP download may be corrupt or incomplete."
)

logger.debug(
f"Download complete for job {id} (local size: {temp_file_zip.stat().st_size} bytes)."
)
Expand Down
Loading