Provide io.Reader wrapper around S3 SelectObjectContentEventStream #2618

Closed as not planned
@corey-cole

Description

Describe the feature

Within the SDK, offer an io.Reader wrapper around SelectObjectContentEventStream. It doesn't have to do everything that SelectObjectContentEventStream does (e.g. metrics) as long as the differences are documented.

Use Case

When working with S3 Select, I frequently want to pass the output to functions that want a Reader (e.g. io.Copy). One specific AWS use case is to stream results of S3 Select back into S3 via s3manager where UploadInput accepts an io.Reader for the Body.
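To make the use case concrete, here is a minimal stdlib-only sketch of feeding chunked payloads to io.Copy through an io.Reader. It uses io.Pipe rather than a custom Read method, which is a different technique from the proposal in this issue. The names readerFromChunks and collect are hypothetical, and the plain `chan []byte` is only a stand-in for the channel returned by the event stream's Events() method.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readerFromChunks returns an io.Reader that yields every chunk received on
// events, in order. events is a hypothetical stand-in for the S3 Select
// event channel, reduced to raw record payloads.
func readerFromChunks(events <-chan []byte) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		for payload := range events {
			if _, err := pw.Write(payload); err != nil {
				// The read side was closed early; stop pumping.
				return
			}
		}
		// Channel closed: the reader now sees io.EOF. With a real stream,
		// pw.CloseWithError could carry the stream's error instead.
		pw.Close()
	}()
	return pr
}

// collect drains r with io.Copy and returns everything read as a string.
func collect(r io.Reader) (string, error) {
	var sb strings.Builder
	_, err := io.Copy(&sb, r)
	return sb.String(), err
}

func main() {
	events := make(chan []byte, 3)
	events <- []byte("a,1\n")
	events <- []byte("b,2\n")
	events <- []byte("c,3\n")
	close(events)

	out, err := collect(readerFromChunks(events))
	fmt.Printf("%q %v\n", out, err)
}
```

The pipe approach costs one goroutine per stream, and that goroutine stops pumping if the consumer closes the read side early. In exchange, no leftover-buffer bookkeeping is needed.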

Proposed Solution

I could see this being implemented in the s3manager package, since it's somewhat related. The implementation below was generated with GenAI, and I expect there are error scenarios it does not handle.

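import (
	"io"

	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// s3SelectReader adapts an S3 Select event stream to io.Reader.
type s3SelectReader struct {
	stream    *s3.SelectObjectContentEventStream
	remaining []byte // Leftover payload from the previous Records event
	closed    bool   // Set once the event channel has been closed
}

func (r *s3SelectReader) Read(b []byte) (int, error) {
	// Once the stream has ended, keep reporting the stream's error, or
	// io.EOF if there was none. Callers such as io.Copy treat any other
	// error (e.g. io.ErrClosedPipe) as a failure.
	if r.closed {
		if err := r.stream.Err(); err != nil {
			return 0, err
		}
		return 0, io.EOF
	}
	// A zero-length read must not block waiting for the next event.
	if len(b) == 0 {
		return 0, nil
	}
	var totalBytesRead int
	for {
		// If there is data remaining from the previous event, copy it to the output slice.
		if len(r.remaining) > 0 {
			n := copy(b[totalBytesRead:], r.remaining)
			totalBytesRead += n
			r.remaining = r.remaining[n:]
			if totalBytesRead == len(b) {
				return totalBytesRead, nil
			}
		}

		event, ok := <-r.stream.Events()
		if !ok {
			// The event channel is closed: hand back what was read so far and
			// leave the error or io.EOF for the next call.
			r.closed = true
			if totalBytesRead > 0 {
				return totalBytesRead, nil
			}
			if err := r.stream.Err(); err != nil {
				return 0, err
			}
			return 0, io.EOF
		}
		switch v := event.(type) {
		case *s3types.SelectObjectContentEventStreamMemberRecords:
			n := copy(b[totalBytesRead:], v.Value.Payload)
			totalBytesRead += n
			// Keep whatever did not fit for the next Read (empty if it all fit).
			r.remaining = v.Value.Payload[n:]
			if totalBytesRead == len(b) {
				return totalBytesRead, nil
			}
		case *s3types.SelectObjectContentEventStreamMemberEnd:
			// Update 4/25: marking the reader closed as soon as the End event
			// arrives was a premature optimization, so r.closed is not set
			// here; a later Read observes the closed channel instead.
			if totalBytesRead > 0 {
				return totalBytesRead, nil
			}
			return 0, io.EOF
		default:
			// Other event types (e.g. Stats, Progress) carry no record data; skip them.
		}
	}
}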
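One way to look for unhandled scenarios in a reader like the one above is the standard library's iotest.TestReader. It performs a zero-length read, reads in varying small sizes, and checks that the reader keeps returning io.EOF at the end. The sketch below is an assumption-laden stand-in and not the SDK type. chunkReader and checkReader are hypothetical names, and a plain `chan []byte` replaces the event stream so the check runs without AWS access. For brevity it returns as soon as any data is available instead of looping until the buffer is full.

```go
package main

import (
	"fmt"
	"io"
	"testing/iotest"
)

// chunkReader is a hypothetical stand-in for the proposed reader: it drains
// a channel of payloads instead of an S3 Select event stream.
type chunkReader struct {
	events    <-chan []byte
	remaining []byte // leftover bytes from the last payload
}

func (r *chunkReader) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil // never block on a zero-length read
	}
	// Fetch payloads until one is non-empty or the channel is closed.
	for len(r.remaining) == 0 {
		payload, ok := <-r.events
		if !ok {
			// A closed channel keeps yielding !ok, so every later
			// call also returns io.EOF.
			return 0, io.EOF
		}
		r.remaining = payload
	}
	n := copy(b, r.remaining)
	r.remaining = r.remaining[n:]
	return n, nil
}

// checkReader sends chunks through a chunkReader and lets iotest.TestReader
// verify that reading it yields exactly want.
func checkReader(chunks []string, want string) error {
	events := make(chan []byte, len(chunks))
	for _, c := range chunks {
		events <- []byte(c)
	}
	close(events)
	return iotest.TestReader(&chunkReader{events: events}, []byte(want))
}

func main() {
	// The empty chunk in the middle exercises the skip-empty-payload path.
	err := checkReader([]string{"a,1\n", "", "b,2\n"}, "a,1\nb,2\n")
	fmt.Println("iotest.TestReader result:", err)
}
```

A reader that switches to a different error after reporting end of stream once (for example io.ErrClosedPipe on the following call) would fail the final EOF check in iotest.TestReader.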

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

AWS Go SDK V2 Module Versions Used

github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.11 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect

Go version used

go version go1.22.2 darwin/arm64


Labels: feature-request (A feature should be added or improved.)
