Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"github.com/treeverse/lakefs/pkg/uri"
)

type objectInfo struct {
relPath string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative to what? document

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment.

relative to the source path in the repo - I used the same terminology as the existing lakectl fs download code.

object *apigen.ObjectStats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
object *apigen.ObjectStats
stats *apigen.ObjectStats

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to objectStat

}

const (
fsDownloadCmdMinArgs = 1
fsDownloadCmdMaxArgs = 2
Expand Down Expand Up @@ -67,7 +72,7 @@ var fsDownloadCmd = &cobra.Command{
// ProgressRender start render progress and return callback waiting for the progress to finish.
go pw.Render()

ch := make(chan string, filesChanSize)
ch := make(chan objectInfo, filesChanSize)
if remotePath != "" && !strings.HasSuffix(remotePath, uri.PathSeparator) {
*remote.Path += uri.PathSeparator
}
Expand All @@ -79,6 +84,7 @@ var fsDownloadCmd = &cobra.Command{
After: (*apigen.PaginationAfter)(swag.String(after)),
Prefix: (*apigen.PaginationPrefix)(remote.Path),
UserMetadata: swag.Bool(true),
Presign: swag.Bool(false), // faster listing without, we use stat object later if we need it
})
DieOnErrorOrUnexpectedStatusCode(listResp, err, http.StatusOK)
if listResp.JSON200 == nil {
Expand All @@ -96,7 +102,7 @@ var fsDownloadCmd = &cobra.Command{
if relPath == "" || strings.HasSuffix(relPath, uri.PathSeparator) {
continue
}
ch <- relPath
ch <- objectInfo{relPath: relPath, object: &o}
}
if !listResp.JSON200.Pagination.HasMore {
break
Expand All @@ -112,21 +118,21 @@ var fsDownloadCmd = &cobra.Command{
for i := 0; i < syncFlags.Parallelism; i++ {
go func() {
defer wg.Done()
for relPath := range ch {
srcPath := remote.GetPath() + relPath
for objInfo := range ch {
srcPath := remote.GetPath() + objInfo.relPath
src := uri.URI{
Repository: remote.Repository,
Ref: remote.Ref,
Path: &srcPath,
}

// progress tracker
tracker := &progress.Tracker{Message: "download " + relPath, Total: -1}
tracker := &progress.Tracker{Message: "download " + objInfo.relPath, Total: -1}
pw.AppendTracker(tracker)
tracker.Start()

dest := filepath.Join(dest, relPath)
err := downloader.Download(ctx, src, dest, tracker)
dest := filepath.Join(dest, objInfo.relPath)
err := downloader.DownloadWithObjectInfo(ctx, src, dest, tracker, objInfo.object)
if err != nil {
tracker.MarkAsErrored()
DieErr(err)
Expand Down
93 changes: 78 additions & 15 deletions pkg/api/helpers/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"path/filepath"

"github.com/go-openapi/swag"
"github.com/hashicorp/go-retryablehttp"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/stats"
"github.com/treeverse/lakefs/pkg/uri"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -40,10 +43,14 @@ type downloadPart struct {
func NewDownloader(client *apigen.ClientWithResponses, preSign bool) *Downloader {
// setup http client
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = 10
httpClient := &http.Client{
Transport: transport,
}
transport.MaxIdleConnsPerHost = 50

retryClient := retryablehttp.NewClient()
retryClient.Logger = &stats.LoggerAdapter{Logger: logging.ContextUnavailable().WithField("component", "downloader")}
retryClient.RetryMax = 3
retryClient.Backoff = retryablehttp.DefaultBackoff
retryClient.HTTPClient.Transport = transport
httpClient := retryClient.StandardClient()

return &Downloader{
Client: client,
Expand All @@ -55,6 +62,50 @@ func NewDownloader(client *apigen.ClientWithResponses, preSign bool) *Downloader
}
}

// DownloadWithObjectInfo downloads an object from lakeFS using pre-fetched object information,
// avoiding the need for a separate stat call.
func (d *Downloader) DownloadWithObjectInfo(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) error {
// delete destination file if it exists
if err := os.Remove(dst); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove existing destination file '%s': %w", dst, err)
}

// create destination dir if needed
dir := filepath.Dir(dst)
_ = os.MkdirAll(dir, os.ModePerm)

// If symlink support is enabled, check if the object is a symlink and create it if so
if d.SymlinkSupport {
symlinkTarget, found := objectStat.Metadata.Get(apiutil.SymlinkMetadataKey)
if found && symlinkTarget != "" {
if tracker != nil {
tracker.UpdateTotal(0)
tracker.MarkAsDone()
}
// Skip non-regular files
if d.SkipNonRegularFiles {
return nil
}
// Create symlink instead of downloading file content
return os.Symlink(symlinkTarget, dst)
}
// fallthrough to download the object
}

// download object
var err error
if d.PreSign && swag.Int64Value(objectStat.SizeBytes) >= d.PartSize {
// download using presigned multipart download, it will fall back to presign single object download if needed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will it fallback to presign single object download?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could return an error in case the object size < d.PartSize, but instead it will just call downloadObject.
I've added a check here, before the call, just so it doesn't go through the fallback.
Both are internal methods - I could just return an error in downloadPresignMultipart, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If downloadPresignMultipart is only called from here, you can remove the internal check in downloadPresignMultipart. If it's not, then leave it as is

err = d.downloadPresignMultipart(ctx, src, dst, tracker, objectStat)
} else {
err = d.downloadObject(ctx, src, dst, tracker)
}
if err != nil {
return fmt.Errorf("download failed: %w", err)
}
return nil
}

// Download downloads an object from lakeFS to a local file, create the destination directory if needed.
func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
// delete destination file if it exists
Expand Down Expand Up @@ -84,7 +135,7 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac
}

// If symlink support is enabled, check if the object is a symlink and create it if so
if d.SymlinkSupport {
if d.SymlinkSupport && objectStat != nil {
symlinkTarget, found := objectStat.Metadata.Get(apiutil.SymlinkMetadataKey)
if found && symlinkTarget != "" {
if tracker != nil {
Expand All @@ -103,7 +154,7 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac

// download object
var err error
if d.PreSign && objectStat.SizeBytes != nil {
if d.PreSign && objectStat != nil && objectStat.SizeBytes != nil {
// download using presigned multipart download, it will fall back to presign single object download if needed
err = d.downloadPresignMultipart(ctx, src, dst, tracker, objectStat)
} else {
Expand All @@ -115,17 +166,18 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac
return nil
}

// downloadPresignMultipart downloads a large object, must be larger or equal to PartSize using a presigned URL.
// It uses multiple concurrent range requests to download the object in parts.
// If the object is smaller than PartSize, it falls back to a single `downloadObject` call.
func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) (err error) {
// Use provided stat response object metadata for size and physical address (presigned)

// check if the object is small enough to download in one request
size := swag.Int64Value(objectStat.SizeBytes)
if tracker != nil {
tracker.UpdateTotal(size)
}
if size < d.PartSize {
return d.downloadObject(ctx, src, dst, tracker)
}
if tracker != nil {
tracker.UpdateTotal(size)
}

f, err := os.Create(dst)
if err != nil {
Expand All @@ -140,13 +192,24 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
return fmt.Errorf("failed to truncate '%s' to size %d: %w", f.Name(), size, err)
}

// download the file using ranges and concurrency
physicalAddress := objectStat.PhysicalAddress
// download the file using ranges and concurrency with a fresh presigned URL
statResp, err := d.Client.StatObjectWithResponse(ctx, src.Repository, src.Ref, &apigen.StatObjectParams{
Path: apiutil.Value(src.Path),
UserMetadata: swag.Bool(d.SymlinkSupport), // Only request metadata if symlink support is enabled
Presign: swag.Bool(d.PreSign), // Only presign if needed
})
if err != nil {
return fmt.Errorf("download failed: %w", err)
}
if statResp.JSON200 == nil {
return fmt.Errorf("download failed: %w: %s", ErrRequestFailed, statResp.Status())
}
physicalAddress := statResp.JSON200.PhysicalAddress

ch := make(chan downloadPart, DefaultDownloadConcurrency)
// start download workers
ch := make(chan downloadPart, DefaultDownloadConcurrency)
g, grpCtx := errgroup.WithContext(context.Background())
for i := 0; i < DefaultDownloadConcurrency; i++ {
for range DefaultDownloadConcurrency {
g.Go(func() error {
buf := make([]byte, d.PartSize)
for part := range ch {
Expand Down
Loading