Skip to content

Commit 8329295

Browse files
committed
move DecompressStream, CompressStream to compression pkg
Signed-off-by: Sebastiaan van Stijn <[email protected]>
1 parent 303f599 commit 8329295

10 files changed

+478
-400
lines changed

Diff for: archive.go

+9-200
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,18 @@ package archive
33

44
import (
55
"archive/tar"
6-
"bufio"
7-
"bytes"
8-
"compress/bzip2"
9-
"compress/gzip"
106
"context"
117
"errors"
128
"fmt"
139
"io"
1410
"os"
15-
"os/exec"
1611
"path/filepath"
1712
"runtime"
18-
"strconv"
1913
"strings"
20-
"sync"
2114
"syscall"
2215
"time"
2316

2417
"github.com/containerd/log"
25-
"github.com/klauspost/compress/zstd"
2618
"github.com/moby/patternmatcher"
2719
"github.com/moby/sys/sequential"
2820
"github.com/moby/sys/user"
@@ -123,7 +115,7 @@ func IsArchivePath(path string) bool {
123115
return false
124116
}
125117
defer file.Close()
126-
rdr, err := DecompressStream(file)
118+
rdr, err := compression.DecompressStream(file)
127119
if err != nil {
128120
return false
129121
}
@@ -140,35 +132,6 @@ func DetectCompression(source []byte) compression.Compression {
140132
return compression.Detect(source)
141133
}
142134

143-
func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) {
144-
args := []string{"xz", "-d", "-c", "-q"}
145-
146-
return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive)
147-
}
148-
149-
func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
150-
if noPigzEnv := os.Getenv("MOBY_DISABLE_PIGZ"); noPigzEnv != "" {
151-
noPigz, err := strconv.ParseBool(noPigzEnv)
152-
if err != nil {
153-
log.G(ctx).WithError(err).Warn("invalid value in MOBY_DISABLE_PIGZ env var")
154-
}
155-
if noPigz {
156-
log.G(ctx).Debugf("Use of pigz is disabled due to MOBY_DISABLE_PIGZ=%s", noPigzEnv)
157-
return gzip.NewReader(buf)
158-
}
159-
}
160-
161-
unpigzPath, err := exec.LookPath("unpigz")
162-
if err != nil {
163-
log.G(ctx).Debugf("unpigz binary not found, falling back to go gzip library")
164-
return gzip.NewReader(buf)
165-
}
166-
167-
log.G(ctx).Debugf("Using %s to decompress", unpigzPath)
168-
169-
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
170-
}
171-
172135
type readCloserWrapper struct {
173136
io.Reader
174137
closer func() error
@@ -181,132 +144,18 @@ func (r *readCloserWrapper) Close() error {
181144
return nil
182145
}
183146

184-
var bufioReader32KPool = &sync.Pool{
185-
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
186-
}
187-
188-
type bufferedReader struct {
189-
buf *bufio.Reader
190-
}
191-
192-
func newBufferedReader(r io.Reader) *bufferedReader {
193-
buf := bufioReader32KPool.Get().(*bufio.Reader)
194-
buf.Reset(r)
195-
return &bufferedReader{buf}
196-
}
197-
198-
func (r *bufferedReader) Read(p []byte) (int, error) {
199-
if r.buf == nil {
200-
return 0, io.EOF
201-
}
202-
n, err := r.buf.Read(p)
203-
if errors.Is(err, io.EOF) {
204-
r.buf.Reset(nil)
205-
bufioReader32KPool.Put(r.buf)
206-
r.buf = nil
207-
}
208-
return n, err
209-
}
210-
211-
func (r *bufferedReader) Peek(n int) ([]byte, error) {
212-
if r.buf == nil {
213-
return nil, io.EOF
214-
}
215-
return r.buf.Peek(n)
216-
}
217-
218147
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
148+
//
149+
// Deprecated: use [compression.DecompressStream].
219150
func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
220-
buf := newBufferedReader(archive)
221-
bs, err := buf.Peek(10)
222-
if err != nil && !errors.Is(err, io.EOF) {
223-
// Note: we'll ignore any io.EOF error because there are some odd
224-
// cases where the layer.tar file will be empty (zero bytes) and
225-
// that results in an io.EOF from the Peek() call. So, in those
226-
// cases we'll just treat it as a non-compressed stream and
227-
// that means just create an empty layer.
228-
// See Issue 18170
229-
return nil, err
230-
}
231-
232-
switch comp := compression.Detect(bs); comp {
233-
case compression.Uncompressed:
234-
return &readCloserWrapper{
235-
Reader: buf,
236-
}, nil
237-
case compression.Gzip:
238-
ctx, cancel := context.WithCancel(context.Background())
239-
240-
gzReader, err := gzDecompress(ctx, buf)
241-
if err != nil {
242-
cancel()
243-
return nil, err
244-
}
245-
return &readCloserWrapper{
246-
Reader: gzReader,
247-
closer: func() error {
248-
cancel()
249-
return gzReader.Close()
250-
},
251-
}, nil
252-
case compression.Bzip2:
253-
bz2Reader := bzip2.NewReader(buf)
254-
return &readCloserWrapper{
255-
Reader: bz2Reader,
256-
}, nil
257-
case compression.Xz:
258-
ctx, cancel := context.WithCancel(context.Background())
259-
260-
xzReader, err := xzDecompress(ctx, buf)
261-
if err != nil {
262-
cancel()
263-
return nil, err
264-
}
265-
266-
return &readCloserWrapper{
267-
Reader: xzReader,
268-
closer: func() error {
269-
cancel()
270-
return xzReader.Close()
271-
},
272-
}, nil
273-
case compression.Zstd:
274-
zstdReader, err := zstd.NewReader(buf)
275-
if err != nil {
276-
return nil, err
277-
}
278-
return &readCloserWrapper{
279-
Reader: zstdReader,
280-
closer: func() error {
281-
zstdReader.Close()
282-
return nil
283-
},
284-
}, nil
285-
default:
286-
return nil, fmt.Errorf("unsupported compression format: %s", (&comp).Extension())
287-
}
151+
return compression.DecompressStream(archive)
288152
}
289153

290-
type nopWriteCloser struct {
291-
io.Writer
292-
}
293-
294-
func (nopWriteCloser) Close() error { return nil }
295-
296154
// CompressStream compresses the dest with specified compression algorithm.
155+
//
156+
// Deprecated: use [compression.CompressStream].
297157
func CompressStream(dest io.Writer, comp compression.Compression) (io.WriteCloser, error) {
298-
switch comp {
299-
case compression.Uncompressed:
300-
return nopWriteCloser{dest}, nil
301-
case compression.Gzip:
302-
return gzip.NewWriter(dest), nil
303-
case compression.Bzip2, compression.Xz:
304-
// archive/bzip2 does not support writing, and there is no xz support at all
305-
// However, this is not a problem as docker only currently generates gzipped tars
306-
return nil, fmt.Errorf("unsupported compression format: %s", (&comp).Extension())
307-
default:
308-
return nil, fmt.Errorf("unsupported compression format: %s", (&comp).Extension())
309-
}
158+
return compression.CompressStream(dest, comp)
310159
}
311160

312161
// TarModifierFunc is a function that can be passed to ReplaceFileTarWrapper to
@@ -865,7 +714,7 @@ func NewTarballer(srcPath string, options *TarOptions) (*Tarballer, error) {
865714

866715
pipeReader, pipeWriter := io.Pipe()
867716

868-
compressWriter, err := CompressStream(pipeWriter, options.Compression)
717+
compressWriter, err := compression.CompressStream(pipeWriter, options.Compression)
869718
if err != nil {
870719
return nil, err
871720
}
@@ -1242,7 +1091,7 @@ func untarHandler(tarArchive io.Reader, dest string, options *TarOptions, decomp
12421091

12431092
r := tarArchive
12441093
if decompress {
1245-
decompressedArchive, err := DecompressStream(tarArchive)
1094+
decompressedArchive, err := compression.DecompressStream(tarArchive)
12461095
if err != nil {
12471096
return err
12481097
}
@@ -1389,43 +1238,3 @@ func remapIDs(idMapping user.IdentityMapping, hdr *tar.Header) error {
13891238
hdr.Uid, hdr.Gid = uid, gid
13901239
return err
13911240
}
1392-
1393-
// cmdStream executes a command, and returns its stdout as a stream.
1394-
// If the command fails to run or doesn't complete successfully, an error
1395-
// will be returned, including anything written on stderr.
1396-
func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
1397-
cmd.Stdin = input
1398-
pipeR, pipeW := io.Pipe()
1399-
cmd.Stdout = pipeW
1400-
var errBuf bytes.Buffer
1401-
cmd.Stderr = &errBuf
1402-
1403-
// Run the command and return the pipe
1404-
if err := cmd.Start(); err != nil {
1405-
return nil, err
1406-
}
1407-
1408-
// Ensure the command has exited before we clean anything up
1409-
done := make(chan struct{})
1410-
1411-
// Copy stdout to the returned pipe
1412-
go func() {
1413-
if err := cmd.Wait(); err != nil {
1414-
_ = pipeW.CloseWithError(fmt.Errorf("%w: %s", err, errBuf.String()))
1415-
} else {
1416-
_ = pipeW.Close()
1417-
}
1418-
close(done)
1419-
}()
1420-
1421-
return &readCloserWrapper{
1422-
Reader: pipeR,
1423-
closer: func() error {
1424-
// Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as
1425-
// cmd.Wait waits for any non-file stdout/stderr/stdin to close.
1426-
err := pipeR.Close()
1427-
<-done
1428-
return err
1429-
},
1430-
}, nil
1431-
}

0 commit comments

Comments
 (0)