Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"github.com/treeverse/lakefs/pkg/uri"
)

// objectInfo carries a single listed object from the listing loop to the
// download workers.
type objectInfo struct {
	// relPath is the relative path within the repository, based on the source path.
	relPath string
	// objectStat is the object's stats entry as returned by the listing call.
	objectStat *apigen.ObjectStats
}

const (
fsDownloadCmdMinArgs = 1
fsDownloadCmdMaxArgs = 2
Expand Down Expand Up @@ -67,7 +72,7 @@ var fsDownloadCmd = &cobra.Command{
// ProgressRender start render progress and return callback waiting for the progress to finish.
go pw.Render()

ch := make(chan string, filesChanSize)
ch := make(chan objectInfo, filesChanSize)
if remotePath != "" && !strings.HasSuffix(remotePath, uri.PathSeparator) {
*remote.Path += uri.PathSeparator
}
Expand All @@ -79,6 +84,7 @@ var fsDownloadCmd = &cobra.Command{
After: (*apigen.PaginationAfter)(swag.String(after)),
Prefix: (*apigen.PaginationPrefix)(remote.Path),
UserMetadata: swag.Bool(true),
Presign: swag.Bool(false), // faster listing without, we use stat object later if we need it
})
DieOnErrorOrUnexpectedStatusCode(listResp, err, http.StatusOK)
if listResp.JSON200 == nil {
Expand All @@ -88,15 +94,15 @@ var fsDownloadCmd = &cobra.Command{
DieFmt("No objects in path: %s", remote.String())
}

for _, o := range listResp.JSON200.Results {
relPath := strings.TrimPrefix(o.Path, remotePath)
for _, listItem := range listResp.JSON200.Results {
relPath := strings.TrimPrefix(listItem.Path, remotePath)
relPath = strings.TrimPrefix(relPath, uri.PathSeparator)

// skip directory markers
if relPath == "" || strings.HasSuffix(relPath, uri.PathSeparator) {
continue
}
ch <- relPath
ch <- objectInfo{relPath: relPath, objectStat: &listItem}
}
if !listResp.JSON200.Pagination.HasMore {
break
Expand All @@ -112,21 +118,21 @@ var fsDownloadCmd = &cobra.Command{
for i := 0; i < syncFlags.Parallelism; i++ {
go func() {
defer wg.Done()
for relPath := range ch {
srcPath := remote.GetPath() + relPath
for objInfo := range ch {
srcPath := remote.GetPath() + objInfo.relPath
src := uri.URI{
Repository: remote.Repository,
Ref: remote.Ref,
Path: &srcPath,
}

// progress tracker
tracker := &progress.Tracker{Message: "download " + relPath, Total: -1}
tracker := &progress.Tracker{Message: "download " + objInfo.relPath, Total: -1}
pw.AppendTracker(tracker)
tracker.Start()

dest := filepath.Join(dest, relPath)
err := downloader.Download(ctx, src, dest, tracker)
dest := filepath.Join(dest, objInfo.relPath)
err := downloader.DownloadWithObjectInfo(ctx, src, dest, tracker, objInfo.objectStat)
if err != nil {
tracker.MarkAsErrored()
DieErr(err)
Expand Down
103 changes: 69 additions & 34 deletions pkg/api/helpers/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"path/filepath"

"github.com/go-openapi/swag"
"github.com/hashicorp/go-retryablehttp"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/stats"
"github.com/treeverse/lakefs/pkg/uri"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -40,10 +43,14 @@ type downloadPart struct {
func NewDownloader(client *apigen.ClientWithResponses, preSign bool) *Downloader {
// setup http client
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = 10
httpClient := &http.Client{
Transport: transport,
}
transport.MaxIdleConnsPerHost = 50

retryClient := retryablehttp.NewClient()
retryClient.Logger = &stats.LoggerAdapter{Logger: logging.ContextUnavailable().WithField("component", "downloader")}
retryClient.RetryMax = 3
retryClient.Backoff = retryablehttp.DefaultBackoff
retryClient.HTTPClient.Transport = transport
httpClient := retryClient.StandardClient()

return &Downloader{
Client: client,
Expand All @@ -55,8 +62,13 @@ func NewDownloader(client *apigen.ClientWithResponses, preSign bool) *Downloader
}
}

// Download downloads an object from lakeFS to a local file, create the destination directory if needed.
func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
// downloadObjectCore handles the common download logic for both Download and DownloadWithObjectInfo
func (d *Downloader) downloadObjectCore(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this better?

Suggested change
func (d *Downloader) downloadObjectCore(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) error {
func (d *Downloader) downloadWithObjectInfo(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) error {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want the name to be confused with DownloadWithObjectInfo.

// Validate that object stats are provided when required
if (d.SymlinkSupport || d.PreSign) && objectStat == nil {
return fmt.Errorf("object stats are required for symlink support or presign downloads but were not provided: %w", ErrValidation)
}

// delete destination file if it exists
if err := os.Remove(dst); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove existing destination file '%s': %w", dst, err)
Expand All @@ -66,25 +78,8 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac
dir := filepath.Dir(dst)
_ = os.MkdirAll(dir, os.ModePerm)

// Check if we need to call StatObjectWithResponse (for symlinks or presign multipart)
var objectStat *apigen.ObjectStats
if d.SymlinkSupport || d.PreSign {
statResp, err := d.Client.StatObjectWithResponse(ctx, src.Repository, src.Ref, &apigen.StatObjectParams{
Path: apiutil.Value(src.Path),
UserMetadata: swag.Bool(d.SymlinkSupport), // Only request metadata if symlink support is enabled
Presign: swag.Bool(d.PreSign), // Only presign if needed
})
if err != nil {
return fmt.Errorf("download failed: %w", err)
}
if statResp.JSON200 == nil {
return fmt.Errorf("download failed: %w: %s", ErrRequestFailed, statResp.Status())
}
objectStat = statResp.JSON200
}

// If symlink support is enabled, check if the object is a symlink and create it if so
if d.SymlinkSupport {
if d.SymlinkSupport && objectStat != nil {
symlinkTarget, found := objectStat.Metadata.Get(apiutil.SymlinkMetadataKey)
if found && symlinkTarget != "" {
if tracker != nil {
Expand All @@ -103,7 +98,7 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac

// download object
var err error
if d.PreSign && objectStat.SizeBytes != nil {
if d.PreSign && objectStat != nil && swag.Int64Value(objectStat.SizeBytes) >= d.PartSize {
// download using presigned multipart download, it will fall back to presign single object download if needed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will it fall back to a presigned single-object download?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could return an error when the object size < d.PartSize, but instead it just calls downloadObject.
I've added a check here, before the call, just to avoid going through the fallback.
Both are internal methods - I could just return an error in downloadPresignMultipart, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If downloadPresignMultipart is only called from here, you can remove the internal check in downloadPresignMultipart. If it's not, then leave it as is

err = d.downloadPresignMultipart(ctx, src, dst, tracker, objectStat)
} else {
Expand All @@ -115,17 +110,46 @@ func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, trac
return nil
}

func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) (err error) {
// Use provided stat response object metadata for size and physical address (presigned)
// DownloadWithObjectInfo downloads an object from lakeFS to the local file dst,
// reusing the caller-supplied objectStat instead of issuing a separate stat call.
// objectStat must be non-nil when SymlinkSupport or PreSign is enabled on the
// Downloader; otherwise an error wrapping ErrValidation is returned.
func (d *Downloader) DownloadWithObjectInfo(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) error {
	// All of the work is shared with Download; only the source of objectStat differs.
	return d.downloadObjectCore(ctx, src, dst, tracker, objectStat)
}

// Download fetches an object from lakeFS into the local file dst, creating the
// destination directory if needed. When symlink support or presign is enabled,
// the object is stat-ed first and the result is handed to the shared download
// logic; otherwise no stat call is made.
func (d *Downloader) Download(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker) error {
	// A stat is only required for symlinks or presign multipart downloads.
	needsStat := d.SymlinkSupport || d.PreSign
	if !needsStat {
		return d.downloadObjectCore(ctx, src, dst, tracker, nil)
	}

	params := &apigen.StatObjectParams{
		Path:         apiutil.Value(src.Path),
		UserMetadata: swag.Bool(d.SymlinkSupport), // Only request metadata if symlink support is enabled
		Presign:      swag.Bool(d.PreSign),        // Only presign if needed
	}
	resp, err := d.Client.StatObjectWithResponse(ctx, src.Repository, src.Ref, params)
	if err != nil {
		return fmt.Errorf("download failed: %w", err)
	}
	if resp.JSON200 == nil {
		return fmt.Errorf("download failed: %w: %s", ErrRequestFailed, resp.Status())
	}

	return d.downloadObjectCore(ctx, src, dst, tracker, resp.JSON200)
}

// downloadPresignMultipart downloads a large object (its size must be greater than or equal to PartSize) using a presigned URL.
// It uses multiple concurrent range requests to download the object in parts.
// If the object is smaller than PartSize, it returns an error wrapping ErrValidation.
func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI, dst string, tracker *progress.Tracker, objectStat *apigen.ObjectStats) (err error) {
// check if the object is small enough to download in one request
size := swag.Int64Value(objectStat.SizeBytes)
if size < d.PartSize {
return fmt.Errorf("object is smaller than PartSize (%d): %w", d.PartSize, ErrValidation)
}
if tracker != nil {
tracker.UpdateTotal(size)
}
if size < d.PartSize {
return d.downloadObject(ctx, src, dst, tracker)
}

f, err := os.Create(dst)
if err != nil {
Expand All @@ -140,13 +164,24 @@ func (d *Downloader) downloadPresignMultipart(ctx context.Context, src uri.URI,
return fmt.Errorf("failed to truncate '%s' to size %d: %w", f.Name(), size, err)
}

// download the file using ranges and concurrency
physicalAddress := objectStat.PhysicalAddress
// download the file using ranges and concurrency with a fresh presigned URL
statResp, err := d.Client.StatObjectWithResponse(ctx, src.Repository, src.Ref, &apigen.StatObjectParams{
Path: apiutil.Value(src.Path),
UserMetadata: swag.Bool(d.SymlinkSupport), // Only request metadata if symlink support is enabled
Presign: swag.Bool(d.PreSign), // Only presign if needed
})
if err != nil {
return fmt.Errorf("download failed: %w", err)
}
if statResp.JSON200 == nil {
return fmt.Errorf("download failed: %w: %s", ErrRequestFailed, statResp.Status())
}
physicalAddress := statResp.JSON200.PhysicalAddress

ch := make(chan downloadPart, DefaultDownloadConcurrency)
// start download workers
ch := make(chan downloadPart, DefaultDownloadConcurrency)
g, grpCtx := errgroup.WithContext(context.Background())
for i := 0; i < DefaultDownloadConcurrency; i++ {
for range DefaultDownloadConcurrency {
g.Go(func() error {
buf := make([]byte, d.PartSize)
for part := range ch {
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/helpers/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
// ErrNotFound is returned when something is not found remotely.
ErrNotFound = errors.New("not found")
ErrConflict = errors.New("conflict")
// ErrValidation is returned when input validation fails.
ErrValidation = errors.New("validation error")
)

const minHTTPErrorStatusCode = 400
Expand Down
Loading