Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`.
- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange.
- [#157](https://github.com/thanos-io/objstore/pull/157) Azure: Add `az_tenant_id`, `client_id` and `client_secret` configs.
- [#178](https://github.com/thanos-io/objstore/pull/178) Feature: conditional upload API

### Fixed
- [#196](https://github.com/thanos-io/objstore/pull/196) GCS: fix error check in Exists method when object does not exist.
Expand Down
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ type Bucket interface {

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error
Upload(ctx context.Context, name string, r io.Reader, options ...ObjectUploadOption) error

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some random whitespace issues here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will resolve.

// SupportedObjectUploadOptions returns a list of ObjectUploadOptions supported by the underlying provider.
SupportedObjectUploadOptions() []ObjectUploadOptionType

// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Expand Down Expand Up @@ -154,6 +157,19 @@ Current object storage client implementations:

NOTE: Currently Thanos requires strong consistency (write-read) for object store implementation for singleton Compaction purposes.

#### Support for Conditional Writes

Most, not all, object stores provide an API for write conditions. The `objstore` module partially supports this using `ObjectUploadOption` parameters in `Upload` of the `Bucket` interface.

Version or etag metadata can be retrieved for use as write conditions from the `Attributes` method of `BucketReader`. Client should call `SupportedObjectUploadOptions` to validate which object upload options (`IfNotExists`, `IfMatch`, `IfNotMatch`) are supported by the provider.

Providers with conditional write support include:

- Google Cloud Storage ([cloud provider documentation](https://cloud.google.com/storage/docs/request-preconditions)))
- Azure Storage Buckets ([cloud provider documentation](https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations))
- S3 ([cloud provider documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html)). `IfNotMatch` is currently not supported by AWS.
- Local Filesystem (for testing and demos). Only supported by filesystems with extended attribute (`xattr`) support.

##### S3

Thanos uses the [minio client](https://github.com/minio/minio-go) library to upload Prometheus data into AWS S3.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/oracle/oci-go-sdk/v65 v65.41.1
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.10
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.44.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.40
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/xattr v0.4.10 h1:Qe0mtiNFHQZ296vRgUjRCoPHPqH7VdTOrZx3g0T+pGA=
github.com/pkg/xattr v0.4.10/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -265,6 +267,7 @@ golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKl
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
Expand Down
43 changes: 42 additions & 1 deletion inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package objstore
import (
"bytes"
"context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -16,6 +18,7 @@ import (
)

var errNotFound = errors.New("inmem: object not found")
var errConditionNotMet = errors.New("inmem: condition not met")

// InMemBucket implements the objstore.Bucket interfaces against local memory.
// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.
Expand Down Expand Up @@ -148,6 +151,10 @@ func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
return []IterOptionType{Recursive, UpdatedAt}
}

func (b *InMemBucket) SupportedObjectUploadOptions() []ObjectUploadOptionType {
return []ObjectUploadOptionType{IfNotExists, IfMatch, IfNotMatch}
}

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
Expand Down Expand Up @@ -252,9 +259,40 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut
}

// Upload writes the file specified in src to into the memory.
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...ObjectUploadOption) error {
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
if err := ValidateUploadOptions(b.SupportedObjectUploadOptions(), opts...); err != nil {
return err
}

b.mtx.Lock()
defer b.mtx.Unlock()

params := ApplyObjectUploadOptions(opts...)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we validate the options here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops - done!

generation := 0

if prev, ok := b.attrs[name]; ok {
if prev.Version == nil || prev.Version.Type != Generation {
return fmt.Errorf("inmem object should always have a generational version")
}
if params.IfNotExists {
return errConditionNotMet
}
var err error
if generation, err = strconv.Atoi(prev.Version.Value); err != nil {
return err
}
if params.Condition != nil {
if params.Condition.Value != prev.Version.Value && !params.IfNotMatch {
return errConditionNotMet
} else if params.Condition.Value == prev.Version.Value && params.IfNotMatch {
return errConditionNotMet
}
}
} else if params.Condition != nil && !params.IfNotMatch {
return errConditionNotMet
}
generation++

body, err := io.ReadAll(r)
if err != nil {
return err
Expand All @@ -263,6 +301,7 @@ func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...O
b.attrs[name] = ObjectAttributes{
Size: int64(len(body)),
LastModified: time.Now(),
Version: &ObjectVersion{Type: Generation, Value: strconv.Itoa(generation)},
}
return nil
}
Expand All @@ -289,6 +328,8 @@ func (b *InMemBucket) IsAccessDeniedErr(err error) bool {
return false
}

func (b *InMemBucket) IsConditionNotMetErr(err error) bool { return errors.Is(err, errConditionNotMet) }

func (b *InMemBucket) Close() error { return nil }

// Name returns the bucket name.
Expand Down
163 changes: 141 additions & 22 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type Bucket interface {
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error

// SupportedObjectUploadOptions returns a list of ObjectUploadOptions supported by the underlying provider.
SupportedObjectUploadOptions() []ObjectUploadOptionType

// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error
Expand Down Expand Up @@ -119,6 +122,9 @@ type BucketReader interface {
// IsAccessDeniedErr returns true if access to object is denied.
IsAccessDeniedErr(err error) bool

// IsConditionNotMetErr returns true if an ObjectUploadOption condition parameter (IfNotExists, IfMatch, IfNotMatch) was not met.
IsConditionNotMetErr(err error) bool

// Attributes returns information about the specified object.
Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}
Expand Down Expand Up @@ -196,26 +202,6 @@ func ApplyIterOptions(options ...IterOption) IterParams {
return out
}

type UploadObjectParams struct {
ContentType string
}

type ObjectUploadOption func(f *UploadObjectParams)

func WithContentType(contentType string) ObjectUploadOption {
return func(f *UploadObjectParams) {
f.ContentType = contentType
}
}

func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams {
out := UploadObjectParams{}
for _, opt := range opts {
opt(&out)
}
return out
}

// DownloadOption configures the provided params.
type DownloadOption func(params *downloadParams)

Expand Down Expand Up @@ -274,12 +260,139 @@ func applyUploadOptions(options ...UploadOption) uploadParams {
return out
}

var ErrUploadOptionNotSupported = errors.New("upload option is not supported")
var ErrUploadOptionInvalid = errors.New("upload option is invalid")

// ObjectUploadOptionType is used for type-safe option support checking of ObjectUpload options.
type ObjectUploadOptionType int

const (
ContentType ObjectUploadOptionType = iota
IfNotExists
IfMatch
IfNotMatch
)

// ObjectUploadOption configures UploadObjectParams.
type ObjectUploadOption struct {
Type ObjectUploadOptionType
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that these two are disjointed now and users can misuse this ie create some ObjectUploadOption that doesn't do what Type says it should do. Should we make Type and Apply hidden i.e. lowercase so that users wouldn't be able to do that?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No fair enough, although I think there may be other places where symbols are overexposed maybe. Either way, easy change to make and very happy to do.

Apply func(params *UploadObjectParams)
}

// UploadObjectParams hold content-type and conditional write attribute metadata for upload operations that are
// supported by some provider implementations.
type UploadObjectParams struct {
ContentType string
IfNotExists bool
IfNotMatch bool
Condition *ObjectVersion
}

// WithContentType sets the content type of the object upload operation.
func WithContentType(contentType string) ObjectUploadOption {
return ObjectUploadOption{
Type: ContentType,
Apply: func(params *UploadObjectParams) {
params.ContentType = contentType
},
}
}

// WithIfNotExists if supported by the provider, only writes the object if the object does not already exist.
// When supported by providers this operation is usually atomic, however this is dependent on the provider.
func WithIfNotExists() ObjectUploadOption {
return ObjectUploadOption{
Type: IfNotExists,
Apply: func(params *UploadObjectParams) {
params.IfNotExists = true
},
}
}

// WithIfMatch if supported by the provider, only writes the object if the ETag value of the object in S3 matches the provided value,
// otherwise, the operation fails.
func WithIfMatch(ver *ObjectVersion) ObjectUploadOption {
return ObjectUploadOption{
Type: IfMatch,
Apply: func(params *UploadObjectParams) {
params.Condition = ver
},
}
}

// WithIfNotMatch if supported by the provider, only writes the object if the ETag value of the object in S3 does *not* match the provided value,
// otherwise, the operation fails.
func WithIfNotMatch(ver *ObjectVersion) ObjectUploadOption {
return ObjectUploadOption{
Type: IfNotMatch,
Apply: func(params *UploadObjectParams) {
params.Condition = ver
params.IfNotMatch = true
},
}
}

// ValidateUploadOptions ensures that only supported options are passed as options, and that options used simultaneously are valid.
func ValidateUploadOptions(supportedOptions []ObjectUploadOptionType, opts ...ObjectUploadOption) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should also validate IfNotExists and IfMatch/IfNotMatch aren't used together

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds sensible - done.


Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically we don't put a empty line just after the definition.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

for _, opt := range opts {
if !slices.Contains(supportedOptions, opt.Type) {
return fmt.Errorf("%w: %d", ErrUploadOptionNotSupported, opt.Type)
}
if opt.Type == IfMatch || opt.Type == IfNotMatch {
candidate := &UploadObjectParams{}
opt.Apply(candidate)
if candidate.Condition == nil {
return fmt.Errorf("%w: Condition nil", ErrUploadOptionInvalid)
}
}
if opt.Type == IfNotExists {
// If IfNotExists provided alongside IfMatch or IfNotMatch.
if slices.ContainsFunc(opts, func(opt ObjectUploadOption) bool {
return opt.Type == IfMatch || opt.Type == IfNotMatch
}) {
return fmt.Errorf("%w: IfNotExists not valid with IfMatch or IfNotMatch", ErrUploadOptionInvalid)
}
}
}
return nil
}

// ApplyObjectUploadOptions creates UploadObjectParams from the options.
func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams {
out := UploadObjectParams{}
for _, opt := range opts {
opt.Apply(&out)
}
return out
}

type ObjectAttributes struct {
// Size is the object size in bytes.
Size int64 `json:"size"`

// LastModified is the timestamp the object was last modified.
LastModified time.Time `json:"last_modified"`

// ObjectVersion represents an etag, generation or revision that can be used as a version in conditional updates, if supported.
Version *ObjectVersion `json:"version,omitempty"`
}

// ObjectVersionType is used to specify the type of object version used by the underlying provider.
type ObjectVersionType int

const (
// Generation the provider supports a monotonically increasing integer version.
Generation ObjectVersionType = iota
// ETag the provider supports a hash or checksum version.
ETag ObjectVersionType = iota
)

type ObjectVersion struct {
// Type is the type of object version supported by the provider.
Type ObjectVersionType
// Value is a string representation of the version data from the provider.
Value string
}

type IterObjectAttributes struct {
Expand Down Expand Up @@ -387,14 +500,14 @@ func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdi

// UploadFile uploads the file with the given name to the bucket.
// It is a caller responsibility to clean partial upload in case of failure.
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string) error {
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string, opts ...ObjectUploadOption) error {
r, err := os.Open(filepath.Clean(src))
if err != nil {
return errors.Wrapf(err, "open file %s", src)
}
defer logerrcapture.Do(logger, r.Close, "close file %s", src)

if err := bkt.Upload(ctx, dst, r); err != nil {
if err := bkt.Upload(ctx, dst, r, opts...); err != nil {
return errors.Wrapf(err, "upload file %s as %s", src, dst)
}
level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
Expand Down Expand Up @@ -681,6 +794,10 @@ func (b *metricBucket) SupportedIterOptions() []IterOptionType {
return b.bkt.SupportedIterOptions()
}

func (b *metricBucket) SupportedObjectUploadOptions() []ObjectUploadOptionType {
return b.bkt.SupportedObjectUploadOptions()
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = OpAttributes
b.metrics.ops.WithLabelValues(op).Inc()
Expand Down Expand Up @@ -821,6 +938,8 @@ func (b *metricBucket) IsAccessDeniedErr(err error) bool {
return b.bkt.IsAccessDeniedErr(err)
}

func (b *metricBucket) IsConditionNotMetErr(err error) bool { return b.bkt.IsConditionNotMetErr(err) }

func (b *metricBucket) Close() error {
return b.bkt.Close()
}
Expand Down
Loading
Loading