Implement backup purge command to purge orphaned snapshots #3520

Closed
wants to merge 9 commits into from
2 changes: 2 additions & 0 deletions pkg/cmd/sctool/sctool.go
@@ -10,6 +10,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/command/backup"
"github.com/scylladb/scylla-manager/v3/pkg/command/backup/backupdelete"
"github.com/scylladb/scylla-manager/v3/pkg/command/backup/backuplist"
"github.com/scylladb/scylla-manager/v3/pkg/command/backup/backuppurge"
"github.com/scylladb/scylla-manager/v3/pkg/command/backup/backupvalidate"
"github.com/scylladb/scylla-manager/v3/pkg/command/cluster/clusteradd"
"github.com/scylladb/scylla-manager/v3/pkg/command/cluster/clusterdelete"
@@ -56,6 +57,7 @@ func buildCommand() *cobra.Command {

backupCmd := backup.NewCommand(&client)
backupCmd.AddCommand(
backuppurge.NewCommand(&client),
backupdelete.NewCommand(&client),
backupfiles.NewCommand(&client),
backuplist.NewCommand(&client),
88 changes: 88 additions & 0 deletions pkg/command/backup/backuppurge/cmd.go
@@ -0,0 +1,88 @@
// Copyright (C) 2017 ScyllaDB

package backuppurge

import (
_ "embed"
"fmt"
"io"
"time"

"github.com/spf13/cobra"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"

"github.com/scylladb/scylla-manager/v3/pkg/command/flag"
"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
)

//go:embed res.yaml
var res []byte

type command struct {
cobra.Command
client *managerclient.Client
dryRun bool
cluster string
location []string
}

func NewCommand(client *managerclient.Client) *cobra.Command {
cmd := &command{
client: client,
}
if err := yaml.Unmarshal(res, &cmd.Command); err != nil {
panic(err)
}

defer flag.MustSetUsages(&cmd.Command, res, "cluster")
cmd.init()
cmd.RunE = func(_ *cobra.Command, args []string) error {
return cmd.run()
}
return &cmd.Command
}

func (cmd *command) init() {
w := flag.Wrap(cmd.Flags())
w.Cluster(&cmd.cluster)
w.Location(&cmd.location)
w.DryRun(&cmd.dryRun)
}

func (cmd *command) run() error {
stillWaiting := atomic.NewBool(true)
time.AfterFunc(5*time.Second, func() {
if stillWaiting.Load() {
fmt.Fprintf(cmd.OutOrStderr(), "NOTICE: this may take a while, we are reading metadata from backup location(s)\n")
}
})
var warnings Warnings
resp, warnings, err := cmd.client.PurgeBackups(cmd.Context(), cmd.cluster, cmd.location, cmd.dryRun)
if err != nil {
return err
}
err = resp.Render(cmd.OutOrStdout())
if err != nil {
return err
}
return warnings.Render(cmd.OutOrStdout())
}

// Warnings represents a list of warning messages.
type Warnings []string

// Render writes Warnings to the given io.Writer.
func (w Warnings) Render(writer io.Writer) error {
if len(w) == 0 {
_, err := fmt.Fprintln(writer, "no warnings")
return err
}
for idx := range w {
_, err := fmt.Fprintf(writer, "warning#%d:%s\n", idx, w[idx])
if err != nil {
return err
}
}
return nil
}
15 changes: 15 additions & 0 deletions pkg/command/backup/backuppurge/res.yaml
@@ -0,0 +1,15 @@
use: purge --cluster <id|name> [--location [<dc>:]<provider>:<bucket>] [--dry-run] [flags]

short: Purge backup files in remote locations

long: |
Purge stale backup data and metadata files in remote locations for the provided cluster.
This removes the data and files of:
- temporary manifests,
- manifests exceeding the task retention days policy,
- manifests exceeding the task retention policy,
- manifests of deleted tasks.
The command returns information about the deleted files, taken from their manifests.

location: |
If no location is provided, locations are collected from the cluster's backup tasks, including deleted and disabled ones.
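
For illustration, a hypothetical invocation (cluster name and bucket are placeholders) would be: sctool backup purge --cluster prod-cluster --location s3:backups --dry-run, which reports what would be purged without deleting anything.
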
4 changes: 4 additions & 0 deletions pkg/command/flag/flag.go
@@ -115,6 +115,10 @@ func (w Wrapper) Location(p *[]string) {
w.fs.StringSliceVarP(p, "location", "L", nil, usage["location"])
}

func (w Wrapper) DryRun(p *bool) {
w.fs.BoolVarP(p, "dry-run", "d", false, usage["dry-run"])
}

//
// Task schedule flags
//
3 changes: 3 additions & 0 deletions pkg/command/flag/usage.yaml
@@ -19,6 +19,9 @@ dc: |
fail-fast: |
Stops the task run on the first error.

dry-run: |
In dry-run mode no changes are made; the command only reports what would be purged.

keyspace: |
A list of `glob` patterns separated by a comma used to include or exclude tables.
The patterns match keyspaces and tables, separate the keyspace name from the table name with a dot e.g. ``keyspace,!keyspace.table_prefix_*``.
24 changes: 24 additions & 0 deletions pkg/managerclient/client.go
@@ -570,6 +570,30 @@ func (c *Client) ListBackups(ctx context.Context, clusterID string,
return BackupListItems{items: resp.Payload}, nil
}

// PurgeBackups purges stale snapshots for the given clusterID and locations.
//
// - locations, list of locations; if empty, it is populated from existing, deleted and disabled tasks.
// - dryRun, if true nothing is deleted.
func (c *Client) PurgeBackups(ctx context.Context, clusterID string, locations []string, dryRun bool) (BackupListItems, []string, error) {
p := &operations.DeleteClusterClusterIDBackupsPurgeParams{
Context: ctx,
ClusterID: clusterID,
Locations: locations,
DryRun: &dryRun,
}

resp, err := c.operations.DeleteClusterClusterIDBackupsPurge(p)
if err != nil {
return BackupListItems{}, nil, err
}
out := BackupListItems{
items: resp.Payload.Deleted,
AllClusters: false,
ShowTables: 0,
}
return out, resp.Payload.Warnings, nil
}

// ListBackupFiles returns a listing of available backup files.
func (c *Client) ListBackupFiles(ctx context.Context, clusterID string,
locations []string, allClusters bool, keyspace []string, snapshotTag string,
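
As an aside, a minimal sketch of how the new client method might be called from Go, assuming an already constructed managerclient.Client; the cluster name, location and the purgeOrphaned helper are hypothetical and not part of this change.

package example

import (
	"context"
	"fmt"
	"os"

	"github.com/scylladb/scylla-manager/v3/pkg/managerclient"
)

// purgeOrphaned previews a purge (dryRun=true) and prints the result.
func purgeOrphaned(ctx context.Context, client *managerclient.Client) error {
	// deleted is the same BackupListItems type that backup list renders.
	deleted, warnings, err := client.PurgeBackups(ctx, "prod-cluster", []string{"s3:backups"}, true)
	if err != nil {
		return err
	}
	if err := deleted.Render(os.Stdout); err != nil {
		return err
	}
	for _, w := range warnings {
		fmt.Fprintln(os.Stderr, "warning:", w)
	}
	return nil
}
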
118 changes: 112 additions & 6 deletions pkg/restapi/backup.go
@@ -5,14 +5,17 @@ package restapi
import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/pkg/errors"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

type backupHandler struct {
@@ -27,21 +30,22 @@ func newBackupHandler(services Services) *chi.Mux {
schedSvc: services.Scheduler,
}

m.Use(
rt := m.With(
h.locationsCtx,
h.listFilterCtx,
)
m.Get("/", h.list)
m.Delete("/", h.deleteSnapshot)
m.Get("/files", h.listFiles)
rt.Get("/", h.list)
rt.Delete("/", h.deleteSnapshot)
rt.Get("/files", h.listFiles)
m.With(h.orphanedLocationsCtx).Delete("/purge", h.purge)

return m
}

func (h backupHandler) locationsCtx(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
locations []backupspec.Location
locations backupspec.Locations
err error
)

@@ -92,8 +96,66 @@ func (h backupHandler) extractLocations(r *http.Request) ([]backupspec.Location,
return h.svc.ExtractLocations(r.Context(), properties), nil
}

func (h backupHandler) orphanedLocationsCtx(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var locations backupspec.Locations

// Read locations from the request
if v := r.FormValue("locations"); v != "" {
for _, v := range r.Form["locations"] {
var l backupspec.Location
if err := l.UnmarshalText([]byte(v)); err != nil {
respondBadRequest(w, r, err)
return
}
locations = append(locations, l)
}
}

// Fall back to reading locations from scheduler tasks
if len(locations) == 0 {
tasksProperties, err := h.getTasksProperties(r.Context(), mustClusterIDFromCtx(r), true, true)
if err != nil {
respondError(w, r, err)
return
}
locations = tasksProperties.GetLocations()
}

// Report error if no locations can be found
if len(locations) == 0 {
respondBadRequest(w, r, errors.New("missing locations"))
return
}

ctx := r.Context()
ctx = context.WithValue(ctx, ctxBackupLocations, locations)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

func (h backupHandler) getTasksProperties(ctx context.Context, clusterID uuid.UUID, deleted, disabled bool) (backup.TaskPropertiesByUUID, error) {
filter := scheduler.ListFilter{
TaskType: []scheduler.TaskType{scheduler.BackupTask},
Deleted: deleted,
Disabled: disabled,
}
tasksItems, err := h.schedSvc.ListTasks(ctx, clusterID, filter)
if err != nil {
return nil, err
}
tasksProperties, err := backup.GetTasksProperties(tasksItems)
if err != nil {
return nil, err
}
return tasksProperties, nil
}

func (h backupHandler) mustLocationsFromCtx(r *http.Request) []backupspec.Location {
v, ok := r.Context().Value(ctxBackupLocations).([]backupspec.Location)
v, ok := r.Context().Value(ctxBackupLocations).(backupspec.Locations)
if !ok {
panic("missing locations in context")
}
@@ -194,3 +256,47 @@ func (h backupHandler) deleteSnapshot(w http.ResponseWriter, r *http.Request) {

w.WriteHeader(http.StatusOK)
}

func (h backupHandler) purge(w http.ResponseWriter, r *http.Request) {
tasksProp, err := h.getTasksProperties(r.Context(), mustClusterIDFromCtx(r), false, false)
if err != nil {
respondError(w, r, err)
return
}

dryRun := false
if v := r.FormValue("dry_run"); v == "true" {
dryRun = true
}

deleted, warnings, err := h.svc.PurgeBackups(
r.Context(),
mustClusterIDFromCtx(r),
h.mustLocationsFromCtx(r),
tasksProp.GetRetentionMap(),
dryRun,
)
if err != nil {
respondError(w, r, errors.Wrap(err, "manual purge snapshots"))
return
}
render.Respond(w, r, BackupPurgeOut{Deleted: ConvertManifestsToListItems(deleted), Warnings: warnings})
}

// ConvertManifestsToListItems converts Manifests to ListItems.
func ConvertManifestsToListItems(deleted backupspec.Manifests) backup.ListItems {
out := &backup.ListItems{}
for _, manifest := range deleted {
item := out.GetOrAppend(manifest.ClusterID, manifest.TaskID)
sInfo := item.SnapshotInfo.GetOrAppend(manifest.SnapshotTag)
sInfo.Nodes++
}
return *out
}

// BackupPurgeOut represents the backup purge response.
type BackupPurgeOut struct {
Deleted backup.ListItems `json:"deleted"`
Warnings []string `json:"warnings"`
}