
Osdf optimization #332

Open
atripathy86 wants to merge 3 commits into master from osdf_optimization

Conversation

@atripathy86
Collaborator

Related Issues / Pull Requests

Improve Handling of OSDF pulls. Debug why pulls from caches fail occasionally.

Description

  • Switches OSDF downloads from in-memory (response.content) to streaming (iter_content) with inline MD5 computation, avoiding large memory allocations for big files.
  • Adds a --debug flag to cmf artifact pull for OSDF download diagnostics. The flag prints per-file debug info during OSDF artifact pulls: the cache URL actually attempted, file size, download time, and rate (KB/s or MB/s).
  • Resolves the actual cache server URL upfront via a HEAD request (following redirects, mirroring curl -L --head -w "%{url_effective}"), then retries the resolved URL up to 3 times before falling back to origin.
  • Origin downloads also retry up to 3 times. Logs per-attempt failures with attempt number.
  • In --debug mode, prints the resolved cache URL and per-attempt failure details.
  • Also surfaces the HTTP-resolved URL on successful downloads when it differs from the requested URL.
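The streaming-with-inline-MD5 idea in the first bullet can be sketched as follows. This is a minimal illustration using an in-memory stream in place of `response.iter_content`, and `stream_to_file_with_md5` is an invented name, not the PR's function:

```python
import hashlib
import io
import tempfile

def stream_to_file_with_md5(chunks, dest_path):
    """Write an iterable of byte chunks to dest_path, computing MD5 inline
    so the whole payload never has to sit in memory at once."""
    md5 = hashlib.md5()
    total = 0
    with open(dest_path, "wb") as f:
        for chunk in chunks:
            if chunk:  # skip keep-alive empty chunks
                f.write(chunk)
                md5.update(chunk)
                total += len(chunk)
    return md5.hexdigest(), total

# Simulate a 256 KiB payload arriving in 64 KiB chunks, as iter_content would yield it.
payload = b"x" * (256 * 1024)
stream = io.BytesIO(payload)
chunks = iter(lambda: stream.read(65536), b"")

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    dest = tmp.name
digest, total = stream_to_file_with_md5(chunks, dest)
```

The digest computed chunk-by-chunk is identical to hashing the whole file at once, so the hash check needs no second pass over the data.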

What changes are proposed in this pull request?

  • Bug fix (non-breaking change which fixes an issue).
  • New feature (non-breaking change which adds functionality).
  • Breaking change (fix or feature that would cause existing functionality to not work as expected; for instance,
    examples in this repository need to be updated too).
  • This change requires a documentation update.

Checklist:

  • My code follows the style guidelines of this project (PEP-8 with Google-style docstrings).
  • My code modifies existing public API, or introduces new public API, and I updated or wrote docstrings that use Google-style formatting and any other formatting supported by mkdocs and the plugins this project uses.
  • I have commented my code.
  • My code requires documentation updates, and I have made corresponding changes to the documentation.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.

- Switches OSDF downloads from in-memory (response.content)
  to streaming (iter_content) with inline MD5 computation, avoiding large memory
  allocations for big files.
- Add --debug flag to cmf artifact pull for OSDF download diagnostics.
  The CLI flag prints per-file debug info during OSDF artifact pulls:
  the cache URL actually attempted, file size, download time, and rate
  (KB/s or MB/s).
…oads

- Resolves the actual cache server URL upfront via a HEAD request (following
redirects, mirroring `curl -L --head -w "%{url_effective}"`), then retries
the resolved URL up to 3 times before falling back to origin.
- Origin downloads also retry up to 3 times. Logs per-attempt failures with attempt number.
- In --debug mode, prints the resolved cache URL and per-attempt failure details.
- Also surfaces the HTTP-resolved URL on successful downloads when it differs from the requested URL.
- Moved 'Resolved cache to' print to before the download loop (using HEAD result)
  so it appears immediately for large files instead of after the download
  completes.
- Print resolved URL exactly once per file, always stripping auth
  params (?authz=). Print failure reason in logs unconditionally.
- Add 15s sleep with log message between retries.
- Refactor download_and_verify_file to return a 3-tuple (success, message, resolved_url)
  so callers have access to the actual GET-resolved URL.
- Add separator line and
  newline after each downloaded file in debug mode. (Martin's request)
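The retry shape described in these commit messages (three attempts, a fixed 15s sleep between them, the last failure propagated) can be sketched generically. `fetch_with_retries` and its parameters are illustrative names, not the PR's API; the sleep and log hooks are injected here only so the sketch runs instantly:

```python
import time

def fetch_with_retries(fetch, attempts=3, delay=15, sleep=time.sleep, log=print):
    """Call fetch() up to `attempts` times, sleeping `delay` seconds between
    tries. fetch() must return (success, message); if every attempt fails,
    the last (success, message) pair is returned to the caller."""
    success, message = False, "Not attempted"
    for attempt in range(1, attempts + 1):
        success, message = fetch()
        if success:
            return success, message
        log(f"Attempt {attempt}/{attempts} failed: {message}")
        if attempt < attempts:
            log(f"Sleeping {delay}s before retry {attempt + 1}/{attempts}...")
            sleep(delay)
    return success, message

# Example: a fetch that fails twice, then succeeds (sleep stubbed out).
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    return (calls["n"] >= 3, f"call {calls['n']}")

ok, msg = fetch_with_retries(flaky, sleep=lambda s: None, log=lambda m: None)
```

Keeping the loop generic like this also makes the cache path and the origin path share one retry policy instead of two hand-copied loops.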
Copilot AI (Contributor) left a comment
Pull request overview

Improves OSDF artifact pull reliability and observability by switching downloads to a streaming approach, adding retry logic for cache/origin, and exposing a --debug CLI flag for per-file diagnostics.

Changes:

  • Stream OSDF downloads with inline MD5 computation (reduces memory usage).
  • Add cache/origin retry loops and resolve cache redirect targets via HEAD.
  • Add cmf artifact pull --debug to print per-file diagnostics during OSDF pulls.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| cmflib/storage_backends/osdf_artifacts.py | Implements streaming download + MD5, retry logic, cache URL resolution, and debug printing. |
| cmflib/commands/artifact/pull.py | Wires a new --debug flag into OSDF artifact pulls and prints separators in debug mode. |


Comment on lines +196 to +197

```python
head_resp = requests.head(cached_s_url, headers=self.headers, timeout=5, verify=True, allow_redirects=True)
resolved_cache_url = head_resp.url
```
Comment on lines 170 to +235

```diff
 if cache == "":
-    #Fetch from Origin
-    success, result = download_and_verify_file(host, self.headers, download_path, local_path, artifact_hash, timeout=10)
-    if success:
-        #logger.info(result)
-        return success, result
-    #logger.error(f"Failed to download and verify file: {result}")
+    #Fetch from Origin, retry up to 3 times
+    success, result = False, "Not attempted"
+    for attempt in range(1, 4):
+        success, result, resolved_url = download_and_verify_file(host, self.headers, download_path, local_path, artifact_hash, timeout=10, debug=self.debug)
+        if success:
+            #logger.info(result)
+            return success, result
+        logger.error(f"Failed to download and verify file from origin (attempt {attempt}/3): {result}")
+        if self.debug:
+            print(f"  [DEBUG] Origin attempt {attempt}/3 failed: {result}")
+        if attempt < 3:
+            logger.info(f"Sleeping 15s before retry {attempt+1}/3...")
+            if self.debug:
+                print(f"  Sleeping 15s before retry {attempt+1}/3...")
+            time.sleep(15)
     return success, result
 else:
     #Generate Cached path for artifact
     cached_s_url = generate_cached_url(host, cache)
-    #Try to fetch from cache first
-    success, cached_result = download_and_verify_file(cached_s_url, self.headers, download_path, local_path, artifact_hash, timeout=5)
-    if success:
-        #logger.info(cached_result)
-        return success, cached_result
-    else:
-        logger.error(f"Failed to download and verify file from cache: {cached_result}")
-        logger.info(f"Trying Origin at {host}")
-        #Fetch from Origin
-        success, origin_result = download_and_verify_file(host, self.headers, download_path, local_path, artifact_hash, timeout=10)
-        if success:
-            #logger.info(origin_result)
-            return success, origin_result
-        #logger.error(f"Failed to download and verify file: {origin_result}")
-        return success, origin_result
+    if self.debug:
+        print(f"Fetching Artifact {os.path.basename(local_path)}")
+        #print(f"  [DEBUG] Trying cache at {cached_s_url}")
+    # Follow redirects via HEAD to find the actual cache server (mirrors: curl -L --head -w "%{url_effective}")
+    resolved_cache_url = cached_s_url
+    try:
+        head_resp = requests.head(cached_s_url, headers=self.headers, timeout=5, verify=True, allow_redirects=True)
+        resolved_cache_url = head_resp.url
+    except Exception:
+        pass  # If HEAD fails, fall back to original URL for the GET
+    if self.debug:
+        print(f"  [DEBUG] Resolved cache to {resolved_cache_url.split('?')[0]}")
+    #Try to fetch from cache first (using resolved URL to skip redirect in the download)
+    #Retry up to 3 times on the resolved URL before falling back to origin
+    success, cached_result = False, "Not attempted"
+    for attempt in range(1, 4):
+        success, cached_result, resolved_url = download_and_verify_file(resolved_cache_url, self.headers, download_path, local_path, artifact_hash, timeout=5, debug=self.debug)
+        if success:
+            #logger.info(cached_result)
+            return success, cached_result
+        logger.error(f"Failed to download and verify file from cache (attempt {attempt}/3): {cached_result}")
+        if self.debug:
+            print(f"  [DEBUG] Cache attempt {attempt}/3 failed: {cached_result}")
+        if attempt < 3:
+            logger.info(f"Sleeping 15s before retry {attempt+1}/3...")
+            if self.debug:
+                print(f"  Sleeping 15s before retry {attempt+1}/3...")
+            time.sleep(15)
+    logger.error(f"All 3 cache attempts failed, falling back to origin.")
+    logger.info(f"Trying Origin at {host}")
+    #Fetch from Origin, retry up to 3 times
+    success, origin_result = False, "Not attempted"
+    for attempt in range(1, 4):
+        success, origin_result, resolved_url = download_and_verify_file(host, self.headers, download_path, local_path, artifact_hash, timeout=10, debug=self.debug)
+        if success:
+            #logger.info(origin_result)
+            return success, origin_result
+        logger.error(f"Failed to download and verify file from origin (attempt {attempt}/3): {origin_result}")
+        if self.debug:
+            print(f"  [DEBUG] Origin attempt {attempt}/3 failed: {origin_result}")
+        if attempt < 3:
+            logger.info(f"Sleeping 15s before retry {attempt+1}/3...")
+            if self.debug:
+                print(f"  Sleeping 15s before retry {attempt+1}/3...")
+            time.sleep(15)
+    return success, origin_result
```
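The cache-resolution step mirrors `curl -L --head -w "%{url_effective}"`. A stdlib-only sketch of the same idea follows, using `urllib` in place of the `requests.head` call in the PR; `resolve_effective_url`, `_Redirector`, and the local test server are all illustrative, not part of the codebase (note that `urllib` may reissue a redirected HEAD as a GET, so the demo server answers both verbs):

```python
import http.server
import threading
import urllib.request

def resolve_effective_url(url, timeout=5):
    """Follow redirects with a HEAD request and return the final URL,
    mirroring `curl -L --head -w "%{url_effective}"`; on any failure,
    fall back to the original URL."""
    req = urllib.request.Request(url, method="HEAD")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.geturl()  # urlopen follows redirects automatically
    except Exception:
        return url

# Tiny local server standing in for an OSDF redirector: /a 302-redirects to /b.
class _Redirector(http.server.BaseHTTPRequestHandler):
    def do_HEAD(self):
        if self.path == "/a":
            self.send_response(302)
            self.send_header("Location", "/b")
            self.end_headers()
        else:
            self.send_response(200)
            self.end_headers()
    do_GET = do_HEAD  # urllib may downgrade the redirected HEAD to a GET
    def log_message(self, *args):  # keep the demo quiet
        pass

srv = http.server.HTTPServer(("127.0.0.1", 0), _Redirector)
threading.Thread(target=srv.serve_forever, daemon=True).start()
base = f"http://127.0.0.1:{srv.server_address[1]}"
final = resolve_effective_url(f"{base}/a")
srv.shutdown()
```

Resolving once up front means the retry loop hits the concrete cache server directly instead of re-walking the redirect chain on every attempt.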
Comment on lines 55 to 75

```diff
@@ -65,61 +67,67 @@ def download_and_verify_file(host, headers, remote_file_path, local_path, artifa
         local_path (str): Logical or original path/name used for logging and status messages.
         artifact_hash (str): Expected MD5 hash of the artifact, used for integrity verification.
         timeout (float | int): Timeout (in seconds) for the HTTP request.
+        debug (bool): When True, print per-file size, timing, and rate to console.

     Returns:
         tuple[bool, str]: A pair ``(success, message)`` where ``success`` indicates whether the
             download and MD5 verification succeeded, and ``message`` describes the outcome.
     """
```
Comment on lines +79 to +80

```python
response = requests.get(host, headers=headers, timeout=timeout, verify=True, stream=True)  # This should be made True, otherwise this will produce an Insecure SSL Warning
resolved_url = response.url.split('?')[0]  # final URL after any redirects, strip auth params
```
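`response.url.split('?')[0]` drops everything from the first `?`, which is what removes the `?authz=` token before the URL is printed. An equivalent, slightly more explicit sketch with `urllib.parse` (the helper name and example URL are made up for illustration):

```python
from urllib.parse import urlsplit, urlunsplit

def strip_query(url):
    """Drop the query and fragment (e.g. a ?authz=... token) before logging a URL."""
    parts = urlsplit(url)
    return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))

safe = strip_query("https://cache.example.org/osdf/file.bin?authz=secret-token#frag")
```

Either form keeps bearer tokens out of logs and debug output while leaving the host and path visible.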
Comment on lines +90 to +117

```python
try:
    start_dl = time.time()
    total_bytes = 0
    md5 = hashlib.md5()
    with open(remote_file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=65536):
            if chunk:
                f.write(chunk)
                md5.update(chunk)
                total_bytes += len(chunk)
    elapsed = time.time() - start_dl
except Exception as e:
    return False, f"An error occurred while writing to the file: {e}", resolved_url

if total_bytes == 0:
    return False, "No data received from the server.", resolved_url

md5_hash = md5.hexdigest()
#logger.debug(f"MD5 hash of the downloaded file is: {md5_hash}")
#logger.debug(f"Artifact hash from MLMD records is: {artifact_hash}")
if artifact_hash == md5_hash:
    #logger.debug("MD5 hash of the downloaded file matches the hash in MLMD records.")
    stmt = f"object {local_path} downloaded at {remote_file_path} and matches MLMD records."
    success = True
else:
    #logger.error("Error: MD5 hash of the downloaded file does not match the hash in MLMD records.")
    stmt = f"object {local_path} downloaded at {remote_file_path} does NOT match MLMD records."
    success = False
```
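The debug output derives a rate in KB/s or MB/s from `total_bytes` and `elapsed`. One possible shape for that formatting, as a hedged sketch (`format_rate` is an invented helper; the PR's actual thresholds and precision may differ):

```python
def format_rate(total_bytes, elapsed_seconds):
    """Return a human-readable transfer rate, switching from KB/s to MB/s
    once the rate reaches one mebibyte per second."""
    if elapsed_seconds <= 0:
        return "n/a"
    bps = total_bytes / elapsed_seconds
    if bps >= 1024 * 1024:
        return f"{bps / (1024 * 1024):.2f} MB/s"
    return f"{bps / 1024:.2f} KB/s"

r1 = format_rate(512 * 1024, 1.0)       # half a mebibyte in one second
r2 = format_rate(8 * 1024 * 1024, 2.0)  # eight mebibytes in two seconds
```

Guarding against `elapsed_seconds <= 0` matters for very small files, where the timer can register effectively zero elapsed time.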
Comment on lines +42 to 54

```python
# NOTE: calculate_md5_from_file is superseded by in-chunk MD5 calculation during streaming
# in download_and_verify_file(). Kept here for reference.
# def calculate_md5_from_file(file_path, chunk_size=8192):
#     md5 = hashlib.md5()
#     try:
#         with open(file_path, 'rb') as f:
#             while chunk := f.read(chunk_size):
#                 md5.update(chunk)
#     except Exception as e:
#         logger.error(f"[calculate_md5_from_file] An error occurred while reading the file: {e}")
#         return None
#     return md5.hexdigest()
```