Skip to content

Commit 74d18c8

Browse files
authored
Merge pull request #51 from buildkite/fix_blob_storage_buffering
fix: updated the blob storage to just pass through the reader
2 parents fce620e + 5315574 commit 74d18c8

File tree

5 files changed

+73
-88
lines changed

5 files changed

+73
-88
lines changed

blob_storage.go

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package buildkitelogs
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
78
"runtime"
89
"time"
@@ -15,7 +16,6 @@ import (
1516
// BlobStorage provides an abstraction over blob storage backends
1617
type BlobStorage struct {
1718
bucket *blob.Bucket
18-
ctx context.Context
1919
}
2020

2121
// BlobMetadata contains metadata for cached blobs
@@ -46,7 +46,6 @@ func NewBlobStorage(ctx context.Context, storageURL string) (*BlobStorage, error
4646

4747
return &BlobStorage{
4848
bucket: bucket,
49-
ctx: ctx,
5049
}, nil
5150
}
5251

@@ -115,12 +114,12 @@ func GenerateBlobKey(org, pipeline, build, job string) string {
115114
}
116115

117116
// Exists checks if a blob exists in storage
118-
func (bs *BlobStorage) Exists(key string) (bool, error) {
119-
return bs.bucket.Exists(bs.ctx, key)
117+
func (bs *BlobStorage) Exists(ctx context.Context, key string) (bool, error) {
118+
return bs.bucket.Exists(ctx, key)
120119
}
121120

122121
// WriteWithMetadata writes data to blob storage with metadata
123-
func (bs *BlobStorage) WriteWithMetadata(key string, data []byte, metadata *BlobMetadata) error {
122+
func (bs *BlobStorage) WriteWithMetadata(ctx context.Context, key string, data []byte, metadata *BlobMetadata) error {
124123
opts := &blob.WriterOptions{}
125124

126125
if metadata != nil {
@@ -136,7 +135,7 @@ func (bs *BlobStorage) WriteWithMetadata(key string, data []byte, metadata *Blob
136135
}
137136
}
138137

139-
writer, err := bs.bucket.NewWriter(bs.ctx, key, opts)
138+
writer, err := bs.bucket.NewWriter(ctx, key, opts)
140139
if err != nil {
141140
return fmt.Errorf("failed to create blob writer: %w", err)
142141
}
@@ -150,23 +149,11 @@ func (bs *BlobStorage) WriteWithMetadata(key string, data []byte, metadata *Blob
150149
}
151150

152151
// ReadWithMetadata reads data from blob storage with metadata
153-
func (bs *BlobStorage) ReadWithMetadata(key string) ([]byte, *BlobMetadata, error) {
154-
reader, err := bs.bucket.NewReader(bs.ctx, key, nil)
155-
if err != nil {
156-
return nil, nil, fmt.Errorf("failed to create blob reader: %w", err)
157-
}
158-
defer reader.Close()
159-
160-
// Read data
161-
data := make([]byte, reader.Size())
162-
if _, err := reader.Read(data); err != nil {
163-
return nil, nil, fmt.Errorf("failed to read blob data: %w", err)
164-
}
165-
152+
func (bs *BlobStorage) ReadWithMetadata(ctx context.Context, key string) (*BlobMetadata, error) {
166153
// Get blob attributes for metadata
167-
attrs, err := bs.bucket.Attributes(bs.ctx, key)
154+
attrs, err := bs.bucket.Attributes(ctx, key)
168155
if err != nil {
169-
return nil, nil, fmt.Errorf("failed to get blob attributes: %w", err)
156+
return nil, fmt.Errorf("failed to get blob attributes: %w", err)
170157
}
171158

172159
// Extract metadata
@@ -190,21 +177,27 @@ func (bs *BlobStorage) ReadWithMetadata(key string) ([]byte, *BlobMetadata, erro
190177
}
191178
}
192179

193-
return data, metadata, nil
180+
return metadata, nil
181+
}
182+
183+
// Reader returns an io.ReadCloser for streaming blob data from the specified key.
184+
// The caller is responsible for closing the returned reader when done.
185+
func (bs *BlobStorage) Reader(ctx context.Context, key string) (io.ReadCloser, error) {
186+
return bs.bucket.NewReader(ctx, key, nil)
194187
}
195188

196189
// GetModTime returns the modification time of a blob
197-
func (bs *BlobStorage) GetModTime(key string) (time.Time, error) {
198-
attrs, err := bs.bucket.Attributes(bs.ctx, key)
190+
func (bs *BlobStorage) GetModTime(ctx context.Context, key string) (time.Time, error) {
191+
attrs, err := bs.bucket.Attributes(ctx, key)
199192
if err != nil {
200193
return time.Time{}, fmt.Errorf("failed to get blob attributes: %w", err)
201194
}
202195
return attrs.ModTime, nil
203196
}
204197

205198
// Delete removes a blob from storage
206-
func (bs *BlobStorage) Delete(key string) error {
207-
return bs.bucket.Delete(bs.ctx, key)
199+
func (bs *BlobStorage) Delete(ctx context.Context, key string) error {
200+
return bs.bucket.Delete(ctx, key)
208201
}
209202

210203
// Close closes the blob storage connection

blob_storage_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ func TestBlobStorage(t *testing.T) {
4343
}
4444

4545
// Test write
46-
err = blobStorage.WriteWithMetadata(key, testData, metadata)
46+
err = blobStorage.WriteWithMetadata(ctx, key, testData, metadata)
4747
if err != nil {
4848
t.Fatalf("Failed to write blob: %v", err)
4949
}
5050

5151
// Test exists
52-
exists, err := blobStorage.Exists(key)
52+
exists, err := blobStorage.Exists(ctx, key)
5353
if err != nil {
5454
t.Fatalf("Failed to check blob existence: %v", err)
5555
}
@@ -58,15 +58,11 @@ func TestBlobStorage(t *testing.T) {
5858
}
5959

6060
// Test read
61-
readData, readMetadata, err := blobStorage.ReadWithMetadata(key)
61+
readMetadata, err := blobStorage.ReadWithMetadata(ctx, key)
6262
if err != nil {
6363
t.Fatalf("Failed to read blob: %v", err)
6464
}
6565

66-
if string(readData) != string(testData) {
67-
t.Errorf("Expected data %s, got %s", string(testData), string(readData))
68-
}
69-
7066
if readMetadata == nil {
7167
t.Fatal("Expected metadata, got nil")
7268
}

cache.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package buildkitelogs
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
78
"path/filepath"
89
)
@@ -57,24 +58,25 @@ func IsCacheFileExists(org, pipeline, build, job string) (bool, string, error) {
5758
}
5859

5960
// createLocalCacheFile creates a local file from blob storage for compatibility
60-
func createLocalCacheFile(ctx context.Context, blobStorage *BlobStorage, blobKey, org, pipeline, build, job string) (string, error) {
61-
// Get local cache file path for compatibility
62-
cacheFilePath, err := GetCacheFilePath(org, pipeline, build, job)
61+
func createLocalCacheFile(ctx context.Context, blobStorage *BlobStorage, blobKey string) (string, error) {
62+
cacheFilePath, err := os.CreateTemp("", "bklog-")
6363
if err != nil {
64-
return "", err
64+
return "", fmt.Errorf("failed to create local cache file: %w", err)
6565
}
66+
defer cacheFilePath.Close()
6667

6768
// Read from blob storage
68-
data, _, err := blobStorage.ReadWithMetadata(blobKey)
69+
reader, err := blobStorage.Reader(ctx, blobKey)
6970
if err != nil {
7071
return "", fmt.Errorf("failed to read from blob storage: %w", err)
7172
}
73+
defer reader.Close()
7274

7375
// Write to local cache file
74-
err = os.WriteFile(cacheFilePath, data, 0600)
76+
_, err = io.Copy(cacheFilePath, reader)
7577
if err != nil {
7678
return "", fmt.Errorf("failed to write local cache file: %w", err)
7779
}
7880

79-
return cacheFilePath, nil
81+
return cacheFilePath.Name(), nil
8082
}

0 commit comments

Comments
 (0)