Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions go/pkg/fakes/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ func (f *Writer) QueryWriteStatus(context.Context, *bspb.QueryWriteStatusRequest
// CAS is a fake CAS that implements FindMissingBlobs, Read and Write, storing stored blobs
// in a map. It also counts the number of requests to store received, for validating batching logic.
type CAS struct {
// InstanceName is the expected instance name for all requests.
InstanceName string
// Maximum batch byte size to verify requests against.
BatchSize int
ReqSleepDuration time.Duration
Expand All @@ -278,17 +280,26 @@ type CAS struct {
maxConcReqs int
}

// NewCAS returns a new empty fake CAS.
func NewCAS() *CAS {
func newCASBase(instanceName string) *CAS {
c := &CAS{
InstanceName: instanceName,
BatchSize: client.DefaultMaxBatchSize,
PerDigestBlockFn: make(map[digest.Digest]func()),
}

c.Clear()
return c
}

// NewCAS returns a new empty fake CAS with the default instance name.
func NewCAS() *CAS {
return newCASBase("instance")
}

// NewCASNamed returns a new empty fake CAS with the given instance name.
func NewCASNamed(name string) *CAS {
return newCASBase(name)
}

// Clear removes all results from the cache.
func (f *CAS) Clear() {
f.mu.Lock()
Expand Down Expand Up @@ -371,8 +382,8 @@ func (f *CAS) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRe
f.mu.Lock()
defer f.mu.Unlock()

if req.InstanceName != "instance" {
return nil, status.Error(codes.InvalidArgument, "test fake expected instance name \"instance\"")
if req.InstanceName != f.InstanceName {
return nil, status.Errorf(codes.InvalidArgument, "test fake expected instance name %q, got %q", f.InstanceName, req.InstanceName)
}
resp := new(repb.FindMissingBlobsResponse)
for _, dg := range req.BlobDigests {
Expand Down Expand Up @@ -417,8 +428,8 @@ func (f *CAS) BatchUpdateBlobs(ctx context.Context, req *repb.BatchUpdateBlobsRe
}
f.mu.Unlock()

if req.InstanceName != "instance" {
return nil, status.Error(codes.InvalidArgument, "test fake expected instance name \"instance\"")
if req.InstanceName != f.InstanceName {
return nil, status.Errorf(codes.InvalidArgument, "test fake expected instance name %q, got %q", f.InstanceName, req.InstanceName)
}

reqBlob, _ := proto.Marshal(req)
Expand Down Expand Up @@ -479,8 +490,8 @@ func (f *CAS) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsReques
}
f.mu.Unlock()

if req.InstanceName != "instance" {
return nil, status.Error(codes.InvalidArgument, "test fake expected instance name \"instance\"")
if req.InstanceName != f.InstanceName {
return nil, status.Errorf(codes.InvalidArgument, "test fake expected instance name %q, got %q", f.InstanceName, req.InstanceName)
}

reqBlob, _ := proto.Marshal(req)
Expand Down Expand Up @@ -531,6 +542,9 @@ func (f *CAS) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsReques
// GetTree implements the corresponding RE API function.
func (f *CAS) GetTree(req *repb.GetTreeRequest, stream regrpc.ContentAddressableStorage_GetTreeServer) error {
f.maybeSleep()
if req.InstanceName != f.InstanceName {
return status.Errorf(codes.InvalidArgument, "test fake expected instance name %q, got %q", f.InstanceName, req.InstanceName)
}
rootDigest, err := digest.NewFromProto(req.RootDigest)
if err != nil {
return fmt.Errorf("unable to parsse root digest %v", req.RootDigest)
Expand Down Expand Up @@ -584,8 +598,8 @@ func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
}

path := strings.Split(req.ResourceName, "/")
if (len(path) != 6 && len(path) != 7) || path[0] != "instance" || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
if (len(path) != 6 && len(path) != 7) || path[0] != f.InstanceName || path[1] != "uploads" || (path[3] != "blobs" && path[3] != "compressed-blobs") {
return status.Errorf(codes.InvalidArgument, "test fake expected resource name of the form \"%s/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}
// indexOffset for all 4+ paths - `compressed-blobs` paths have one more element.
indexOffset := 0
Expand All @@ -598,14 +612,14 @@ func (f *CAS) Write(stream bsgrpc.ByteStream_WriteServer) (err error) {
}
size, err := strconv.ParseInt(path[5+indexOffset], 10, 64)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
return status.Errorf(codes.InvalidArgument, "test fake expected resource name of the form \"%s/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}
dg, err := digest.New(path[4+indexOffset], size)
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
return status.Errorf(codes.InvalidArgument, "test fake expected a valid digest as part of the resource name: \"%s/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}
if _, err := uuid.Parse(path[2]); err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
return status.Errorf(codes.InvalidArgument, "test fake expected resource name of the form \"%s/uploads/<uuid>/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}

f.maybeSleep()
Expand Down Expand Up @@ -692,8 +706,8 @@ func (f *CAS) Read(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) e
}

path := strings.Split(req.ResourceName, "/")
if (len(path) != 4 && len(path) != 5) || path[0] != "instance" || (path[1] != "blobs" && path[1] != "compressed-blobs") {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
if (len(path) != 4 && len(path) != 5) || path[0] != f.InstanceName || (path[1] != "blobs" && path[1] != "compressed-blobs") {
return status.Errorf(codes.InvalidArgument, "test fake expected resource name of the form \"%s/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}
// indexOffset for all 2+ paths - `compressed-blobs` has one more URI element.
indexOffset := 0
Expand All @@ -703,7 +717,7 @@ func (f *CAS) Read(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) e

size, err := strconv.Atoi(path[3+indexOffset])
if err != nil {
return status.Error(codes.InvalidArgument, "test fake expected resource name of the form \"instance/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"")
return status.Errorf(codes.InvalidArgument, "test fake expected resource name of the form \"%s/blobs|compressed-blobs/<compressor?>/<hash>/<size>\"", f.InstanceName)
}
dg := digest.TestNew(path[2+indexOffset], int64(size))
f.maybeSleep()
Expand Down
Loading