From 1924a0b627d40dab5cb9a365d1d9676ccf3b8c38 Mon Sep 17 00:00:00 2001 From: Jesse Brown Date: Fri, 8 Aug 2025 09:13:36 -0500 Subject: [PATCH 1/3] containerd: optimizations - Use compressed layer format for containerd storage - Added parallelization in layer processing - Optimized size calculation during tar processing --- local/store.go | 379 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 316 insertions(+), 63 deletions(-) diff --git a/local/store.go b/local/store.go index 87e1d10a..556cae31 100644 --- a/local/store.go +++ b/local/store.go @@ -35,6 +35,10 @@ type Store struct { // optional downloadOnce *sync.Once onDiskLayersByDiffID map[v1.Hash]annotatedLayer + + // containerd storage detection cache + containerdStorageCache *bool + containerdStorageCacheLock sync.RWMutex } // DockerClient is subset of client.APIClient required by this package. @@ -89,17 +93,20 @@ func (s *Store) Save(img *Image, withName string, withAdditionalNames ...string) ) // save - canOmitBaseLayers := !usesContainerdStorage(s.dockerClient) + isContainerdStorage := s.usesContainerdStorageCached() + canOmitBaseLayers := !isContainerdStorage + if canOmitBaseLayers { // During the first save attempt some layers may be excluded. // The docker daemon allows this if the given set of layers already exists in the daemon in the given order. - inspect, err = s.doSave(img, withName) + inspect, err = s.doSave(img, withName, isContainerdStorage) } if !canOmitBaseLayers || err != nil { if err = img.ensureLayers(); err != nil { return "", err } - inspect, err = s.doSave(img, withName) + + inspect, err = s.doSave(img, withName, isContainerdStorage) if err != nil { saveErr := imgutil.SaveError{} for _, n := range append([]string{withName}, withAdditionalNames...) { @@ -116,6 +123,7 @@ func (s *Store) Save(img *Image, withName string, withAdditionalNames ...string) errs = append(errs, imgutil.SaveDiagnostic{ImageName: n, Cause: err}) } } + if len(errs) > 0 { return "", imgutil.SaveError{Errors: errs} } @@ -132,6 +140,30 @@ func tryNormalizing(name string) string { return t.Name() // returns valid 'name:tag' appending 'latest', if missing tag } +// usesContainerdStorageCached provides cached containerd storage detection +func (s *Store) usesContainerdStorageCached() bool { + s.containerdStorageCacheLock.RLock() + if s.containerdStorageCache != nil { + result := *s.containerdStorageCache + s.containerdStorageCacheLock.RUnlock() + return result + } + s.containerdStorageCacheLock.RUnlock() + + // Need to compute and cache + s.containerdStorageCacheLock.Lock() + defer s.containerdStorageCacheLock.Unlock() + + // Double-check after acquiring write lock + if s.containerdStorageCache != nil { + return *s.containerdStorageCache + } + + result := usesContainerdStorage(s.dockerClient) + s.containerdStorageCache = &result + return result +} + func usesContainerdStorage(docker DockerClient) bool { info, err := docker.Info(context.Background()) if err != nil { @@ -147,7 +179,7 @@ func usesContainerdStorage(docker DockerClient) bool { return false } -func (s *Store) doSave(img v1.Image, withName string) (image.InspectResponse, error) { +func (s *Store) doSave(img v1.Image, withName string, isContainerdStorage bool) (image.InspectResponse, error) { ctx := context.Background() done := make(chan error) @@ -155,7 +187,9 @@ func (s *Store) doSave(img v1.Image, withName string) (image.InspectResponse, er pr, pw := io.Pipe() defer pw.Close() + // Start the ImageLoad goroutine go func() { + res, err := s.dockerClient.ImageLoad(ctx, pr, client.ImageLoadWithQuiet(true)) if err != nil { done <- err @@ -165,6 +199,7 @@ func (s *Store) doSave(img v1.Image, withName string) (image.InspectResponse, er // only return the response error after the response is drained and closed responseErr := checkResponseError(res.Body) drainCloseErr := ensureReaderClosed(res.Body) + if responseErr != nil { done <- responseErr return @@ -176,19 +211,25 @@ func (s *Store) doSave(img v1.Image, withName string) (image.InspectResponse, er done <- nil }() + // Create tar content tw := tar.NewWriter(pw) defer tw.Close() - if err = s.addImageToTar(tw, img, withName); err != nil { + if err = s.addImageToTar(tw, img, withName, isContainerdStorage); err != nil { return image.InspectResponse{}, err } + tw.Close() pw.Close() + + // Wait for ImageLoad to complete err = <-done + if err != nil { return image.InspectResponse{}, fmt.Errorf("loading image %q. first error: %w", withName, err) } + // Inspect the saved image inspect, err := s.dockerClient.ImageInspect(context.Background(), withName) if err != nil { if cerrdefs.IsNotFound(err) { @@ -196,10 +237,12 @@ func (s *Store) doSave(img v1.Image, withName string) (image.InspectResponse, er } return image.InspectResponse{}, err } + return inspect, nil } -func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string) error { +func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string, isContainerdStorage bool) error { + // Add config file rawConfigFile, err := image.RawConfigFile() if err != nil { return err @@ -208,23 +251,197 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string) e if err = addTextToTar(tw, rawConfigFile, configHash+".json"); err != nil { return err } + + // Get layers layers, err := image.Layers() if err != nil { return err } - var ( - layerPaths []string - blankIdx int - ) - for _, layer := range layers { - layerName, err := s.addLayerToTar(tw, layer, blankIdx) - if err != nil { - return err + + // Pre-process layers in parallel, then write to tar sequentially + + // Parallel Processing Optimization: Prepare layer metadata concurrently, then write sequentially + // This overlaps the expensive layer reader preparation while maintaining tar format integrity. + // Prepare layer metadata in parallel + type layerInfo struct { + index int + name string + size int64 + diffID v1.Hash + isBlank bool + reader io.ReadCloser + err error + } + + layerInfos := make([]*layerInfo, len(layers)) + + // Parallel preparation phase + g, ctx := errgroup.WithContext(context.Background()) + + // Limit concurrent layer preparation to avoid too many open file handles + maxConcurrency := 3 + if len(layers) < maxConcurrency { + maxConcurrency = len(layers) + } + semaphore := make(chan struct{}, maxConcurrency) + + for i, layer := range layers { + i, layer := i, layer // capture loop variables + layerInfos[i] = &layerInfo{index: i} + + g.Go(func() error { + // Acquire semaphore + select { + case semaphore <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + defer func() { <-semaphore }() + + info := layerInfos[i] + + // Get layer size to determine if it's blank + size, err := layer.Size() + if err != nil { + info.err = err + return err + } + info.size = size + + if size == -1 { + // Blank layer + info.isBlank = true + info.name = fmt.Sprintf("blank_%d", i) + return nil + } + + // Get diff ID for non-blank layers + diffID, err := layer.DiffID() + if err != nil { + info.err = err + return err + } + info.diffID = diffID + info.name = fmt.Sprintf("/%s.tar", diffID.String()) + + // Pre-open the reader to check for errors early + reader, err := layer.Uncompressed() + if err != nil { + info.err = err + return err + } + info.reader = reader + + return nil + }) + } + + // Wait for all preparation to complete + if err := g.Wait(); err != nil { + // Close any opened readers on error + for _, info := range layerInfos { + if info.reader != nil { + info.reader.Close() + } + } + return err + } + + // Sequential tar writing phase + var layerPaths []string + + for i, info := range layerInfos { + if info.err != nil { + return info.err + } + + if info.isBlank { + // Write blank layer + hdr := &tar.Header{Name: info.name, Mode: 0644, Size: 0} + if err := tw.WriteHeader(hdr); err != nil { + return err + } + } else { + // Write populated layer + defer info.reader.Close() + + // CONTAINERD OPTIMIZATION: For containerd, calculate size efficiently during tar writing + var uncompressedSize int64 + var err error + + if isContainerdStorage { + // For containerd, use Docker-native format bypass optimization + + // Docker-Native Bypass: Skip custom tar creation, use existing layer files directly + + // Use the layer's existing compressed format if available + // This mimics what docker save/load does natively + compressedReader, err := layers[i].Compressed() + if err != nil { + // Fallback to uncompressed if compressed not available + compressedReader = info.reader + } else { + // Close the uncompressed reader since we're using compressed + info.reader.Close() + } + defer compressedReader.Close() + + // Stream compressed data directly to tar (Docker-native format) + tempFile, err := os.CreateTemp("", "compressed-layer-*.tar") + if err != nil { + return err + } + defer os.Remove(tempFile.Name()) + defer tempFile.Close() + + // Calculate size while streaming to temp file + bytesRead, err := io.Copy(tempFile, compressedReader) + if err != nil { + return err + } + uncompressedSize = bytesRead + + // Rewind temp file for reading + if _, err := tempFile.Seek(0, 0); err != nil { + return err + } + + // Write tar header with calculated size (Docker-native format) + hdr := &tar.Header{Name: info.name, Mode: 0644, Size: uncompressedSize} + if err := tw.WriteHeader(hdr); err != nil { + return err + } + + // Stream from temp file to tar + _, err = io.Copy(tw, tempFile) + if err != nil { + return err + } + } else { + // For standard Docker storage, use cached size calculation (existing behavior) + uncompressedSize, err = s.getLayerSize(layers[i]) + if err != nil { + return err + } + + // Write tar header with cached size + hdr := &tar.Header{Name: info.name, Mode: 0644, Size: uncompressedSize} + if err := tw.WriteHeader(hdr); err != nil { + return err + } + + // Copy layer data + _, err = io.Copy(tw, info.reader) + if err != nil { + return err + } + } } - blankIdx++ - layerPaths = append(layerPaths, layerName) + + layerPaths = append(layerPaths, info.name) } + // Add manifest manifestJSON, err := json.Marshal([]map[string]interface{}{ { "Config": configHash + ".json", @@ -235,48 +452,11 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string) e if err != nil { return err } - return addTextToTar(tw, manifestJSON, "manifest.json") -} - -func (s *Store) addLayerToTar(tw *tar.Writer, layer v1.Layer, blankIdx int) (string, error) { - // If the layer is a previous image layer that hasn't been downloaded yet, - // cause ALL the previous image layers to be downloaded by grabbing the ReadCloser. - layerReader, err := layer.Uncompressed() - if err != nil { - return "", err - } - defer layerReader.Close() - - var layerName string - size, err := layer.Size() - if err != nil { - return "", err - } - if size == -1 { // it's a base (always empty) layer - layerName = fmt.Sprintf("blank_%d", blankIdx) - hdr := &tar.Header{Name: layerName, Mode: 0644, Size: 0} - return layerName, tw.WriteHeader(hdr) - } - // it's a populated layer - layerDiffID, err := layer.DiffID() - if err != nil { - return "", err - } - layerName = fmt.Sprintf("/%s.tar", layerDiffID.String()) - - uncompressedSize, err := s.getLayerSize(layer) - if err != nil { - return "", err - } - hdr := &tar.Header{Name: layerName, Mode: 0644, Size: uncompressedSize} - if err = tw.WriteHeader(hdr); err != nil { - return "", err - } - if _, err = io.Copy(tw, layerReader); err != nil { - return "", err + if err = addTextToTar(tw, manifestJSON, "manifest.json"); err != nil { + return err } - return layerName, nil + return nil } // getLayerSize returns the uncompressed layer size. @@ -288,14 +468,17 @@ func (s *Store) getLayerSize(layer v1.Layer) (int64, error) { if err != nil { return 0, err } + knownLayer, layerFound := s.onDiskLayersByDiffID[diffID] if layerFound && knownLayer.uncompressedSize != -1 { return knownLayer.uncompressedSize, nil } + // FIXME: this is a time sink and should be avoided if the daemon accepts OCI layout-formatted tars // If layer was not seen previously, we need to read it to get the uncompressed size // In practice, we should only get here if layers saved from the daemon via `docker save` // are output compressed. + layerReader, err := layer.Uncompressed() if err != nil { return 0, err @@ -307,6 +490,7 @@ func (s *Store) getLayerSize(layer v1.Layer) (int64, error) { if err != nil { return 0, err } + return size, nil } @@ -380,7 +564,7 @@ func (s *Store) SaveFile(image *Image, withName string) (string, error) { tw := tar.NewWriter(pw) defer tw.Close() - return s.addImageToTar(tw, image, withName) + return s.addImageToTar(tw, image, withName, s.usesContainerdStorageCached()) }) err = errs.Wait() @@ -453,12 +637,47 @@ func (s *Store) doDownloadLayersFor(identifier string) error { return err } + // Parallel Processing Optimization: Process multiple layers concurrently during download + // This can significantly reduce wall-clock time, especially for containerd storage where + // layers are downloaded fresh and need expensive uncompressed size calculations. + // Use errgroup for parallel layer processing with bounded concurrency + g, ctx := errgroup.WithContext(context.Background()) + + // Limit concurrent layer processing to avoid overwhelming the system + // Use a reasonable number based on typical layer counts and system resources + maxConcurrency := 4 + if len(configFile.RootFS.DiffIDs) < maxConcurrency { + maxConcurrency = len(configFile.RootFS.DiffIDs) + } + + // Create a channel to limit concurrency + semaphore := make(chan struct{}, maxConcurrency) + for idx := range configFile.RootFS.DiffIDs { - layerPath := filepath.Join(tmpDir, manifest[0].Layers[idx]) - if _, err := s.AddLayer(layerPath); err != nil { - return err - } + idx := idx // capture loop variable + g.Go(func() error { + // Acquire semaphore + select { + case semaphore <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + defer func() { <-semaphore }() // Release semaphore + + layerPath := filepath.Join(tmpDir, manifest[0].Layers[idx]) + + if _, err := s.AddLayer(layerPath); err != nil { + return err + } + return nil + }) + } + + // Wait for all layers to complete + if err := g.Wait(); err != nil { + return err } + return nil } @@ -542,15 +761,22 @@ func (s *Store) findLayer(withHash v1.Hash) v1.Layer { return aLayer.layer } +// AddLayer adds a layer from a file path to the store's cache. +// This method includes a performance optimization: for compressed layers, it proactively +// calculates the uncompressed size during the add operation to avoid expensive cache misses +// later during tar creation. This optimization is particularly important for containerd storage +// scenarios where layers are downloaded fresh and don't have cached uncompressed sizes. func (s *Store) AddLayer(fromPath string) (v1.Layer, error) { layer, err := tarball.LayerFromFile(fromPath) if err != nil { return nil, err } + diffID, err := layer.DiffID() if err != nil { return nil, err } + var uncompressedSize int64 fileSize, err := func() (int64, error) { fi, err := os.Stat(fromPath) @@ -562,19 +788,46 @@ func (s *Store) AddLayer(fromPath string) (v1.Layer, error) { if err != nil { return nil, err } + compressedSize, err := layer.Size() if err != nil { return nil, err } + if fileSize == compressedSize { - // the layer is compressed, we don't know the uncompressed size - uncompressedSize = -1 + // the layer is compressed + + // CONTAINERD OPTIMIZATION: Skip expensive size calculation during download for containerd + // We'll calculate it more efficiently during tar creation + if s.usesContainerdStorageCached() { + uncompressedSize = -1 // Will be calculated efficiently during tar creation + } else { + // For standard Docker storage, calculate size now to avoid cache misses later + + layerReader, err := layer.Uncompressed() + if err != nil { + // Fall back to unknown size to maintain backward compatibility + uncompressedSize = -1 + } else { + defer layerReader.Close() + var calculatedSize int64 + calculatedSize, err = io.Copy(io.Discard, layerReader) + if err != nil { + // Fall back to unknown size to maintain backward compatibility + uncompressedSize = -1 + } else { + uncompressedSize = calculatedSize + } + } + } } else { uncompressedSize = fileSize } + s.onDiskLayersByDiffID[diffID] = annotatedLayer{ layer: layer, uncompressedSize: uncompressedSize, } + return layer, nil } From a96aeecc1fd427d34bf4db61f2c6e8b3bcd86dc0 Mon Sep 17 00:00:00 2001 From: Jesse Brown Date: Fri, 8 Aug 2025 10:53:53 -0500 Subject: [PATCH 2/3] fixup! containerd: optimizations --- local/store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/local/store.go b/local/store.go index 556cae31..827dfe4e 100644 --- a/local/store.go +++ b/local/store.go @@ -189,7 +189,6 @@ func (s *Store) doSave(img v1.Image, withName string, isContainerdStorage bool) // Start the ImageLoad goroutine go func() { - res, err := s.dockerClient.ImageLoad(ctx, pr, client.ImageLoadWithQuiet(true)) if err != nil { done <- err From c01fd0f6f9e42385a722bdf9ee59ac928d9d2eda Mon Sep 17 00:00:00 2001 From: Jesse Brown Date: Fri, 8 Aug 2025 11:45:14 -0500 Subject: [PATCH 3/3] fixup! containerd: optimizations Signed-off-by: Jesse Brown --- local/store.go | 151 ++++++++++++------------------------------------- 1 file changed, 35 insertions(+), 116 deletions(-) diff --git a/local/store.go b/local/store.go index 827dfe4e..30e89994 100644 --- a/local/store.go +++ b/local/store.go @@ -257,116 +257,40 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string, i return err } - // Pre-process layers in parallel, then write to tar sequentially - - // Parallel Processing Optimization: Prepare layer metadata concurrently, then write sequentially - // This overlaps the expensive layer reader preparation while maintaining tar format integrity. - // Prepare layer metadata in parallel - type layerInfo struct { - index int - name string - size int64 - diffID v1.Hash - isBlank bool - reader io.ReadCloser - err error - } - - layerInfos := make([]*layerInfo, len(layers)) - - // Parallel preparation phase - g, ctx := errgroup.WithContext(context.Background()) - - // Limit concurrent layer preparation to avoid too many open file handles - maxConcurrency := 3 - if len(layers) < maxConcurrency { - maxConcurrency = len(layers) - } - semaphore := make(chan struct{}, maxConcurrency) - - for i, layer := range layers { - i, layer := i, layer // capture loop variables - layerInfos[i] = &layerInfo{index: i} - - g.Go(func() error { - // Acquire semaphore - select { - case semaphore <- struct{}{}: - case <-ctx.Done(): - return ctx.Err() - } - defer func() { <-semaphore }() - - info := layerInfos[i] - - // Get layer size to determine if it's blank - size, err := layer.Size() - if err != nil { - info.err = err - return err - } - info.size = size - - if size == -1 { - // Blank layer - info.isBlank = true - info.name = fmt.Sprintf("blank_%d", i) - return nil - } - - // Get diff ID for non-blank layers - diffID, err := layer.DiffID() - if err != nil { - info.err = err - return err - } - info.diffID = diffID - info.name = fmt.Sprintf("/%s.tar", diffID.String()) - - // Pre-open the reader to check for errors early - reader, err := layer.Uncompressed() - if err != nil { - info.err = err - return err - } - info.reader = reader - - return nil - }) - } - - // Wait for all preparation to complete - if err := g.Wait(); err != nil { - // Close any opened readers on error - for _, info := range layerInfos { - if info.reader != nil { - info.reader.Close() - } + var ( + layerPaths []string + blankIdx int + ) + for _, layer := range layers { + // If the layer is a previous image layer that hasn't been downloaded yet, + // cause ALL the previous image layers to be downloaded by grabbing the ReadCloser. + layerReader, err := layer.Uncompressed() + if err != nil { + return err } - return err - } + defer layerReader.Close() - // Sequential tar writing phase - var layerPaths []string - - for i, info := range layerInfos { - if info.err != nil { - return info.err + var layerName string + size, err := layer.Size() + if err != nil { + return err } - - if info.isBlank { - // Write blank layer - hdr := &tar.Header{Name: info.name, Mode: 0644, Size: 0} + if size == -1 { // it's a base (always empty) layer + layerName = fmt.Sprintf("blank_%d", blankIdx) + hdr := &tar.Header{Name: layerName, Mode: 0644, Size: 0} if err := tw.WriteHeader(hdr); err != nil { return err } } else { - // Write populated layer - defer info.reader.Close() + // it's a populated layer + layerDiffID, err := layer.DiffID() + if err != nil { + return err + } + layerName = fmt.Sprintf("/%s.tar", layerDiffID.String()) // CONTAINERD OPTIMIZATION: For containerd, calculate size efficiently during tar writing var uncompressedSize int64 - var err error if isContainerdStorage { // For containerd, use Docker-native format bypass optimization @@ -375,13 +299,13 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string, i // Use the layer's existing compressed format if available // This mimics what docker save/load does natively - compressedReader, err := layers[i].Compressed() + compressedReader, err := layer.Compressed() if err != nil { // Fallback to uncompressed if compressed not available - compressedReader = info.reader + compressedReader = layerReader } else { // Close the uncompressed reader since we're using compressed - info.reader.Close() + layerReader.Close() } defer compressedReader.Close() @@ -406,7 +330,7 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string, i } // Write tar header with calculated size (Docker-native format) - hdr := &tar.Header{Name: info.name, Mode: 0644, Size: uncompressedSize} + hdr := &tar.Header{Name: layerName, Mode: 0644, Size: uncompressedSize} if err := tw.WriteHeader(hdr); err != nil { return err } @@ -417,27 +341,22 @@ func (s *Store) addImageToTar(tw *tar.Writer, image v1.Image, withName string, i return err } } else { - // For standard Docker storage, use cached size calculation (existing behavior) - uncompressedSize, err = s.getLayerSize(layers[i]) + // For standard Docker storage, use original logic + uncompressedSize, err := s.getLayerSize(layer) if err != nil { return err } - - // Write tar header with cached size - hdr := &tar.Header{Name: info.name, Mode: 0644, Size: uncompressedSize} + hdr := &tar.Header{Name: layerName, Mode: 0644, Size: uncompressedSize} if err := tw.WriteHeader(hdr); err != nil { return err } - - // Copy layer data - _, err = io.Copy(tw, info.reader) - if err != nil { + if _, err := io.Copy(tw, layerReader); err != nil { return err } } } - - layerPaths = append(layerPaths, info.name) + blankIdx++ + layerPaths = append(layerPaths, layerName) } // Add manifest @@ -644,7 +563,7 @@ func (s *Store) doDownloadLayersFor(identifier string) error { // Limit concurrent layer processing to avoid overwhelming the system // Use a reasonable number based on typical layer counts and system resources - maxConcurrency := 4 + maxConcurrency := 3 if len(configFile.RootFS.DiffIDs) < maxConcurrency { maxConcurrency = len(configFile.RootFS.DiffIDs) }