Skip to content

Commit 1e8293e

Browse files
committed
fix(simple reader): Improving retry logic for kernel reader path (#4347)
Changes to improve retry logic for kernel reader.
1 parent 51c09fd commit 1e8293e

File tree

4 files changed

+31
-81
lines changed

4 files changed

+31
-81
lines changed

internal/gcsx/mrd_kernel_reader.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21-
"io"
2221
"sync/atomic"
2322

2423
"github.com/googlecloudplatform/gcsfuse/v3/internal/logger"
@@ -45,18 +44,18 @@ func NewMrdKernelReader(mrdInstance *MrdInstance, metricsHandle metrics.MetricHa
4544
}
4645
}
4746

48-
// isShortRead checks if the read operation returned fewer bytes than requested
49-
// without encountering a fatal error.
50-
// It returns true if bytesRead < bufferSize and err is either nil, io.EOF, io.ErrUnexpectedEOF,
51-
// or a gRPC OutOfRange error.
47+
// isShortRead determines what constitutes a short read for retry purposes.
48+
// It returns true if bytesRead < bufferSize and the error is a gRPC OutOfRange error.
5249
func isShortRead(bytesRead int, bufferSize int, err error) bool {
5350
if bytesRead >= bufferSize {
5451
return false
5552
}
5653

57-
if err == nil || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
58-
return true
59-
}
54+
// Even without O_DIRECT, OutOfRange errors can occur during appends from the same mount.
55+
// The kernel tracks the updated file size and allows reads, but the active MRD connection might
56+
// still reference the old object size. We update the local object in the MrdInstance without
57+
// recreating the MRD connection. Reads beyond the old size thus return OutOfRange, which
58+
// we handle as a short read to trigger MRD recreation and retry.
6059

6160
// Check for gRPC OutOfRange error, handling wrapped errors.
6261
var se interface{ GRPCStatus() *status.Status }

internal/gcsx/mrd_kernel_reader_test.go

Lines changed: 23 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"context"
2020
"errors"
2121
"fmt"
22-
"io"
2322
"os"
2423
"testing"
2524
"time"
@@ -148,60 +147,31 @@ func (t *MrdKernelReaderTest) TestReadAt_NilMrdInstance() {
148147
assert.Equal(t.T(), 0, resp.Size)
149148
}
150149

151-
func (t *MrdKernelReaderTest) TestReadAt_ShortRead_RetrySuccess() {
150+
func (t *MrdKernelReaderTest) TestReadAt_ShortRead_NoRetry() {
152151
data := []byte("hello world")
153-
// First MRD returns short read.
154-
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
155-
// Second MRD returns full read.
156-
fakeMRD2 := fake.NewFakeMultiRangeDownloader(t.object, data)
152+
// MRD returns short read.
153+
fakeMRD := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
157154
// Expectation:
158-
// 1. Initial Read calls ensureMRDPool -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD1.
155+
// 1. Read calls ensureMRDPool -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD.
159156
// 2. Read returns short read.
160-
// 3. ReadAt calls RecreateMRD -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD2.
161-
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once()
162-
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once()
157+
// 3. isShortRead returns false because err is nil.
158+
// 4. ReadAt returns the short read without retrying.
159+
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once()
163160
buf := make([]byte, len(data))
164-
req := &ReadRequest{
165-
Buffer: buf,
166-
Offset: 0,
167-
}
161+
req := &ReadRequest{Buffer: buf, Offset: 0}
168162

169163
resp, err := t.reader.ReadAt(context.Background(), req)
170164

171165
assert.NoError(t.T(), err)
172-
assert.Equal(t.T(), len(data), resp.Size)
173-
assert.Equal(t.T(), string(data), string(buf))
174-
// Verify refCount incremented
166+
assert.Equal(t.T(), 5, resp.Size) // Short read size
167+
assert.Equal(t.T(), "hello", string(buf[:5]))
168+
// Verify refCount incremented (only once)
175169
t.mrdInstance.refCountMu.Lock()
176170
assert.Equal(t.T(), int64(1), t.mrdInstance.refCount)
177171
t.mrdInstance.refCountMu.Unlock()
178172
t.bucket.AssertExpectations(t.T())
179173
}
180174

181-
func (t *MrdKernelReaderTest) TestReadAt_ShortRead_RetryFails() {
182-
data := []byte("hello world")
183-
// First MRD returns short read with io.EOF.
184-
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
185-
// Second MRD returns 0 bytes and an error.
186-
retryErr := status.Error(codes.Internal, "Internal error")
187-
fakeMRD2 := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(t.object, []byte{}, 0, retryErr)
188-
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once()
189-
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once()
190-
buf := make([]byte, len(data))
191-
req := &ReadRequest{
192-
Buffer: buf,
193-
Offset: 0,
194-
}
195-
196-
resp, err := t.reader.ReadAt(context.Background(), req)
197-
198-
// ReadAt returns the error from the last attempt (the retry).
199-
assert.ErrorIs(t.T(), err, retryErr)
200-
assert.Equal(t.T(), 5, resp.Size)
201-
assert.Equal(t.T(), "hello", string(buf[:5]))
202-
t.bucket.AssertExpectations(t.T())
203-
}
204-
205175
func (t *MrdKernelReaderTest) TestReadAt_OutOfRange_TriggersRetry() {
206176
data := []byte("hello world")
207177
// First MRD returns OutOfRange error.
@@ -274,32 +244,18 @@ func TestIsShortRead(t *testing.T) {
274244
expected: false,
275245
},
276246
{
277-
name: "Full read, EOF",
247+
name: "Full read, error",
278248
bytesRead: 10,
279249
bufferSize: 10,
280-
err: io.EOF,
250+
err: errors.New("error"),
281251
expected: false,
282252
},
283253
{
284254
name: "Short read, no error",
285255
bytesRead: 5,
286256
bufferSize: 10,
287257
err: nil,
288-
expected: true,
289-
},
290-
{
291-
name: "Short read, EOF",
292-
bytesRead: 5,
293-
bufferSize: 10,
294-
err: io.EOF,
295-
expected: true,
296-
},
297-
{
298-
name: "Short read, UnexpectedEOF",
299-
bytesRead: 5,
300-
bufferSize: 10,
301-
err: io.ErrUnexpectedEOF,
302-
expected: true,
258+
expected: false,
303259
},
304260
{
305261
name: "Short read, OutOfRange",
@@ -356,17 +312,17 @@ func (t *MrdKernelReaderTest) TestDestroy() {
356312
}
357313

358314
func (t *MrdKernelReaderTest) TestReadAt_RecreateMRDFails_RetriesWithOldMRD() {
359-
data := []byte("hello world")
360-
// First MRD returns short read.
361-
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
315+
// First MRD returns OutOfRange error.
316+
outOfRangeErr := status.Error(codes.OutOfRange, "Out of range")
317+
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(t.object, []byte{}, 0, outOfRangeErr)
362318
// Expectation:
363319
// 1. Initial Read calls ensureMRDPool -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD1.
364-
// 2. Read returns short read.
320+
// 2. Read returns OutOfRange.
365321
// 3. ReadAt calls RecreateMRD -> NewMRDPool -> NewMultiRangeDownloader. Returns ERROR.
366322
// 4. ReadAt logs warning and retries with existing pool (fakeMRD1).
367323
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once()
368324
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(nil, errors.New("recreate failed")).Once()
369-
buf := make([]byte, len(data))
325+
buf := make([]byte, 10)
370326
req := &ReadRequest{
371327
Buffer: buf,
372328
Offset: 0,
@@ -376,11 +332,10 @@ func (t *MrdKernelReaderTest) TestReadAt_RecreateMRDFails_RetriesWithOldMRD() {
376332

377333
// Assert
378334
// We verify that we didn't get the "recreate failed" error.
379-
if err != nil {
380-
assert.NotEqual(t.T(), "recreate failed", err.Error())
381-
}
382-
// We expect some data to be read (from first attempt + potentially second attempt).
383-
assert.Greater(t.T(), resp.Size, 0)
335+
assert.ErrorIs(t.T(), err, outOfRangeErr)
336+
assert.NotEqual(t.T(), "recreate failed", err.Error())
337+
// We expect 0 bytes to be read as OutOfRange returns 0 bytes.
338+
assert.Equal(t.T(), 0, resp.Size)
384339
t.bucket.AssertExpectations(t.T())
385340
}
386341

internal/gcsx/multi_range_downloader_wrapper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (t *mrdWrapperTest) Test_Read_ShortRead() {
168168

169169
bytesRead, err := t.mrdWrapper.Read(context.Background(), make([]byte, t.object.Size), 0, int64(t.object.Size), metrics.NewNoopMetrics(), false)
170170

171-
assert.ErrorIs(t.T(), err, io.EOF)
171+
assert.NoError(t.T(), err)
172172
assert.Less(t.T(), bytesRead, int(t.object.Size))
173173
}
174174

internal/storage/fake/fake_multi_range_downloader.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,6 @@ func (fmrd *fakeMultiRangeDownloader) Add(output io.Writer, offset, length int64
139139
err = fmt.Errorf("failed to write %v bytes to writer through multi-range-downloader, bytes written = %v, error = %v", length, n, err)
140140
}
141141

142-
if fmrd.shortRead && err == nil {
143-
err = io.EOF
144-
}
145-
146142
if callback != nil {
147143
callback(offset, int64(n), err)
148144
}

0 commit comments

Comments
 (0)