Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`.
- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange.
- [#157](https://github.com/thanos-io/objstore/pull/157) Azure: Add `az_tenant_id`, `client_id` and `client_secret` configs.
- [#178](https://github.com/thanos-io/objstore/pull/178) Feature: conditional upload API

### Fixed
- [#196](https://github.com/thanos-io/objstore/pull/196) GCS: fix error check in Exists method when object does not exist.
Expand Down
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ type Bucket interface {

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error
Upload(ctx context.Context, name string, r io.Reader, options ...ObjectUploadOption) error

// SupportedObjectUploadOptions returns a list of ObjectUploadOptions supported by the underlying provider.
SupportedObjectUploadOptions() []ObjectUploadOptionType

// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Expand Down Expand Up @@ -154,6 +157,19 @@ Current object storage client implementations:

NOTE: Currently Thanos requires strong consistency (write-read) for object store implementation for singleton Compaction purposes.

#### Support for Conditional Writes

Most, not all, object stores provide an API for write conditions. The `objstore` module partially supports this using `ObjectUploadOption` parameters in `Upload` of the `Bucket` interface.

Version or etag metadata can be retrieved for use as write conditions from the `Attributes` method of `BucketReader`. Client should call `SupportedObjectUploadOptions` to validate which object upload options (`IfNotExists`, `IfMatch`, `IfNotMatch`) are supported by the provider.

Providers with conditional write support include:

- Google Cloud Storage ([cloud provider documentation](https://cloud.google.com/storage/docs/request-preconditions)))
- Azure Storage Buckets ([cloud provider documentation](https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations))
- S3 ([cloud provider documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html)). `IfNotMatch` is currently not supported by AWS.
- Local Filesystem (for testing and demos). Only supported by filesystems with extended attribute (`xattr`) support.

##### S3

Thanos uses the [minio client](https://github.com/minio/minio-go) library to upload Prometheus data into AWS S3.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/oracle/oci-go-sdk/v65 v65.41.1
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.10
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.44.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.40
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/xattr v0.4.10 h1:Qe0mtiNFHQZ296vRgUjRCoPHPqH7VdTOrZx3g0T+pGA=
github.com/pkg/xattr v0.4.10/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -265,6 +267,7 @@ golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKl
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
Expand Down
43 changes: 42 additions & 1 deletion inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package objstore
import (
"bytes"
"context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -16,6 +18,7 @@ import (
)

var errNotFound = errors.New("inmem: object not found")
var errConditionNotMet = errors.New("inmem: condition not met")

// InMemBucket implements the objstore.Bucket interfaces against local memory.
// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.
Expand Down Expand Up @@ -148,6 +151,10 @@ func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
return []IterOptionType{Recursive, UpdatedAt}
}

func (b *InMemBucket) SupportedObjectUploadOptions() []ObjectUploadOptionType {
return []ObjectUploadOptionType{IfNotExists, IfMatch, IfNotMatch}
}

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
Expand Down Expand Up @@ -252,9 +259,40 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut
}

// Upload writes the file specified in src to into the memory.
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...ObjectUploadOption) error {
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
if err := ValidateUploadOptions(b.SupportedObjectUploadOptions(), opts...); err != nil {
return err
}

b.mtx.Lock()
defer b.mtx.Unlock()

params := ApplyObjectUploadOptions(opts...)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we validate the options here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops - done!

generation := 0

if prev, ok := b.attrs[name]; ok {
if prev.Version == nil || prev.Version.Type != Generation {
return fmt.Errorf("inmem object should always have a generational version")
}
if params.IfNotExists {
return errConditionNotMet
}
var err error
if generation, err = strconv.Atoi(prev.Version.Value); err != nil {
return err
}
if params.Condition != nil {
if params.Condition.Value != prev.Version.Value && !params.IfNotMatch {
return errConditionNotMet
} else if params.Condition.Value == prev.Version.Value && params.IfNotMatch {
return errConditionNotMet
}
}
} else if params.Condition != nil && !params.IfNotMatch {
return errConditionNotMet
}
generation++

body, err := io.ReadAll(r)
if err != nil {
return err
Expand All @@ -263,6 +301,7 @@ func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...O
b.attrs[name] = ObjectAttributes{
Size: int64(len(body)),
LastModified: time.Now(),
Version: &ObjectVersion{Type: Generation, Value: strconv.Itoa(generation)},
}
return nil
}
Expand All @@ -289,6 +328,8 @@ func (b *InMemBucket) IsAccessDeniedErr(err error) bool {
return false
}

func (b *InMemBucket) IsConditionNotMetErr(err error) bool { return errors.Is(err, errConditionNotMet) }

func (b *InMemBucket) Close() error { return nil }

// Name returns the bucket name.
Expand Down
162 changes: 140 additions & 22 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type Bucket interface {
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error

// SupportedObjectUploadOptions returns a list of ObjectUploadOptions supported by the underlying provider.
SupportedObjectUploadOptions() []ObjectUploadOptionType

// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error
Expand Down Expand Up @@ -119,6 +122,9 @@ type BucketReader interface {
// IsAccessDeniedErr returns true if access to object is denied.
IsAccessDeniedErr(err error) bool

// IsConditionNotMetErr returns true if an ObjectUploadOption condition parameter (IfNotExists, IfMatch, IfNotMatch) was not met.
IsConditionNotMetErr(err error) bool

// Attributes returns information about the specified object.
Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}
Expand Down Expand Up @@ -196,26 +202,6 @@ func ApplyIterOptions(options ...IterOption) IterParams {
return out
}

type UploadObjectParams struct {
ContentType string
}

type ObjectUploadOption func(f *UploadObjectParams)

func WithContentType(contentType string) ObjectUploadOption {
return func(f *UploadObjectParams) {
f.ContentType = contentType
}
}

func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams {
out := UploadObjectParams{}
for _, opt := range opts {
opt(&out)
}
return out
}

// DownloadOption configures the provided params.
type DownloadOption func(params *downloadParams)

Expand Down Expand Up @@ -274,12 +260,138 @@ func applyUploadOptions(options ...UploadOption) uploadParams {
return out
}

var ErrUploadOptionNotSupported = errors.New("upload option is not supported")
var ErrUploadOptionInvalid = errors.New("upload option is invalid")

// ObjectUploadOptionType is used for type-safe option support checking of ObjectUpload options.
type ObjectUploadOptionType int

const (
ContentType ObjectUploadOptionType = iota
IfNotExists
IfMatch
IfNotMatch
)

// ObjectUploadOption configures UploadObjectParams.
type ObjectUploadOption struct {
optType ObjectUploadOptionType
apply func(params *UploadObjectParams)
}

// UploadObjectParams hold content-type and conditional write attribute metadata for upload operations that are
// supported by some provider implementations.
type UploadObjectParams struct {
ContentType string
IfNotExists bool
IfNotMatch bool
Condition *ObjectVersion
}

// WithContentType sets the content type of the object upload operation.
func WithContentType(contentType string) ObjectUploadOption {
return ObjectUploadOption{
optType: ContentType,
apply: func(params *UploadObjectParams) {
params.ContentType = contentType
},
}
}

// WithIfNotExists if supported by the provider, only writes the object if the object does not already exist.
// When supported by providers this operation is usually atomic, however this is dependent on the provider.
func WithIfNotExists() ObjectUploadOption {
return ObjectUploadOption{
optType: IfNotExists,
apply: func(params *UploadObjectParams) {
params.IfNotExists = true
},
}
}

// WithIfMatch if supported by the provider, only writes the object if the ETag value of the object in S3 matches the provided value,
// otherwise, the operation fails.
func WithIfMatch(ver *ObjectVersion) ObjectUploadOption {
return ObjectUploadOption{
optType: IfMatch,
apply: func(params *UploadObjectParams) {
params.Condition = ver
},
}
}

// WithIfNotMatch if supported by the provider, only writes the object if the ETag value of the object in S3 does *not* match the provided value,
// otherwise, the operation fails.
func WithIfNotMatch(ver *ObjectVersion) ObjectUploadOption {
return ObjectUploadOption{
optType: IfNotMatch,
apply: func(params *UploadObjectParams) {
params.Condition = ver
params.IfNotMatch = true
},
}
}

// ValidateUploadOptions ensures that only supported options are passed as options, and that options used simultaneously are valid.
func ValidateUploadOptions(supportedOptions []ObjectUploadOptionType, opts ...ObjectUploadOption) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should also validate IfNotExists and IfMatch/IfNotMatch aren't used together

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds sensible - done.

for _, opt := range opts {
if !slices.Contains(supportedOptions, opt.optType) {
return fmt.Errorf("%w: %d", ErrUploadOptionNotSupported, opt.optType)
}
if opt.optType == IfMatch || opt.optType == IfNotMatch {
candidate := &UploadObjectParams{}
opt.apply(candidate)
if candidate.Condition == nil {
return fmt.Errorf("%w: Condition nil", ErrUploadOptionInvalid)
}
}
if opt.optType == IfNotExists {
// If IfNotExists provided alongside IfMatch or IfNotMatch.
if slices.ContainsFunc(opts, func(opt ObjectUploadOption) bool {
return opt.optType == IfMatch || opt.optType == IfNotMatch
}) {
return fmt.Errorf("%w: IfNotExists not valid with IfMatch or IfNotMatch", ErrUploadOptionInvalid)
}
}
}
return nil
}

// ApplyObjectUploadOptions creates UploadObjectParams from the options.
func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams {
out := UploadObjectParams{}
for _, opt := range opts {
opt.apply(&out)
}
return out
}

type ObjectAttributes struct {
// Size is the object size in bytes.
Size int64 `json:"size"`

// LastModified is the timestamp the object was last modified.
LastModified time.Time `json:"last_modified"`

// ObjectVersion represents an etag, generation or revision that can be used as a version in conditional updates, if supported.
Version *ObjectVersion `json:"version,omitempty"`
}

// ObjectVersionType is used to specify the type of object version used by the underlying provider.
type ObjectVersionType int

const (
// Generation the provider supports a monotonically increasing integer version.
Generation ObjectVersionType = iota
// ETag the provider supports a hash or checksum version.
ETag ObjectVersionType = iota
)

type ObjectVersion struct {
// Type is the type of object version supported by the provider.
Type ObjectVersionType
// Value is a string representation of the version data from the provider.
Value string
}

type IterObjectAttributes struct {
Expand Down Expand Up @@ -387,14 +499,14 @@ func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdi

// UploadFile uploads the file with the given name to the bucket.
// It is a caller responsibility to clean partial upload in case of failure.
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string) error {
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string, opts ...ObjectUploadOption) error {
r, err := os.Open(filepath.Clean(src))
if err != nil {
return errors.Wrapf(err, "open file %s", src)
}
defer logerrcapture.Do(logger, r.Close, "close file %s", src)

if err := bkt.Upload(ctx, dst, r); err != nil {
if err := bkt.Upload(ctx, dst, r, opts...); err != nil {
return errors.Wrapf(err, "upload file %s as %s", src, dst)
}
level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
Expand Down Expand Up @@ -681,6 +793,10 @@ func (b *metricBucket) SupportedIterOptions() []IterOptionType {
return b.bkt.SupportedIterOptions()
}

func (b *metricBucket) SupportedObjectUploadOptions() []ObjectUploadOptionType {
return b.bkt.SupportedObjectUploadOptions()
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = OpAttributes
b.metrics.ops.WithLabelValues(op).Inc()
Expand Down Expand Up @@ -821,6 +937,8 @@ func (b *metricBucket) IsAccessDeniedErr(err error) bool {
return b.bkt.IsAccessDeniedErr(err)
}

func (b *metricBucket) IsConditionNotMetErr(err error) bool { return b.bkt.IsConditionNotMetErr(err) }

func (b *metricBucket) Close() error {
return b.bkt.Close()
}
Expand Down
Loading
Loading