
feat(storage): add support for streaming uploads#219

Open
brianquinlan wants to merge 17 commits into googleapis:main from brianquinlan:sink_upload

Conversation

@brianquinlan
Contributor

@brianquinlan brianquinlan commented Mar 26, 2026

Fixes #180

Supports uploading GCS objects using a StreamSink<List<int>>.

This PR trades performance and correctness for simplicity. Future PRs (see the TODO in the code) will fix the correctness problems.

Calls to add accumulate data in memory until close is called, which causes the data to be sent to GCS.
Calls to addStream accumulate data into 256 KiB chunks (the minimum GCS chunk size) and send each chunk to GCS as it fills.

It is possible for GCS to partially reject a write. For example, you could send 100MiB to GCS in a single HTTP request and GCS could indicate that 50MiB was successfully stored. That scenario is not currently supported and the upload will fail in that case.
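Based on the API described above, usage might look like the following sketch. The Storage type and the uploadObjectFromSink signature are taken from this PR's discussion; the bucket and object names are placeholders, and the package import is omitted.

```dart
import 'dart:io';

// Illustrative only: `Storage` and `uploadObjectFromSink` are the types
// proposed in this PR; their import is omitted here.
Future<void> uploadFile(Storage storage, String path) async {
  final sink = storage.uploadObjectFromSink('my-bucket', 'my-object');
  try {
    // Prefer addStream for large payloads: data is flushed to GCS in
    // 256 KiB chunks instead of being buffered entirely in memory.
    await sink.addStream(File(path).openRead());
  } finally {
    await sink.close();
  }
}
```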

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces the ResumableUploadSink class and a corresponding uploadObjectFromSink method in the Storage client to support resumable uploads to Google Cloud Storage using a StreamSink. The implementation includes internal buffering to meet GCS chunk size requirements and is accompanied by a comprehensive test suite. Review feedback highlights a significant performance improvement opportunity in the _flush method to reduce memory allocations and copies. Other suggestions focus on improving the clarity of error messages when concurrent operations are attempted, ensuring consistent use of StateError, and correcting a misleading test name.

brianquinlan and others added 6 commits March 26, 2026 16:03
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@brianquinlan
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces the ResumableUploadSink class and the uploadObjectFromSink method to support resumable uploads to Google Cloud Storage using a StreamSink. While the implementation provides a solid foundation, several improvements are needed for production readiness. Key feedback includes addressing a critical bug where the upload offset is updated optimistically without verifying the server's received byte range, optimizing buffer management to reduce memory allocations and copies, and updating method signatures to return Future for better interface compliance. Additionally, the documentation should be updated to warn users about potential memory issues when using add() for large uploads.

Comment on lines +103 to +105
// TODO(https://github.com/googleapis/google-cloud-dart/issues/218):
// Check the "range" headers to determine if any data must be resent.
_nextExpectedByte = newEnd;
Contributor


critical

The current implementation optimistically updates _nextExpectedByte without verifying the actual number of bytes received by the server. In a resumable upload, it's possible for the server to receive only a partial chunk.

The 308 Resume Incomplete response from Google Cloud Storage includes a Range header (e.g., Range: bytes=0-262143) that specifies which bytes have been successfully stored. You must parse this header to determine the correct next byte offset.

Failing to do so can lead to data corruption if a chunk is only partially uploaded. While there's a TODO for this, I'm flagging it as critical because it affects the core correctness of the upload functionality.
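A sketch of what that Range-header parsing might look like (illustrative only, not code from this PR; the helper name is an assumption):

```dart
/// Parses the `Range` header of a 308 "Resume Incomplete" response
/// (e.g. `bytes=0-262143`) and returns the next byte offset to send.
/// A missing header means the server has persisted no bytes yet.
int nextByteFromRangeHeader(String? rangeHeader) {
  if (rangeHeader == null) return 0;
  final match = RegExp(r'bytes=0-(\d+)$').firstMatch(rangeHeader);
  if (match == null) {
    throw FormatException('unexpected Range header: $rangeHeader');
  }
  // The header reports the last byte stored, so the next byte is one past it.
  return int.parse(match.group(1)!) + 1;
}
```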

Comment on lines +794 to +801
/// Creates or updates the content of a [Google Cloud Storage object][] using
/// a [StreamSink].
///
/// If [metadata] is non-null, it will be used as the object's metadata. If
/// [metadata] is `null` or `metadata.contentType` is `null`, the content type
/// will be `'application/octet-stream'`. If `metadata.name` does not
/// match [name], a [BadRequestException] is thrown.
///
Contributor


high

The documentation should clarify the behavior of the returned StreamSink. Specifically, calls to sink.add() will buffer all data in memory until sink.close() is invoked. This can lead to excessive memory usage and potentially an OutOfMemoryError when uploading large files.

Please add a note advising users to prefer sink.addStream() for large uploads to take advantage of chunked streaming and avoid high memory consumption.

For example, you could add:

/// **Important**: When using the returned [ResumableUploadSink], be aware that
/// calls to `sink.add()` will buffer all data in memory until `sink.close()` is
/// called. For large uploads, this can lead to high memory usage. It is
/// recommended to use `sink.addStream()` for large files to stream data in
/// chunks and avoid buffering the entire content in memory.

Comment on lines +64 to +74
if (requiredCapacity > _writeBuffer.length) {
var newSize = _writeBuffer.length * 2;
if (newSize < requiredCapacity) {
newSize = requiredCapacity;
}
newSize =
((newSize + _minWriteSize - 1) ~/ _minWriteSize) * _minWriteSize;

final newBuffer = Uint8List(newSize)
..setRange(0, _writeBufferSize, _writeBuffer);
_writeBuffer = newBuffer;
Contributor


medium

The current buffer resizing logic in _addToBuffer creates a new Uint8List and copies all existing data whenever the buffer capacity is exceeded. This can be inefficient for performance, especially when dealing with many small add() calls.

For a more performant implementation, consider using BytesBuilder. It is designed for this use case and can often avoid reallocating and copying the entire buffer on each growth by managing an internal list of chunks.
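For illustration, a minimal sketch of the BytesBuilder approach (class and member names are assumptions, not from the PR):

```dart
import 'dart:typed_data';

// BytesBuilder keeps an internal list of chunks, so add() is amortized
// O(1): no copy of previously added data happens until takeBytes() is
// called at flush time.
class ChunkBuffer {
  final _builder = BytesBuilder(copy: true);

  void add(List<int> data) => _builder.add(data);

  int get length => _builder.length;

  /// Returns the accumulated bytes and resets the buffer.
  Uint8List take() => _builder.takeBytes();
}
```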

final flushPoint = _largestWriteSize(_writeBufferSize);
if (flushPoint == 0) return;
final flushBuffer = _writeBuffer;
_writeBuffer = _writeBuffer.sublist(flushPoint);
Contributor


medium

Using _writeBuffer.sublist(flushPoint) creates a new Uint8List and copies the remaining data after a flush. This copy operation can be a performance bottleneck, especially if the remaining part of the buffer is large.

A more efficient approach would be to avoid creating a new list. You could, for example, move the remaining data to the beginning of the same buffer using _writeBuffer.setRange(0, _writeBufferSize - flushPoint, _writeBuffer, flushPoint) and then update the size. This would avoid an allocation and a copy.
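A sketch of that in-place compaction, written as a standalone helper for illustration (names are illustrative, not from the PR):

```dart
import 'dart:typed_data';

/// Moves the unflushed bytes [flushPoint, bufferSize) to the start of the
/// same buffer instead of allocating a new list with sublist(), and
/// returns the new logical size.
int compactAfterFlush(Uint8List buffer, int bufferSize, int flushPoint) {
  final remaining = bufferSize - flushPoint;
  // setRange is documented to copy correctly even when the source and
  // destination ranges overlap within the same list.
  buffer.setRange(0, remaining, buffer, flushPoint);
  return remaining;
}
```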

}

@override
Future<dynamic> addStream(Stream<List<int>> stream) async {
Contributor


medium

The addStream method is defined to return Future<dynamic>. According to the StreamSink interface, this method should return Future<void>.

Using Future<void> improves type safety and adheres to the interface contract. Please update the return type.

Suggested change
Future<dynamic> addStream(Stream<List<int>> stream) async {
Future<void> addStream(Stream<List<int>> stream) async {

}

@override
Future<dynamic> close() async {
Contributor


medium

The close method is defined to return Future<dynamic>. According to the StreamSink interface, this method should return Future<void>.

Using Future<void> improves type safety and adheres to the interface contract. Please update the return type.

Suggested change
Future<dynamic> close() async {
Future<void> close() async {

/// See [API reference docs](https://docs.cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload).
///
/// [Google Cloud Storage object]: https://docs.cloud.google.com/storage/docs/json_api/v1/objects
ResumableUploadSink uploadObjectFromSink(
Contributor


[nit] I don't think we need to expose the ResumableUploadSink type; I don't see any API surface it adds.

Suggested change
ResumableUploadSink uploadObjectFromSink(
StreamSink<List<int>> uploadObjectFromSink(

Contributor Author


I'm modeling ResumableUploadSink after the C++ ObjectWriteStream class.

Eventually, it will support the properties required to resume an existing upload e.g. https://github.com/googleapis/google-cloud-cpp/blob/fdbec33ff40b1d6ae9b740cbacf969be4141255a/google/cloud/storage/object_write_stream.h#L273

export 'src/object_metadata.dart';
export 'src/object_metadata_patch_builder.dart' show ObjectMetadataPatchBuilder;
export 'src/project_team.dart';
export 'src/resumeable_upload.dart' show ResumableUploadSink;
Contributor


Do we need to export this?

Suggested change
export 'src/resumeable_upload.dart' show ResumableUploadSink;

Comment on lines +35 to +36
final small = randomUint8List(100);
final large = randomUint8List(5_000_000);
Contributor


Should we use the seed argument for consistent test data?

Contributor Author


Do you think I should? I hope that it doesn't matter.

Contributor


If there does end up being a failure for some sets of inputs having an unknown seed can make reproductions harder.

On the other hand, using a fixed seed loses some of the fuzzing we get otherwise, and we don't currently have a nice API for using changing-but-reproducible random seeds from the test runner. If we had dart-lang/test#1572, I'd argue for using it. I just checked, and printOnFailure doesn't work in setUpAll, which is the other approach I wanted to suggest (dart-lang/test#2620).

I'm happy to stick with a random seed for now and we can revisit if it ever becomes a problem or we have a nice universally recommended pattern.

Contributor Author


Yeah, I was doing this for fuzzing purposes. dart-lang/test#1572 is a good idea!
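One interim workaround along these lines: pick a random seed per run, but embed it in the test name so failures are reproducible. This is a sketch of the idea discussed above, not code from the PR:

```dart
import 'dart:math';

import 'package:test/test.dart';

void main() {
  // Random per run (preserves fuzzing), but logged via the test name so a
  // failing run can be reproduced by hard-coding the printed seed.
  final seed = Random().nextInt(1 << 32);
  final random = Random(seed);

  test('upload random payload (seed: $seed)', () {
    final data = List<int>.generate(100, (_) => random.nextInt(256));
    expect(data, hasLength(100));
  });
}
```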

/// See [API reference docs](https://docs.cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload).
///
/// [Google Cloud Storage object]: https://docs.cloud.google.com/storage/docs/json_api/v1/objects
ResumableUploadSink uploadObjectFromSink(
Contributor


Should we take a Stream argument instead of returning a StreamSink? It would let us return a Future<ObjectMetadata> like the other method.

Contributor Author


That's a good idea. I want to leave the existing interface to support resumable uploads. But adding uploadObjectFromStream seems like a good addition (and trivial to implement).

Contributor


I think we could still support resumable uploads. I might be overlooking some detail, but I think returning a StreamSink and taking a Stream argument are interdefinable APIs.

If we implement as uploadObjectFromSink as you have here, then we can define

Future<void> uploadObjectFromStream(
  String bucket,
  String name,
  Stream<List<int>> stream, {
  ObjectMetadata? metadata,
}) async {
  final sink = uploadObjectFromSink(bucket, name, metadata: metadata);
  try {
    await sink.addStream(stream);
  } finally {
    await sink.close();
  }
}

Alternatively if we implement as uploadObjectFromStream then the current API can wrap it.

StreamSink<List<int>> uploadObjectFromSink(
  String bucket,
  String name, {
  ObjectMetadata? metadata,
}) {
  final controller = StreamController<List<int>>();
  uploadObjectFromStream(
    bucket, 
    name, 
    controller.stream, 
    metadata: metadata
  ).ignore();
  return controller.sink;
}

In either case the underlying sink can use the resumable implementation, so I think we only need to pick which signature we like best. If the resumable sink has extra members beyond StreamSink, the two wouldn't be equivalent, but I don't recall it defining anything public that wasn't an override.

Contributor


I missed that the resumable sink will get new public members.

Having both APIs SGTM
