Skip to content

Add download/upload progress callback support #488

Open
@MHendricks

Description

@MHendricks

I've started using cloudpathlib recently and its made what I'm working on so much easier. I would like to report download progress to the users if any of my cloud files need downloaded.

I'm using S3 and got it working for my needs in my personal PR blurstudio-forks#1. It's fairly easy to configure by passing a callable to the new transfer_callable argument of S3Client. The callable must accept the arguments direction, state, cloud_path, bytes_sent). This uses the S3Client to enable callbacks for any CloudPath used if it happens to trigger download/uploads.

The first 3 arguments allow your callable to understand what is happening and update your UI as needed including keeping track of multiple transfers happening at the same time. It lets you know when a transfer starts and stops so you can show/hide UI.

A simple implementation example:

from itertools import islice
from cloudpathlib import CloudPath, S3Client

def progress_callback(direction, state, cloud_path, bytes_sent):
    print("Progress", direction, state, cloud_path, bytes_sent)

client = S3Client(transfer_callable=progress_callback, no_sign_request=True)
ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
a_file = next(islice(ladi.iterdir(), 1))
# If file needs downloaded, this will call `progress_callback` to report start, stop and download progress
print(a_file.fspath)

I have only worked with S3, but I hope the other services also provide a transfer progress reporting system. Is this something you would like to implement or should I develop my code enough to create a pull request?

A more complex example using the rich library to show download progress of several files downloaded at once.
import concurrent.futures
from functools import partial
from itertools import islice
from cloudpathlib import CloudPath, S3Client
from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn, TextColumn

callback_state: dict[CloudPath, dict] = {}


def progress_callback(progress, direction, state, cloud_path, bytes_sent):
    if state == "start":
        progress.start()
        size = cloud_path.stat().st_size
        task_id = progress.add_task("Downloading", total=size, filename=cloud_path.name)
        callback_state[cloud_path] = dict(task_id=task_id, size=size)
    elif state == "stop":
        if cloud_path in callback_state:
            del callback_state[cloud_path]
    else:
        info = callback_state[cloud_path]
        progress.update(info["task_id"], advance=bytes_sent)


def download_url(cloud_path):
    # Trigger download of non-cached files
    cloud_path.fspath


def download(urls) -> None:
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {}
        for url in urls:
            future_to_url[executor.submit(download_url, url)] = url

        for future in concurrent.futures.as_completed(future_to_url):
            try:
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (future_to_url[future], exc))
        progress.stop()


if __name__ == "__main__":
    # Set up the progress bar and attach the callback to the S3Client
    progress = Progress(
        TextColumn("[blue]{task.fields[filename]}"),
        BarColumn(),
        DownloadColumn(),
        TransferSpeedColumn(),
    )
    client = S3Client(
        transfer_callable=partial(progress_callback, progress),
        no_sign_request=True,
    )

    # Download the first first 5 files from this resource.
    ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
    download(islice(ladi.iterdir(), 5))

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions