55 "io"
66 "slices"
77 "sync"
8+ "sync/atomic"
89
910 remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
1011 "github.com/buildbarn/bb-storage/pkg/blobstore"
@@ -28,29 +29,26 @@ type casBlobAccess struct {
2829 capabilitiesClient remoteexecution.CapabilitiesClient
2930 uuidGenerator util.UUIDGenerator
3031 readChunkSize int
31- compressionThresholdBytes int64
32- supportedCompressors []remoteexecution.Compressor_Value
33- supportedCompressorsMutex sync.RWMutex
34- capabilitiesCalled bool
32+ enableZSTDCompression bool
33+ supportedCompressors atomic.Pointer [[]remoteexecution.Compressor_Value ]
3534}
3635
3736// NewCASBlobAccess creates a BlobAccess handle that relays any requests
38- // to a GRPC service that implements the bytestream.ByteStream and
37+ // to a gRPC service that implements the bytestream.ByteStream and
3938// remoteexecution.ContentAddressableStorage services. Those are the
4039// services that Bazel uses to access blobs stored in the Content
4140// Addressable Storage.
4241//
43- // If compressionThresholdBytes is > 0, the client will attempt to use
44- // ZSTD compression for blobs larger than this threshold. The server's
45- // supported compressors will be checked via GetCapabilities().
46- func NewCASBlobAccess (client grpc.ClientConnInterface , uuidGenerator util.UUIDGenerator , readChunkSize int , compressionThresholdBytes int64 ) blobstore.BlobAccess {
42+ // If enableZSTDCompression is true, the client will use ZSTD compression
43+ // for ByteStream operations if the server supports it.
44+ func NewCASBlobAccess (client grpc.ClientConnInterface , uuidGenerator util.UUIDGenerator , readChunkSize int , enableZSTDCompression bool ) blobstore.BlobAccess {
4745 return & casBlobAccess {
4846 byteStreamClient : bytestream .NewByteStreamClient (client ),
4947 contentAddressableStorageClient : remoteexecution .NewContentAddressableStorageClient (client ),
5048 capabilitiesClient : remoteexecution .NewCapabilitiesClient (client ),
5149 uuidGenerator : uuidGenerator ,
5250 readChunkSize : readChunkSize ,
53- compressionThresholdBytes : compressionThresholdBytes ,
51+ enableZSTDCompression : enableZSTDCompression ,
5452 }
5553}
5654
@@ -178,34 +176,27 @@ func (w *zstdByteStreamWriter) Close() error {
178176
// resourceNameHeader is the gRPC metadata key under which the ByteStream
// resource name is transmitted. NOTE(review): presumably attached to
// outgoing read/write calls so the server can route by resource name —
// confirm against the call sites outside this view.
const resourceNameHeader = "build.bazel.remote.execution.v2.resource-name"
180178
181- // shouldUseCompression checks if compression should be used for a blob of the given size .
182- // It also ensures GetCapabilities has been called to negotiate compression support.
183- func (ba * casBlobAccess ) shouldUseCompression (ctx context.Context , digest digest.Digest ) (bool , error ) {
184- if ba .compressionThresholdBytes <= 0 || digest . GetSizeBytes () < ba . compressionThresholdBytes {
179+ // shouldUseZSTDCompression checks if ZSTD compression should be used.
180+ // It ensures GetCapabilities has been called to negotiate compression support.
181+ func (ba * casBlobAccess ) shouldUseZSTDCompression (ctx context.Context , digest digest.Digest ) (bool , error ) {
182+ if ! ba .enableZSTDCompression {
185183 return false , nil
186184 }
187185
188- ba .supportedCompressorsMutex .RLock ()
189- capabilitiesCalled := ba .capabilitiesCalled
190- supportedCompressors := ba .supportedCompressors
191- ba .supportedCompressorsMutex .RUnlock ()
192-
193- if ! capabilitiesCalled {
194- // Call GetCapabilities to check server support
195- _ , err := ba .GetCapabilities (ctx , digest .GetDigestFunction ().GetInstanceName ())
196- if err != nil {
186+ supportedCompressors := ba .supportedCompressors .Load ()
187+ if supportedCompressors == nil {
188+ // Call GetCapabilities to check server support.
189+ if _ , err := ba .GetCapabilities (ctx , digest .GetDigestFunction ().GetInstanceName ()); err != nil {
197190 return false , err
198191 }
199- ba .supportedCompressorsMutex .RLock ()
200- supportedCompressors = ba .supportedCompressors
201- ba .supportedCompressorsMutex .RUnlock ()
192+ supportedCompressors = ba .supportedCompressors .Load ()
202193 }
203194
204- return slices .Contains (supportedCompressors , remoteexecution .Compressor_ZSTD ), nil
195+ return slices .Contains (* supportedCompressors , remoteexecution .Compressor_ZSTD ), nil
205196}
206197
207198func (ba * casBlobAccess ) Get (ctx context.Context , digest digest.Digest ) buffer.Buffer {
208- useCompression , err := ba .shouldUseCompression (ctx , digest )
199+ useCompression , err := ba .shouldUseZSTDCompression (ctx , digest )
209200 if err != nil {
210201 return buffer .NewBufferFromError (err )
211202 }
@@ -248,7 +239,7 @@ func (ba *casBlobAccess) GetFromComposite(ctx context.Context, parentDigest, chi
248239}
249240
250241func (ba * casBlobAccess ) Put (ctx context.Context , digest digest.Digest , b buffer.Buffer ) error {
251- useCompression , err := ba .shouldUseCompression (ctx , digest )
242+ useCompression , err := ba .shouldUseZSTDCompression (ctx , digest )
252243 if err != nil {
253244 b .Discard ()
254245 return err
@@ -387,11 +378,8 @@ func (ba *casBlobAccess) GetCapabilities(ctx context.Context, instanceName diges
387378
388379 cacheCapabilities := serverCapabilities .CacheCapabilities
389380
390- // Store supported compressors for compression negotiation
391- ba .supportedCompressorsMutex .Lock ()
392- ba .supportedCompressors = cacheCapabilities .SupportedCompressors
393- ba .capabilitiesCalled = true
394- ba .supportedCompressorsMutex .Unlock ()
381+ // Store supported compressors for compression negotiation.
382+ ba .supportedCompressors .Store (& cacheCapabilities .SupportedCompressors )
395383
396384 // Only return fields that pertain to the Content Addressable
397385 // Storage. Don't set 'max_batch_total_size_bytes', as we don't
0 commit comments