Skip to content

Add warning if temp location bucket has soft delete enabled for Go SD… #34996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
if *stagingLocation == "" {
return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
}

checkSoftDeletePolicyEnabled(ctx, *stagingLocation, "staging_location")

var jobLabels map[string]string
if *labels != "" {
if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil {
Expand Down Expand Up @@ -412,6 +415,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
}

checkSoftDeletePolicyEnabled(ctx, opts.TempLocation, "temp_location")

return opts, nil
}

Expand Down Expand Up @@ -456,3 +461,21 @@ func getContainerImage(ctx context.Context) string {
}
panic(fmt.Sprintf("Unsupported environment %v", urn))
}

// checkSoftDeletePolicyEnabled logs a warning when the GCS bucket behind a
// pipeline location has a soft-delete policy enabled.
//
// bucketName is the location value handed in by the caller (for example a
// gs://<bucket>/<path> staging location); the bucket is extracted from it with
// gcsx.ParseObject. locationName is the option name reported in the warning.
//
// The check is best-effort: parse and lookup failures are logged as warnings
// and nothing is returned to the caller.
func checkSoftDeletePolicyEnabled(ctx context.Context, bucketName string, locationName string) {
	// Only the bucket component matters here; the object path is not used.
	bucket, _, err := gcsx.ParseObject(bucketName)
	if err != nil {
		log.Warnf(ctx, "Error parsing bucket name: %v", err)
		return
	}

	enabled, err := gcsx.SoftDeletePolicyEnabled(ctx, bucket)
	if err != nil {
		log.Warnf(ctx, "Error checking SoftDeletePolicy: %v", err)
		return
	}
	if !enabled {
		return
	}

	// Report the parsed bucket rather than the full location string, since the
	// message refers to the bucket itself.
	log.Warnf(ctx, "Bucket %s specified in %s has soft-delete policy enabled. "+
		"Dataflow jobs use Cloud Storage to store temporary files during pipeline execution. "+
		"To avoid being billed for unnecessary storage costs, turn off the soft delete feature "+
		"on buckets that your Dataflow jobs use for temporary storage. "+
		"For more information, see https://cloud.google.com/storage/docs/use-soft-delete#remove-soft-delete-policy.",
		bucket, locationName)
}
18 changes: 18 additions & 0 deletions sdks/go/pkg/beam/util/gcsx/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ func Upload(ctx context.Context, client *storage.Client, project, bucket, object

}

// getBucketAttrs fetches the attributes of the named GCS bucket through a
// storage client that is created for the call and closed before returning.
// It is a package-level variable so it can be swapped out for a stub.
var getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) {
	gcs, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	defer gcs.Close()

	handle := gcs.Bucket(bucketName)
	return handle.Attrs(ctx)
}

// SoftDeletePolicyEnabled reports whether the named bucket has a soft-delete
// policy with a positive retention duration. Any error from fetching the
// bucket attributes is returned unchanged, together with false.
func SoftDeletePolicyEnabled(ctx context.Context, bucketName string) (bool, error) {
	bucketAttrs, err := getBucketAttrs(ctx, bucketName)
	if err != nil {
		return false, err
	}

	policy := bucketAttrs.SoftDeletePolicy
	if policy == nil {
		// No policy recorded on the bucket means soft delete is not enabled.
		return false, nil
	}
	return policy.RetentionDuration > 0, nil
}

// Get BucketAttrs with RetentionDuration of SoftDeletePolicy set to zero for disabling SoftDeletePolicy.
func getDisableSoftDeletePolicyBucketAttrs() *storage.BucketAttrs {
attrs := &storage.BucketAttrs{
Expand Down
46 changes: 46 additions & 0 deletions sdks/go/pkg/beam/util/gcsx/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package gcsx

import (
"context"
"strings"
"testing"

"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)

Expand Down Expand Up @@ -106,3 +108,47 @@ func TestGetDisableSoftDeletePolicyBucketAttrs(t *testing.T) {
t.Errorf("attrs has RetentionDuration %v which is not correct", attrs.SoftDeletePolicy.RetentionDuration)
}
}

func TestSoftDeletePolicyWhenEnabled(t *testing.T) {
// Save original and defer restore
original := getBucketAttrs
defer func() { getBucketAttrs = original }()

// Inject mock behavior
getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) {
return &storage.BucketAttrs{
SoftDeletePolicy: &storage.SoftDeletePolicy{
RetentionDuration: 1029,
},
}, nil
}

enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !enabled {
t.Errorf("Expected soft delete to be enabled, got false")
}
}

func TestSoftDeletePolicyWhenDisabled(t *testing.T) {
original := getBucketAttrs
defer func() { getBucketAttrs = original }()

getBucketAttrs = func(ctx context.Context, bucketName string) (*storage.BucketAttrs, error) {
return &storage.BucketAttrs{
SoftDeletePolicy: &storage.SoftDeletePolicy{
RetentionDuration: 0,
},
}, nil
}

enabled, err := SoftDeletePolicyEnabled(context.Background(), "mock-bucket")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if enabled {
t.Errorf("Expected soft delete to be disabled, got true")
}
}
Loading