feat(sync): image blob streaming #3778
base: main
Conversation
regclient PR: regclient/regclient#1046

This is currently in a very early stage of development, but it does work: image blobs are streamed to clients while zot is still downloading them. The modifications to the existing sync code are minimal so far, and the feature fits in quite well, though there are still a number of cases to handle that may change this.

Some early experiments:
Signed-off-by: Vishwas Rajashekar <[email protected]>
```go
sm.streamLock.Lock()
defer sm.streamLock.Unlock()

// TODO: this can result in a race condition if the ImageCopy with Options hasn't triggered the hook yet
```
note: this isn't a problem anymore - the manifest fetch adds blob readers to the map before returning, so all readers are present before a client can ever request the corresponding blobs.
```go
	godigest "github.com/opencontainers/go-digest"
)

type StreamTempStore interface {
```
Created this just for test purposes; we could find a smarter way to do away with it.
```go
var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes)
internalBuff := bytes.NewBuffer(internalBuffBytes)

multiWriter := io.MultiWriter(cbr.onDiskFile, internalBuff)

numBytesRead, err := io.CopyN(multiWriter, cbr.InFlightReader, cbr.chunkSizeBytes)
if err != nil {
	if !errors.Is(err, io.EOF) {
		cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader")
		// TODO: This means there was an upstream read error. Should the in-progress streams be terminated?
		copy(buff, internalBuff.Bytes())
		cbr.chunksMu.Unlock()

		return int(numBytesRead), err
	}
}

copy(buff, internalBuff.Bytes())
```
This part is not very efficient; I will work on improving it. Streaming is currently only marginally faster in some cases because of the overhead introduced by the chunking: the speed-up gained from streaming is lost again to the low download speed. I need to find a way to reach the same raw download speeds as if the image were already on disk.
```go
// imager, ok := orig.(manifest.Imager)
// if !ok {
// 	return nil, errors.New("failed to convert to imager")
// }

// next, for config
// cfg, err := imager.GetConfig()
// if err != nil {
// 	return nil, err
// }

err = service.streamManager.PrepareActiveStreamForBlob(contents.Config.Digest)
if err != nil {
	return nil, err
}

// finally, for all layers
// layers, err := imager.GetLayers()
// if err != nil {
// 	return nil, err
// }
```
I wanted to read things from the manifest directly, but I think I'm using the Imager interface wrong, so I unmarshalled the manifest body instead. I'll take another look at this later; for now, it works just fine.
```go
routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference).
	Msg("streaming is enabled. Direct fetching manifest.")

fetchedManifest, err := routeHandler.c.SyncOnDemand.FetchManifest(ctx, name, reference)
```
Currently, every client request causes a fresh manifest fetch from upstream, which is not good. We need to implement something to cache the manifest as well while the blobs are syncing.
Codecov Report

```
@@            Coverage Diff             @@
##             main    #3778      +/-   ##
==========================================
- Coverage   91.63%   90.74%   -0.89%
==========================================
  Files         190      194       +4
  Lines       27059    27333     +274
==========================================
+ Hits        24795    24803       +8
- Misses       1463     1725     +262
- Partials      801      805       +4
```
```json
"extensions": {
	"sync": {
		"enable": true,
		"enableStreaming": true,
```
Suggestion: `"stream": true`
```json
"sync": {
	"enable": true,
	"enableStreaming": true,
	"streamChunkSizeBytes": 32768,
```
Why is this needed? How does it affect client-side request handling?
What type of PR is this?
feature
Which issue does this PR fix:
New Feature - Streaming Sync
Base design in #3733
What does this PR do / Why do we need it:
Testing done on this change:
WIP
Automation added to e2e:
WIP
Will this break upgrades or downgrades?
No - feature usage is optional and only takes effect when configured.
Does this PR introduce any user-facing change?:
WIP
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.