
Commit 6378393

Merge pull request #1955 from ktock/carrying-1948
[Carry 1948] fs/remote: Refactor blob code to make it more modular
2 parents d5f314b + 1645a0e commit 6378393
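
This refactor splits the long ReadAt and fetchRange bodies in fs/remote/blob.go into smaller helpers (prepareChunksForRead, readFromCache, cacheChunkData, handleSharedFetch, copyFetchedChunks, adjustBufferSize) and moves the repeated fetcher-snapshot locking into a single getFetcher method. A rough, self-contained sketch of that snapshot pattern follows; the trimmed-down blob and fetcher types and the example URL are illustrative stand-ins, not the real ones from the package.

package main

import (
	"fmt"
	"sync"
)

// fetcher is a stand-in for the package's fetcher interface.
type fetcher interface {
	genID(chunk string) string
}

type urlFetcher struct{ url string }

func (f *urlFetcher) genID(chunk string) string { return f.url + ":" + chunk }

// blob is a stand-in holding a fetcher that may be swapped concurrently.
type blob struct {
	fetcherMu sync.Mutex
	fetcher   fetcher
}

// getFetcher takes a snapshot of the current fetcher under the mutex so a
// single read keeps using one consistent fetcher even if it is refreshed
// mid-flight; this mirrors the helper the patch introduces.
func (b *blob) getFetcher() fetcher {
	b.fetcherMu.Lock()
	defer b.fetcherMu.Unlock()
	return b.fetcher
}

func main() {
	b := &blob{fetcher: &urlFetcher{url: "https://registry.example/blob"}} // hypothetical URL
	fr := b.getFetcher()                                                   // snapshot once, reuse for the whole operation
	fmt.Println(fr.genID("0-4095"))
}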


1 file changed: fs/remote/blob.go (119 additions, 95 deletions)
@@ -259,28 +259,33 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) {
 		o(&readAtOpts)
 	}
 
-	// Fetcher can be suddenly updated so we take and use the snapshot of it for
-	// consistency.
-	b.fetcherMu.Lock()
-	fr := b.fetcher
-	b.fetcherMu.Unlock()
+	fr := b.getFetcher()
+
+	if err := b.prepareChunksForRead(allRegion, offset, p, fr, allData, &readAtOpts); err != nil {
+		return 0, err
+	}
+
+	// Read required data
+	if err := b.fetchRange(allData, &readAtOpts); err != nil {
+		return 0, err
+	}
+
+	return b.adjustBufferSize(p, offset), nil
+}
 
-	b.walkChunks(allRegion, func(chunk region) error {
+// prepareChunksForRead prepares chunks for reading by checking cache and setting up writers
+func (b *blob) prepareChunksForRead(allRegion region, offset int64, p []byte, fr fetcher, allData map[region]io.Writer, opts *options) error {
+	return b.walkChunks(allRegion, func(chunk region) error {
 		var (
 			base         = positive(chunk.b - offset)
 			lowerUnread  = positive(offset - chunk.b)
 			upperUnread  = positive(chunk.e + 1 - (offset + int64(len(p))))
 			expectedSize = chunk.size() - upperUnread - lowerUnread
 		)
 
-		// Check if the content exists in the cache
-		r, err := b.cache.Get(fr.genID(chunk), readAtOpts.cacheOpts...)
-		if err == nil {
-			defer r.Close()
-			n, err := r.ReadAt(p[base:base+expectedSize], lowerUnread)
-			if (err == nil || err == io.EOF) && int64(n) == expectedSize {
-				return nil
-			}
+		// Try to read from cache first
+		if err := b.readFromCache(chunk, p[base:base+expectedSize], lowerUnread, fr, opts); err == nil {
+			return nil
 		}
 
 		// We missed cache. Take it from remote registry.
@@ -289,21 +294,23 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) {
 		allData[chunk] = newBytesWriter(p[base:base+expectedSize], lowerUnread)
 		return nil
 	})
+}
 
-	// Read required data
-	if err := b.fetchRange(allData, &readAtOpts); err != nil {
-		return 0, err
+// readFromCache attempts to read chunk data from cache
+func (b *blob) readFromCache(chunk region, dest []byte, offset int64, fr fetcher, opts *options) error {
+	r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
+	if err != nil {
+		return err
 	}
-
-	// Adjust the buffer size according to the blob size
-	if remain := b.size - offset; int64(len(p)) >= remain {
-		if remain < 0 {
-			remain = 0
-		}
-		p = p[:remain]
+	defer r.Close()
+	n, err := r.ReadAt(dest, offset)
+	if err != nil && err != io.EOF {
+		return err
 	}
-
-	return len(p), nil
+	if n != len(dest) {
+		return fmt.Errorf("incomplete read from cache: read %d bytes, expected %d bytes", n, len(dest))
+	}
+	return nil
 }
 
 // fetchRegions fetches all specified chunks from remote blob and puts it in the local cache.
@@ -313,11 +320,7 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
 		return nil
 	}
 
-	// Fetcher can be suddenly updated so we take and use the snapshot of it for
-	// consistency.
-	b.fetcherMu.Lock()
-	fr := b.fetcher
-	b.fetcherMu.Unlock()
+	fr := b.getFetcher()
 
 	// request missed regions
 	var req []region
@@ -332,7 +335,6 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
 		fetchCtx = opts.ctx
 	}
 	mr, err := fr.fetch(fetchCtx, req, true)
-
 	if err != nil {
 		return err
 	}
@@ -353,35 +355,9 @@ func (b *blob) fetchRegions(allData map[region]io.Writer, fetched map[region]boo
 			return fmt.Errorf("failed to read multipart resp: %w", err)
 		}
 		if err := b.walkChunks(reg, func(chunk region) (retErr error) {
-			id := fr.genID(chunk)
-			cw, err := b.cache.Add(id, opts.cacheOpts...)
-			if err != nil {
+			if err := b.cacheChunkData(chunk, p, fr, allData, fetched, opts); err != nil {
 				return err
 			}
-			defer cw.Close()
-			w := io.Writer(cw)
-
-			// If this chunk is one of the targets, write the content to the
-			// passed reader too.
-			if _, ok := fetched[chunk]; ok {
-				w = io.MultiWriter(w, allData[chunk])
-			}
-
-			// Copy the target chunk
-			if _, err := io.CopyN(w, p, chunk.size()); err != nil {
-				cw.Abort()
-				return err
-			}
-
-			// Add the target chunk to the cache
-			if err := cw.Commit(); err != nil {
-				return err
-			}
-
-			b.fetchedRegionSetMu.Lock()
-			b.fetchedRegionSet.add(chunk)
-			b.fetchedRegionSetMu.Unlock()
-			fetched[chunk] = true
 			return nil
 		}); err != nil {
 			return fmt.Errorf("failed to get chunks: %w", err)
@@ -408,9 +384,6 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error {
 		return nil
 	}
 
-	// We build a key based on regions we need to fetch and pass it to singleflightGroup.Do(...)
-	// to block simultaneous same requests. Once the request is finished and the data is ready,
-	// all blocked callers will be unblocked and that same data will be returned by all blocked callers.
 	key := makeSyncKey(allData)
 	fetched := make(map[region]bool)
 	_, err, shared := b.fetchedRegionGroup.Do(key, func() (interface{}, error) {
@@ -420,44 +393,64 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error {
 	// When unblocked try to read from cache in case if there were no errors
 	// If we fail reading from cache, fetch from remote registry again
 	if err == nil && shared {
-		for reg := range allData {
-			if _, ok := fetched[reg]; ok {
-				continue
-			}
-			err = b.walkChunks(reg, func(chunk region) error {
-				b.fetcherMu.Lock()
-				fr := b.fetcher
-				b.fetcherMu.Unlock()
-
-				// Check if the content exists in the cache
-				// And if exists, read from cache
-				r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
-				if err != nil {
-					return err
-				}
-				defer r.Close()
-				rr := io.NewSectionReader(r, 0, chunk.size())
-
-				// Copy the target chunk
-				b.fetchedRegionCopyMu.Lock()
-				defer b.fetchedRegionCopyMu.Unlock()
-				if _, err := io.CopyN(allData[chunk], rr, chunk.size()); err != nil {
-					return err
-				}
-				return nil
-			})
-			if err != nil {
-				break
-			}
+		if err := b.handleSharedFetch(allData, fetched, opts); err != nil {
+			return b.fetchRange(allData, opts) // retry on error
 		}
+	}
 
-	// if we cannot read the data from cache, do fetch again
-	if err != nil {
-		return b.fetchRange(allData, opts)
+	return err
+}
+
+// handleSharedFetch handles the case when multiple goroutines share the same fetch result
+func (b *blob) handleSharedFetch(allData map[region]io.Writer, fetched map[region]bool, opts *options) error {
+	for reg := range allData {
+		if _, ok := fetched[reg]; ok {
+			continue
+		}
+		if err := b.copyFetchedChunks(reg, allData, opts); err != nil {
+			return err
 		}
 	}
+	return nil
+}
 
-	return err
+// copyFetchedChunks copies fetched chunks from cache to target writer
+func (b *blob) copyFetchedChunks(reg region, allData map[region]io.Writer, opts *options) error {
+	return b.walkChunks(reg, func(chunk region) error {
+		fr := b.getFetcher()
+		r, err := b.cache.Get(fr.genID(chunk), opts.cacheOpts...)
+		if err != nil {
+			return err
+		}
+		defer r.Close()
+
+		b.fetchedRegionCopyMu.Lock()
+		defer b.fetchedRegionCopyMu.Unlock()
+
+		if _, err := io.CopyN(allData[chunk], io.NewSectionReader(r, 0, chunk.size()), chunk.size()); err != nil {
+			return err
+		}
+		return nil
+	})
+}
+
+// getFetcher safely gets the current fetcher
+// Fetcher can be suddenly updated so we take and use the snapshot of it for consistency.
+func (b *blob) getFetcher() fetcher {
+	b.fetcherMu.Lock()
+	defer b.fetcherMu.Unlock()
+	return b.fetcher
+}
+
+// adjustBufferSize adjusts buffer size according to the blob size
+func (b *blob) adjustBufferSize(p []byte, offset int64) int {
+	if remain := b.size - offset; int64(len(p)) >= remain {
+		if remain < 0 {
+			remain = 0
+		}
+		p = p[:remain]
+	}
+	return len(p)
 }
 
 type walkFunc func(reg region) error
@@ -533,3 +526,34 @@ func positive(n int64) {
 	}
 	return n
 }
+
+// cacheChunkData handles caching of chunk data
+func (b *blob) cacheChunkData(chunk region, r io.Reader, fr fetcher, allData map[region]io.Writer, fetched map[region]bool, opts *options) error {
+	id := fr.genID(chunk)
+	cw, err := b.cache.Add(id, opts.cacheOpts...)
+	if err != nil {
+		return fmt.Errorf("failed to create cache writer: %w", err)
+	}
+	defer cw.Close()
+
+	w := io.Writer(cw)
+	if _, ok := fetched[chunk]; ok {
+		w = io.MultiWriter(w, allData[chunk])
+	}
+
+	if _, err := io.CopyN(w, r, chunk.size()); err != nil {
+		cw.Abort()
+		return fmt.Errorf("failed to write chunk data: %w", err)
+	}
+
+	if err := cw.Commit(); err != nil {
+		return fmt.Errorf("failed to commit chunk: %w", err)
+	}
+
+	b.fetchedRegionSetMu.Lock()
+	b.fetchedRegionSet.add(chunk)
+	b.fetchedRegionSetMu.Unlock()
+	fetched[chunk] = true
+
+	return nil
+}
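
One detail worth noting for reviewers: fetchRange still deduplicates concurrent requests for the same regions through b.fetchedRegionGroup.Do with a key built by makeSyncKey, but the comment that explained this behaviour was dropped by the patch. A minimal sketch of the same deduplication idea, written against golang.org/x/sync/singleflight rather than the project's own fields; the key string and fetchOnce function below are made up for illustration.

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

// fetchOnce simulates an expensive remote fetch keyed by the requested byte range.
func fetchOnce(key string) (string, error) {
	time.Sleep(100 * time.Millisecond) // pretend this hits the registry
	return "data-for-" + key, nil
}

func main() {
	var g singleflight.Group
	var wg sync.WaitGroup

	// Several goroutines ask for the same key; only one fetch actually runs,
	// the others block until it finishes and receive the shared result
	// (shared == true), which is the behaviour the removed comment described.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			v, err, shared := g.Do("0-1048575", func() (interface{}, error) {
				return fetchOnce("0-1048575")
			})
			fmt.Println(id, v, err, shared)
		}(i)
	}
	wg.Wait()
}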
