Skip to content

Commit 70bbf17

Browse files
authored
S3 Stream Writer: simpler and more robust logic (#136)
* S3 Stream Writer: simpler and more robust logic * do not consider context cause for now * slightly better comments * remove unused import * fix typo in comment * explicit field assign for writeWrapper in NewWriter * do not overwrite prevErr in Write * check prevErr before trying to write * simpler * avoid passing a ctx * actually, the ctx will come handy * pass references to goroutine without params
1 parent 0d4562f commit 70bbf17

1 file changed

Lines changed: 22 additions & 43 deletions

File tree

backends/s3/stream.go

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ package s3
33
import (
44
"context"
55
"io"
6-
7-
"github.com/PowerDNS/simpleblob"
8-
"github.com/minio/minio-go/v7"
96
)
107

118
// NewReader satisfies StreamReader and provides a read streaming interface to
@@ -22,60 +19,42 @@ func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, er
2219
// NewWriter satisfies StreamWriter and provides a write streaming interface to
2320
// a blob located on an S3 server.
2421
func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) {
25-
if err := ctx.Err(); err != nil {
26-
return nil, err
27-
}
22+
ctx, cancel := context.WithCancel(ctx)
2823
name = b.prependGlobalPrefix(name)
2924
pr, pw := io.Pipe()
30-
w := &writerWrapper{
31-
ctx: ctx,
32-
backend: b,
33-
name: name,
34-
pw: pw,
35-
donePipe: make(chan struct{}),
36-
}
3725
go func() {
38-
var err error
39-
// The following call will return only on error or
40-
// if the writing end of the pipe is closed.
41-
// It is okay to write to w.info from this goroutine
42-
// because it will only be used after w.donePipe is closed.
43-
w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr, -1)
44-
_ = pr.CloseWithError(err) // Always returns nil.
45-
close(w.donePipe)
26+
// This call returns when the pipe is closed, or when an error occurs.
27+
info, err := b.doStoreReader(ctx, name, pr, -1)
28+
if err == nil {
29+
_ = b.setMarker(ctx, name, info.ETag, false)
30+
}
31+
_ = pr.CloseWithError(err)
32+
cancel()
4633
}()
47-
return w, nil
34+
return &writerWrapper{ctx: ctx, pw: pw}, nil
4835
}
4936

50-
// A writerWrapper implements io.WriteCloser and is returned by (*Backend).NewWriter.
37+
// A writerWrapper allows storing data on S3 through a io.WriteCloser.
5138
type writerWrapper struct {
52-
backend *Backend
53-
54-
// We need to keep these around
55-
// to write the marker in Close.
56-
ctx context.Context
57-
info minio.UploadInfo
58-
name string
59-
60-
// Writes are sent to this pipe
61-
// and then written to S3 in a background goroutine.
62-
pw *io.PipeWriter
63-
donePipe chan struct{}
39+
ctx context.Context
40+
pw *io.PipeWriter
6441
}
6542

43+
// Write sends p to store as the S3 object associated with w.
44+
// An error is returned if Write failed previously, an error occurred in S3, or w is already closed.
6645
func (w *writerWrapper) Write(p []byte) (int, error) {
6746
// Not checking the status of ctx explicitly because it will be propagated
6847
// from the reader goroutine.
6948
return w.pw.Write(p)
7049
}
7150

51+
// Close ensures that the written data is saved.
52+
// An error is returned if Write failed previously, an error occurred in S3, or w is already closed.
7253
func (w *writerWrapper) Close() error {
73-
select {
74-
case <-w.donePipe:
75-
return simpleblob.ErrClosed
76-
default:
77-
}
78-
_ = w.pw.Close() // Always returns nil.
79-
<-w.donePipe // Wait for doStoreReader to return and w.info to be set.
80-
return w.backend.setMarker(w.ctx, w.name, w.info.ETag, false)
54+
_, err := w.pw.Write(nil)
55+
_ = w.pw.Close()
56+
// Let the reading goroutine finish writing,
57+
// and write the marker if needed.
58+
<-w.ctx.Done() // cancelled after writing the marker
59+
return err
8160
}

0 commit comments

Comments
 (0)