Skip to content

[DOM-65232] Fix tests failing in main and Implement range download for datasets over datasources #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
100 changes: 92 additions & 8 deletions domino_data/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import Any, List, Optional

import hashlib
import os
from os.path import exists

Expand All @@ -15,7 +16,7 @@

from .auth import AuthenticatedClient, get_jwt_token
from .logging import logger
from .transfer import MAX_WORKERS, BlobTransfer
from .transfer import DEFAULT_CHUNK_SIZE, MAX_WORKERS, BlobTransfer, get_resume_state_path

ACCEPT_HEADERS = {"Accept": "application/json"}

Expand All @@ -24,6 +25,7 @@
DOMINO_USER_API_KEY = "DOMINO_USER_API_KEY"
DOMINO_USER_HOST = "DOMINO_USER_HOST"
DOMINO_TOKEN_FILE = "DOMINO_TOKEN_FILE"
DOMINO_ENABLE_RESUME = "DOMINO_ENABLE_RESUME"


def __getattr__(name: str) -> Any:
Expand All @@ -45,6 +47,14 @@ class UnauthenticatedError(DominoError):
"""To handle exponential backoff."""


class DownloadError(DominoError):
    """Raised when a download fails partway through."""

    def __init__(self, message: str, completed_bytes: int = 0):
        # Keep a progress marker so callers can report or resume from the
        # last byte that was successfully written before the failure.
        self.completed_bytes = completed_bytes
        super().__init__(message)


@attr.s
class _File:
"""Represents a file in a dataset."""
Expand Down Expand Up @@ -90,20 +100,46 @@ def download_file(self, filename: str) -> None:
content_size += len(data)
file.write(data)

def download(self, filename: str, max_workers: int = MAX_WORKERS) -> None:
"""Download object content to file with multithreaded support.
def download(
    self,
    filename: str,
    max_workers: int = MAX_WORKERS,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    resume: Optional[bool] = None,
) -> None:
    """Download object content to file with multithreaded and resumable support.

    The file will be created if it does not exist. An existing file is
    overwritten unless resume is enabled, in which case the partial download
    is continued in place.

    Args:
        filename: path of file to write content to
        max_workers: max parallelism for high speed download
        chunk_size: size of each chunk to download in bytes
        resume: whether to enable resumable downloads. When None, the
            DOMINO_ENABLE_RESUME environment variable decides.
    """
    url = self.dataset.get_file_url(self.name)
    headers = self._get_headers()

    # Resolve the resume flag from the environment when not given explicitly.
    if resume is None:
        resume = os.environ.get(DOMINO_ENABLE_RESUME, "").lower() in ("true", "1", "yes")

    # Unique identifier for this download's resume state file.
    # usedforsecurity=False: md5 here is only a cache key, not a security hash.
    url_hash = hashlib.md5(url.encode(), usedforsecurity=False).hexdigest()
    resume_state_file = get_resume_state_path(filename, url_hash) if resume else None

    # "wb" would truncate the file and destroy already-downloaded chunks, so
    # when resuming an existing partial file open it for in-place update.
    mode = "r+b" if resume and exists(filename) else "wb"
    with open(filename, mode) as file:
        BlobTransfer(
            url,
            file,
            headers=headers,
            max_workers=max_workers,
            http=self.pool_manager(),
            chunk_size=chunk_size,
            resume_state_file=resume_state_file,
            resume=resume,
        )

def download_fileobj(self, fileobj: Any) -> None:
Expand Down Expand Up @@ -145,6 +181,28 @@ def _get_headers(self) -> dict:

return headers

def download_with_ranges(
    self,
    filename: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    max_workers: int = MAX_WORKERS,
    resume: Optional[bool] = None,
) -> None:
    """Download a file using range requests with resumable support.

    Thin convenience wrapper around :meth:`download`; kept for a
    range-download oriented public API.

    Args:
        filename: Path to save the file to
        chunk_size: Size of chunks to download
        max_workers: Maximum number of parallel downloads
        resume: Whether to attempt to resume a previous download

    Returns:
        None
    """
    return self.download(
        filename, max_workers=max_workers, chunk_size=chunk_size, resume=resume
    )


@attr.s
class Dataset:
Expand Down Expand Up @@ -215,18 +273,25 @@ def download_file(self, dataset_file_name: str, local_file_name: str) -> None:
self.File(dataset_file_name).download_file(local_file_name)

def download(
    self,
    dataset_file_name: str,
    local_file_name: str,
    max_workers: int = MAX_WORKERS,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    resume: Optional[bool] = None,
) -> None:
    """Download file content to file located at filename with resumable support.

    The file will be created if it does not exist.

    Args:
        dataset_file_name: name of the file in the dataset to download.
        local_file_name: path of file to write content to
        max_workers: max parallelism for high speed download
        chunk_size: size of each chunk to download in bytes
        resume: whether to enable resumable downloads (overrides env var if provided)
    """
    # Keyword arguments make the pass-through robust to signature reordering.
    self.File(dataset_file_name).download(
        local_file_name, max_workers=max_workers, chunk_size=chunk_size, resume=resume
    )

def download_fileobj(self, dataset_file_name: str, fileobj: Any) -> None:
"""Download file contents to file like object.
Expand All @@ -238,6 +303,25 @@ def download_fileobj(self, dataset_file_name: str, fileobj: Any) -> None:
"""
self.File(dataset_file_name).download_fileobj(fileobj)

def download_with_ranges(
    self,
    dataset_file_name: str,
    local_file_name: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    max_workers: int = MAX_WORKERS,
    resume: Optional[bool] = None,
) -> None:
    """Download a file using range requests with resumable support.

    Args:
        dataset_file_name: Name of the file in the dataset
        local_file_name: Path to save the file to
        chunk_size: Size of chunks to download
        max_workers: Maximum number of parallel downloads
        resume: Whether to attempt to resume a previous download
    """
    # Note: this signature orders (chunk_size, max_workers) while download()
    # orders (max_workers, chunk_size); keywords prevent silent swaps.
    self.download(
        dataset_file_name,
        local_file_name,
        max_workers=max_workers,
        chunk_size=chunk_size,
        resume=resume,
    )


@attr.s
class DatasetClient:
Expand Down
Loading