Skip to content

Commit 66935f4

Browse files
committed
Support more in-depth transfer metrics
1 parent 6a4e858 commit 66935f4

9 files changed

+1348
-52
lines changed

synapseclient/core/download/download_async.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class DownloadRequest(NamedTuple):
6464
async def download_file(
6565
client: "Synapse",
6666
download_request: DownloadRequest,
67+
progress_callback: Optional[callable] = None,
6768
) -> None:
6869
"""
6970
Main driver for the multi-threaded download. Uses an ExecutorService,
@@ -74,8 +75,14 @@ async def download_file(
7475
client: A synapseclient
7576
download_request: A batch of DownloadRequest objects specifying what
7677
Synapse files to download
78+
progress_callback: Optional callback function to report download progress,
79+
called with (bytes_transferred, total_bytes)
7780
"""
78-
downloader = _MultithreadedDownloader(syn=client, download_request=download_request)
81+
downloader = _MultithreadedDownloader(
82+
syn=client,
83+
download_request=download_request,
84+
progress_callback=progress_callback
85+
)
7986
await downloader.download_file()
8087

8188

@@ -251,20 +258,25 @@ def __init__(
251258
self,
252259
syn: "Synapse",
253260
download_request: DownloadRequest,
261+
progress_callback: Optional[callable] = None,
254262
) -> None:
255263
"""
256264
Initializes the class
257265
258266
Arguments:
259267
syn: A synapseclient
260-
executor: An ExecutorService that will be used to run part downloads
261-
in separate threads
268+
download_request: A download request object specifying the file to download
269+
progress_callback: Optional callback function to report download progress,
270+
called with (bytes_transferred, total_bytes)
262271
"""
263272
self._syn = syn
264273
self._thread_lock = _threading.Lock()
265274
self._aborted = False
266275
self._download_request = download_request
267276
self._progress_bar = None
277+
self._progress_callback = progress_callback
278+
self._total_downloaded = 0
279+
self._total_size = 0
268280

269281
async def download_file(self) -> None:
270282
"""
@@ -282,6 +294,7 @@ async def download_file(self) -> None:
282294
retry_max_wait_before_failure=30,
283295
read_response_content=False,
284296
)
297+
self._total_size = file_size
285298
self._progress_bar = get_or_create_download_progress_bar(
286299
file_size=file_size,
287300
postfix=self._download_request.object_id,
@@ -454,7 +467,7 @@ def _write_chunk(
454467
self, request: DownloadRequest, chunk: bytes, start: int, length: int
455468
) -> None:
456469
"""Open the file and write the chunk to the specified byte range. Also update
457-
the progress bar.
470+
the progress bar and call the progress callback if provided.
458471
459472
Arguments:
460473
request: A DownloadRequest object specifying what Synapse file to download
@@ -470,6 +483,15 @@ def _write_chunk(
470483
file_write.seek(start)
471484
file_write.write(chunk)
472485
self._update_progress_bar(part_size=length)
486+
487+
# Update total downloaded and notify via progress_callback
488+
self._total_downloaded += length
489+
if self._progress_callback is not None:
490+
try:
491+
self._progress_callback(self._total_downloaded, self._total_size)
492+
except Exception:
493+
# Ignore errors in the progress callback to avoid breaking the download
494+
pass
473495

474496

475497
def _execute_stream_and_write_chunk(

synapseclient/core/download/download_functions.py

Lines changed: 126 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
SynapseMd5MismatchError,
4040
)
4141
from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper, SFTPWrapper
42+
from synapseclient.core.telemetry_integration import monitored_transfer
4243
from synapseclient.core.retry import (
4344
DEFAULT_RETRY_STATUS_CODES,
4445
RETRYABLE_CONNECTION_ERRORS,
@@ -311,6 +312,8 @@ def _get_aws_credentials() -> None:
311312
"""This is a stub function and only used for testing purposes."""
312313
return None
313314

315+
# TODO: Implement functionality in the calling functions to support metric collection for cache hits
316+
314317

315318
async def download_by_file_handle(
316319
file_handle_id: str,
@@ -418,8 +421,10 @@ async def download_by_file_handle(
418421

419422
syn = Synapse.get_client(synapse_client=synapse_client)
420423
os.makedirs(os.path.dirname(destination), exist_ok=True)
424+
attempts = 0
421425

422426
while retries > 0:
427+
attempts += 1
423428
try:
424429
file_handle_result: Dict[
425430
str, str
@@ -433,6 +438,10 @@ async def download_by_file_handle(
433438
concrete_type = file_handle["concreteType"]
434439
storage_location_id = file_handle.get("storageLocationId")
435440

441+
actual_file_size = file_handle.get("contentSize", 0)
442+
actual_mime_type = file_handle.get("contentType", None)
443+
actual_md5 = file_handle.get("contentMd5")
444+
436445
if concrete_type == concrete_types.EXTERNAL_OBJECT_STORE_FILE_HANDLE:
437446
profile = get_client_authenticated_s3_profile(
438447
endpoint=file_handle["endpointUrl"],
@@ -509,34 +518,99 @@ def download_fn(
509518
# and the file is large enough that it would be broken into parts to take advantage of
510519
# multiple downloading threads. otherwise it's more efficient to run the download as a simple
511520
# single threaded URL download.
512-
downloaded_path = await download_from_url_multi_threaded(
513-
file_handle_id=file_handle_id,
514-
object_id=synapse_id,
515-
object_type=entity_type,
516-
destination=destination,
517-
expected_md5=file_handle.get("contentMd5"),
518-
synapse_client=syn,
519-
)
521+
with monitored_transfer(
522+
operation="download",
523+
file_path=destination,
524+
file_size=actual_file_size,
525+
destination=f"synapse:{synapse_id}",
526+
syn_id=synapse_id,
527+
with_progress_bar=False, # progress bar is handled by the download function
528+
mime_type=actual_mime_type,
529+
md5=actual_md5,
530+
multipart=True,
531+
chunk_size=SYNAPSE_DEFAULT_DOWNLOAD_PART_SIZE,
532+
attempts=attempts,
533+
concrete_type=concrete_type
534+
) as monitor:
535+
try:
536+
downloaded_path = await download_from_url_multi_threaded(
537+
file_handle_id=file_handle_id,
538+
object_id=synapse_id,
539+
object_type=entity_type,
540+
destination=destination,
541+
expected_md5=actual_md5,
542+
synapse_client=syn,
543+
progress_callback=lambda bytes_transferred, total_bytes: monitor.update(
544+
bytes_transferred - getattr(monitor, '_last_transferred_bytes', 0)
545+
) if hasattr(monitor, '_last_transferred_bytes') else setattr(monitor, '_last_transferred_bytes', bytes_transferred)
546+
)
547+
# Update the monitor with the result
548+
monitor.transferred_bytes = actual_file_size
549+
except Exception as download_ex:
550+
monitor.record_retry(error=download_ex)
551+
raise
520552

521553
else:
554+
# Standard pre-signed URL download
522555
loop = asyncio.get_running_loop()
523556
progress_bar = get_or_create_download_progress_bar(
524-
file_size=1, postfix=synapse_id, synapse_client=syn
525-
)
526-
527-
downloaded_path = await loop.run_in_executor(
528-
syn._get_thread_pool_executor(asyncio_event_loop=loop),
529-
lambda: download_from_url(
530-
url=file_handle_result["preSignedURL"],
531-
destination=destination,
532-
entity_id=synapse_id,
533-
file_handle_associate_type=entity_type,
534-
file_handle_id=file_handle["id"],
535-
expected_md5=file_handle.get("contentMd5"),
536-
progress_bar=progress_bar,
537-
synapse_client=syn,
538-
),
557+
file_size=actual_file_size, postfix=synapse_id, synapse_client=syn
539558
)
559+
with monitored_transfer(
560+
operation="download",
561+
file_path=destination,
562+
file_size=actual_file_size,
563+
destination=f"synapse:{synapse_id}",
564+
syn_id=synapse_id,
565+
with_progress_bar=False, # progress bar is handled separately
566+
mime_type=actual_mime_type,
567+
md5=actual_md5,
568+
attempts=attempts,
569+
concrete_type=concrete_type,
570+
pre_signed_url=True
571+
) as monitor:
572+
# Create a wrapper around the standard download function that updates both
573+
# the progress bar and our telemetry monitor
574+
def download_with_telemetry(**kwargs):
    """Run ``download_from_url`` while feeding progress into ``monitor``.

    Any ``progress_callback`` supplied in ``kwargs`` is still invoked; it is
    wrapped so that the telemetry monitor is updated first with the number of
    bytes received since the previous report.

    Arguments:
        **kwargs: Keyword arguments passed straight through to
            ``download_from_url`` (with ``progress_callback`` replaced by the
            wrapper).

    Returns:
        Whatever ``download_from_url`` returns.

    Raises:
        Exception: Any error from ``download_from_url`` is recorded on the
            monitor via ``record_retry`` and then re-raised unchanged.
    """
    upstream_callback = kwargs.get("progress_callback")
    # Cumulative byte count seen on the previous report.
    previous_total = 0

    def combined_progress_callback(bytes_transferred, total_bytes):
        nonlocal previous_total
        increment = bytes_transferred - previous_total
        previous_total = bytes_transferred
        # Only positive increments are forwarded to the telemetry monitor.
        if increment > 0:
            monitor.update(increment)
        # Hand the untouched cumulative values to the caller's callback.
        if upstream_callback:
            upstream_callback(bytes_transferred, total_bytes)

    kwargs["progress_callback"] = combined_progress_callback

    try:
        return download_from_url(**kwargs)
    except Exception as inner_ex:
        monitor.record_retry(error=inner_ex)
        raise
599+
600+
# Execute the download
601+
downloaded_path = await loop.run_in_executor(
602+
syn._get_thread_pool_executor(asyncio_event_loop=loop),
603+
lambda: download_with_telemetry(
604+
url=file_handle_result["preSignedURL"],
605+
destination=destination,
606+
entity_id=synapse_id,
607+
file_handle_associate_type=entity_type,
608+
file_handle_id=file_handle["id"],
609+
expected_md5=actual_md5,
610+
progress_bar=progress_bar,
611+
synapse_client=syn,
612+
),
613+
)
540614

541615
syn.logger.info(f"[{synapse_id}]: Downloaded to {downloaded_path}")
542616
syn.cache.add(
@@ -576,6 +650,7 @@ async def download_from_url_multi_threaded(
576650
*,
577651
expected_md5: str = None,
578652
synapse_client: Optional["Synapse"] = None,
653+
progress_callback: Optional[callable] = None,
579654
) -> str:
580655
"""
581656
Download a file from the given URL using multiple threads.
@@ -615,7 +690,20 @@ async def download_from_url_multi_threaded(
615690
debug=client.debug,
616691
)
617692

618-
await download_file(client=client, download_request=request)
693+
# Wrap the progress_callback to ensure consistent signature and error handling
694+
def proxy_progress_callback(bytes_transferred, total_bytes):
    """Relay a progress report to the caller-supplied ``progress_callback``.

    Does nothing when no callback was supplied. Exceptions raised by the
    callback are deliberately discarded so that a faulty progress reporter
    cannot interrupt the download.

    Arguments:
        bytes_transferred: Value forwarded as the callback's first argument.
        total_bytes: Value forwarded as the callback's second argument.
    """
    if progress_callback is None:
        return
    try:
        progress_callback(bytes_transferred, total_bytes)
    except Exception:
        # Best-effort notification only; see docstring.
        pass
700+
701+
# Pass progress_callback to download_file if supported
702+
await download_file(
703+
client=client,
704+
download_request=request,
705+
progress_callback=proxy_progress_callback if progress_callback else None
706+
)
619707

620708
if expected_md5: # if md5 not set (should be the case for all except http download)
621709
actual_md5 = utils.md5_for_file_hex(filename=temp_destination)
@@ -643,6 +731,7 @@ def download_from_url(
643731
file_handle_id: Optional[str] = None,
644732
expected_md5: Optional[str] = None,
645733
progress_bar: Optional[tqdm] = None,
734+
progress_callback: Optional[callable] = None,
646735
*,
647736
synapse_client: Optional["Synapse"] = None,
648737
) -> Union[str, None]:
@@ -808,14 +897,18 @@ def _ftp_report_hook(
808897
synapse_client=client,
809898
)
810899
refreshed_url = response["preSignedURL"]
811-
response = with_retry(
812-
lambda url=refreshed_url, range_header=range_header, auth=auth: client._requests_session.get(
900+
901+
def get_request(url=refreshed_url, range_header=range_header, auth=auth): return (
902+
client._requests_session.get(
813903
url=url,
814904
headers=client._generate_headers(range_header),
815905
stream=True,
816906
allow_redirects=False,
817907
auth=auth,
818-
),
908+
)
909+
)
910+
response = with_retry(
911+
get_request,
819912
verbose=client.debug,
820913
**STANDARD_RETRY_PARAMS,
821914
)
@@ -899,6 +992,12 @@ def _ftp_report_hook(
899992
increment_progress_bar(
900993
n=len(chunk), progress_bar=progress_bar
901994
)
995+
# Call the progress_callback if provided
996+
if progress_callback is not None:
997+
try:
998+
progress_callback(transferred, to_be_transferred)
999+
except Exception:
1000+
pass
9021001
except (
9031002
Exception
9041003
) as ex: # We will add a progress parameter then push it back to retry.

0 commit comments

Comments
 (0)