Skip to content

Commit 0eee1f9

Browse files
authored
fix(storage): fix append edge cases (#12074)
* fix(storage): fix append edge cases Fix the validation for generation being included. Fix a case where the error was not being picked up during append takeover. Add an integration test for these issues * fix flush hang * add another takeover case
1 parent 8e37155 commit 0eee1f9

File tree

3 files changed

+105
-13
lines changed

3 files changed

+105
-13
lines changed

storage/grpc_writer.go

+23-12
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,17 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
127127
bctx = bucketContext(bctx, gw.bucket)
128128
}
129129
err = run(bctx, uploadBuff, gw.settings.retry, s.idempotent)
130+
offset += int64(recvd)
131+
// If this buffer upload was triggered by a flush, reset and
132+
// communicate back the result.
133+
if gw.flushInProgress {
134+
gw.setSize(offset)
135+
gw.flushInProgress = false
136+
gw.flushComplete <- flushResult{offset: offset, err: err}
137+
}
130138
if err != nil {
131139
return err
132140
}
133-
offset += int64(recvd)
134-
135141
// When we are done reading data without errors, set the object and
136142
// finish.
137143
if doneReading {
@@ -213,7 +219,7 @@ func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams,
213219
append: params.append,
214220
finalizeOnClose: params.finalizeOnClose,
215221
setPipeWriter: setPipeWriter,
216-
flushComplete: make(chan int64),
222+
flushComplete: make(chan flushResult),
217223
}, nil
218224
}
219225

@@ -245,8 +251,13 @@ type gRPCWriter struct {
245251
finalizeOnClose bool
246252

247253
streamSender gRPCBidiWriteBufferSender
248-
flushInProgress bool // true when the pipe is being recreated for a flush.
249-
flushComplete chan int64 // use to signal back to flush call that flush to server was completed.
254+
flushInProgress bool // true when the pipe is being recreated for a flush.
255+
flushComplete chan flushResult // use to signal back to flush call that flush to server was completed.
256+
}
257+
258+
type flushResult struct {
259+
err error
260+
offset int64
250261
}
251262

252263
func bucketContext(ctx context.Context, bucket string) context.Context {
@@ -554,11 +565,6 @@ func (w *gRPCWriter) uploadBuffer(ctx context.Context, recvd int, start int64, d
554565
break
555566
}
556567
}
557-
if w.flushInProgress {
558-
w.setSize(offset)
559-
w.flushInProgress = false
560-
w.flushComplete <- offset
561-
}
562568
return
563569
}
564570

@@ -604,8 +610,8 @@ func (w *gRPCWriter) flush() (int64, error) {
604610
w.pw.Close()
605611

606612
// Wait for flush to complete
607-
offset := <-w.flushComplete
608-
return offset, nil
613+
result := <-w.flushComplete
614+
return result.offset, result.err
609615
}
610616

611617
func checkCanceled(err error) error {
@@ -686,6 +692,11 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender(ctx context.Context)
686692
return nil, err
687693
}
688694
firstResp := <-s.recvs
695+
// Check recvErr after getting the response.
696+
if s.recvErr != nil {
697+
return nil, s.recvErr
698+
}
699+
689700
// Object resource is returned in the first response on takeover, so capture
690701
// this now.
691702
s.objResource = firstResp.GetResource()

storage/integration_test.go

+81
Original file line numberDiff line numberDiff line change
@@ -3524,6 +3524,87 @@ func TestIntegration_WriterAppendTakeover(t *testing.T) {
35243524
})
35253525
}
35263526

3527+
func TestIntegration_WriterAppendEdgeCases(t *testing.T) {
3528+
t.Skip("b/402283880")
3529+
ctx := skipAllButBidi(context.Background(), "ZB test")
3530+
multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _, prefix string, client *Client) {
3531+
h := testHelper{t}
3532+
bucketName := prefix + uidSpace.New()
3533+
bkt := client.Bucket(bucketName)
3534+
3535+
h.mustCreateZonalBucket(bkt, testutil.ProjID())
3536+
defer h.mustDeleteBucket(bkt)
3537+
3538+
objName := "object1"
3539+
obj := bkt.Object(objName)
3540+
defer h.mustDeleteObject(obj)
3541+
3542+
// Takeover Writer to a non-existent object should fail, with or
3543+
// without generation specified.
3544+
if _, _, err := obj.NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{}); err == nil {
3545+
t.Errorf("NewWriterFromAppendableObject: got nil, want error")
3546+
}
3547+
_, _, err := obj.Generation(1234).NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{})
3548+
if status.Code(err) != codes.NotFound {
3549+
t.Errorf("NewWriterFromAppendableObject: got %v, want NotFound", err)
3550+
}
3551+
3552+
// If a takeover is opened, flush or close to the original writer
3553+
// should fail.
3554+
w := obj.NewWriter(ctx)
3555+
w.Append = true
3556+
w.ChunkSize = MiB
3557+
if _, err := w.Write(randomBytes3MiB); err != nil {
3558+
t.Fatalf("w.Write: %v", err)
3559+
}
3560+
3561+
tw, _, err := obj.Generation(w.Attrs().Generation).NewWriterFromAppendableObject(ctx, nil)
3562+
if err != nil {
3563+
t.Fatalf("NewWriterFromAppendableObject: %v", err)
3564+
}
3565+
if _, err := tw.Write([]byte("hello world")); err != nil {
3566+
t.Fatalf("tw.Write: %v", err)
3567+
}
3568+
if _, err := tw.Flush(); err != nil {
3569+
t.Fatalf("tw.Flush: %v", err)
3570+
}
3571+
3572+
// Expect precondition error when writer to orginal Writer.
3573+
if _, err := w.Write(randomBytes3MiB); status.Code(err) != codes.FailedPrecondition {
3574+
t.Fatalf("got %v", err)
3575+
}
3576+
3577+
// Another NewWriter to the unfinalized object should also return a
3578+
// precondition error when data is flushed.
3579+
w2 := obj.NewWriter(ctx)
3580+
w2.Append = true
3581+
if _, err := w2.Write([]byte("hello world")); err != nil {
3582+
t.Fatalf("w2.Write: %v", err)
3583+
}
3584+
if _, err := w2.Flush(); status.Code(err) != codes.FailedPrecondition {
3585+
t.Fatalf("w2.Flush: %v", err)
3586+
}
3587+
3588+
// If we add yet another takeover writer to finalize and delete the object,
3589+
// tw should also return an error on flush.
3590+
tw2, _, err := obj.Generation(w.Attrs().Generation).NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{
3591+
FinalizeOnClose: true,
3592+
})
3593+
if err != nil {
3594+
t.Fatalf("NewWriterFromAppendableObject: %v", err)
3595+
}
3596+
if err := tw2.Close(); err != nil {
3597+
t.Fatalf("tw2.Close: %v", err)
3598+
}
3599+
h.mustDeleteObject(obj)
3600+
if _, err := tw.Write([]byte("abcde")); err != nil {
3601+
t.Fatalf("tw.Write: %v", err)
3602+
}
3603+
if _, err := tw.Flush(); status.Code(err) != codes.FailedPrecondition {
3604+
t.Errorf("tw.Flush: got %v, want FailedPrecondition", err)
3605+
}
3606+
})
3607+
}
35273608
func TestIntegration_ZeroSizedObject(t *testing.T) {
35283609
t.Parallel()
35293610
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket, _ string, client *Client) {

storage/storage.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1267,7 +1267,7 @@ func (o *ObjectHandle) NewWriter(ctx context.Context) *Writer {
12671267
// This feature is in preview and is not yet available for general use.
12681268
func (o *ObjectHandle) NewWriterFromAppendableObject(ctx context.Context, opts *AppendableWriterOpts) (*Writer, int64, error) {
12691269
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.Writer")
1270-
if o.gen == 0 {
1270+
if o.gen < 0 {
12711271
return nil, 0, errors.New("storage: ObjectHandle.Generation must be set to use NewWriterFromAppendableObject")
12721272
}
12731273
w := &Writer{

0 commit comments

Comments
 (0)