
Support parallel multipart upload of non-seekable fileobj #366

@athewsey

Description

Hi folks,

I'm trying to stream large (~4GB) files from an external API to S3 without persisting them to local disk, by wrapping my streaming requests.Response in a file-like object to use with boto3's upload_fileobj (full code example below).

Unfortunately as far as I can tell, there's no way to specify the input size of a non-seekable file-like object to S3Transfer - so S3Transfer doesn't realize the file is large and should be handled via concurrent multipart upload:

As a result, the upload is much slower and more prone to failure than the equivalent upload from a local fileobj (and it wouldn't work at all for files over 5GB, the maximum size of a single PutObject request).
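To make the size dependence concrete, here is a rough sketch of the single-PUT vs multipart decision for a stream whose size is known. It assumes boto3's default TransferConfig values (8 MiB multipart_threshold and multipart_chunksize) and S3's documented limits (5 GiB per PutObject request, at most 10,000 parts per multipart upload). `plan_upload` is a hypothetical helper for illustration, not s3transfer's actual code, although as far as I know s3transfer makes a similar threshold check and grows the part size in a similar way when it knows the total size up front.

```python
MiB = 1024 ** 2
GiB = 1024 ** 3

# S3 limits as documented by AWS
MAX_PUT_OBJECT_SIZE = 5 * GiB  # largest object a single PutObject request can upload
MAX_PARTS = 10_000             # most parts allowed in one multipart upload


def _ceil_div(a: int, b: int) -> int:
    """Integer ceiling division, avoiding float rounding for large sizes."""
    return (a + b - 1) // b


def plan_upload(size: int, threshold: int = 8 * MiB, chunksize: int = 8 * MiB) -> dict:
    """Hypothetical helper: choose PutObject vs multipart for a stream of known size.

    The default threshold and chunksize mirror boto3's TransferConfig defaults.
    """
    if size < threshold and size <= MAX_PUT_OBJECT_SIZE:
        # Small enough to send in one PutObject request
        return {"method": "put_object", "parts": 1, "chunksize": size}
    # Double the part size until the whole object fits within MAX_PARTS parts
    while _ceil_div(size, chunksize) > MAX_PARTS:
        chunksize *= 2
    return {"method": "multipart", "parts": _ceil_div(size, chunksize), "chunksize": chunksize}
```

For a 4 GiB object this gives 512 parts of 8 MiB, which can be uploaded several at a time (boto3's default max_concurrency is 10). Without a known size, neither the threshold check nor the part-size adjustment can be made up front.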

Could we add some option for specifying the length of non-seekable files for upload, so they can be treated correctly?

Solution options

In my use case I do know the length of the content (from the API response's Content-Length header), and I already have full control over the file-like object because the HTTP response needs to be wrapped anyway (something like this)... So we could either:

  1. Pass in the length as an explicit argument when creating the upload, or
  2. Expose it on the file object somehow, e.g. by implementing __len__ or some other API.
    • ...But as far as I can tell (e.g. from the Python io docs), there's no standard convention for exposing the size/length of a file-like object, so maybe option 1 would be better?
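A minimal sketch of how a library could combine both options, assuming a lookup order of explicit argument first, then `__len__`, then seek/tell for seekable objects. `resolve_upload_size` is a hypothetical name; nothing like it is confirmed to exist in boto3 or s3transfer.

```python
import io
from typing import IO, Optional


def resolve_upload_size(fileobj: IO[bytes], size: Optional[int] = None) -> Optional[int]:
    """Hypothetical helper: work out how many bytes remain to be uploaded.

    Checks, in order: an explicit size argument (option 1), a __len__ on the
    file object (option 2), then seek()/tell() for seekable objects. Returns
    None when the size can't be known without consuming the stream.
    """
    if size is not None:
        return size
    if hasattr(fileobj, "__len__"):
        return len(fileobj)
    seekable = getattr(fileobj, "seekable", None)
    if seekable is not None and seekable():
        start = fileobj.tell()
        end = fileobj.seek(0, io.SEEK_END)  # seek() returns the new absolute position
        fileobj.seek(start)  # restore the caller's position
        return end - start
    return None
```

Giving the explicit argument priority lets a caller override a misleading `__len__`. One ambiguity with option 2 is whether `__len__` should mean the total length or the bytes remaining after a partial read; the seek/tell branch above measures the remainder.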

User code example

import boto3
import requests

s3 = boto3.client("s3")

class ResponseStream:
    """One-time readable file-like object wrapper for a streaming HTTP requests.Response

    Does not support seek()ing locations or multiple read-throughs.

    Example usage:

        with requests.get(url, stream=True, ...) as r:
            r.raise_for_status()
            json.load(ResponseStream(r))
    """
    def __init__(self, response: requests.Response, read_chunk_size: int = 4096):
        # Bytes already pulled from the response but not yet returned by read()
        self._pending_bytes = bytearray()
        self._iterator = response.iter_content(read_chunk_size)

    def _next_response_chunk(self) -> bytes:
        """Get the next chunk of the HTTP response, or an empty bytes() if it's finished"""
        try:
            return next(self._iterator)
        except StopIteration:
            return bytes()

    def read(self, size: int | None = -1) -> bytes:
        if size is None or size < 0:
            # No size given: return everything that's left, as the usual
            # file-object convention requires (json.load relies on this)
            result = self._pending_bytes
            self._pending_bytes = bytearray()
            for chunk in self._iterator:
                result += chunk
            return bytes(result)
        # Otherwise try to read the requested amount of data. Appending to a
        # bytearray avoids re-copying the whole buffer for every small chunk.
        result = self._pending_bytes
        while len(result) < size:
            chunk = self._next_response_chunk()
            if not chunk:
                # End of input stream
                break
            result += chunk
        # Keep any excess for the next read() call
        self._pending_bytes = result[size:]
        return bytes(result[:size])

    def close(self):
        # Nothing to release: the caller's `with requests.get(...)` block closes the response
        pass


def stream_file_to_s3(url, filename, bucket, s3_key):
    """Stream a single file from a URL to S3, without saving locally."""
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        print(f"Uploading {filename} to s3://{bucket}/{s3_key} ...")
        s3.upload_fileobj(ResponseStream(r), bucket, s3_key)
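An alternative to hand-rolling the read() bookkeeping (a different technique from the wrapper above, not something boto3 requires) is to implement only `readinto()` on an `io.RawIOBase` subclass and let `io.BufferedReader` provide `read(n)` and `read()` with the standard file semantics. `IterStream` is a hypothetical name; it only assumes an iterable of bytes chunks such as `r.iter_content(...)`.

```python
import io
from typing import Iterable


class IterStream(io.RawIOBase):
    """Hypothetical read-only, non-seekable raw stream over an iterable of bytes chunks."""

    def __init__(self, chunks: Iterable[bytes]):
        self._chunks = iter(chunks)
        self._leftover = b""  # part of the current chunk not yet handed out

    def readable(self) -> bool:
        return True

    def readinto(self, buffer) -> int:
        # Pull chunks until there is data to hand out, skipping empty ones
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0  # 0 bytes signals EOF to io.BufferedReader
        n = min(len(buffer), len(self._leftover))
        buffer[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n
```

Usage would look like `s3.upload_fileobj(io.BufferedReader(IterStream(r.iter_content(64 * 1024))), bucket, s3_key)`. The wrapped stream still reports `seekable()` as False, so S3Transfer would treat it as non-seekable, just like ResponseStream.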

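As a possible interim workaround, assuming it's acceptable to call s3transfer (the library behind upload_fileobj) directly: `TransferManager.upload()` accepts a `subscribers` list, and a subscriber's `on_queued` callback can call `future.meta.provide_transfer_size()` before s3transfer decides between PutObject and multipart. As far as I know this is how the AWS CLI's `--expected-size` option for streamed `aws s3 cp -` uploads works, but s3transfer's README warns that its interfaces may change, so treat this as a sketch. `ProvideSizeSubscriber` and `upload_stream_with_size` are hypothetical names.

```python
class ProvideSizeSubscriber:
    """Hypothetical s3transfer subscriber that reports a known total size up front."""

    def __init__(self, size: int):
        self.size = size

    def on_queued(self, future, **kwargs):
        # s3transfer invokes on_queued callbacks before submitting the transfer,
        # so the size is set when it chooses between PutObject and multipart
        future.meta.provide_transfer_size(self.size)


def upload_stream_with_size(client, fileobj, bucket: str, key: str, size: int) -> None:
    """Upload a non-seekable stream of known size via s3transfer's TransferManager."""
    # Imported here so the subscriber above can be used without s3transfer installed
    from s3transfer.manager import TransferManager

    with TransferManager(client) as manager:
        future = manager.upload(
            fileobj, bucket, key, subscribers=[ProvideSizeSubscriber(size)]
        )
        future.result()  # block until done, re-raising any upload error
```

In stream_file_to_s3 the size would come from `int(r.headers["Content-Length"])`. One caveat: `iter_content` decodes gzip/deflate Content-Encoding, so for compressed responses Content-Length describes the encoded body and won't match the bytes actually uploaded.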