From ee80a84c481670c14018ae67c89efe935e312f41 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 11:37:21 -0400 Subject: [PATCH 1/9] feat(backupspec): add custom type 'Locations' --- pkg/service/backup/backupspec/location.go | 39 +++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/pkg/service/backup/backupspec/location.go b/pkg/service/backup/backupspec/location.go index eab03ed3cf..a98ec48de2 100644 --- a/pkg/service/backup/backupspec/location.go +++ b/pkg/service/backup/backupspec/location.go @@ -151,3 +151,42 @@ func (l Location) RemotePath(p string) string { } return path.Join(r+l.Path, p) } + +// Locations is a slice of Location. +type Locations []Location + +// Len return slice len. +func (l Locations) Len() int { + return len(l) +} + +// Contains return true if Location is in array. +func (l Locations) Contains(provider Provider, path string) bool { + for _, loc := range l { + if loc.Path == path && loc.Provider == provider { + return true + } + } + return false +} + +// Unique return new Locations array without same locations. +func (l Locations) Unique() Locations { + out := make(Locations, 0, l.Len()) + for _, loc := range l { + if !out.Contains(loc.Provider, loc.Path) { + out = append(out, loc) + } + } + return out +} + +// HaveLocationWithDC return true if slice contains location with non-empty DC. +func (l Locations) HaveLocationWithDC() bool { + for _, loc := range l { + if loc.DC != "" { + return true + } + } + return false +} From 0053ba27545da5f4c92736aafd528413e20b427e Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 11:55:06 -0400 Subject: [PATCH 2/9] feat(backup): add custom type ListItems and some metods --- pkg/service/backup/model.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index 4f23b47c0c..d1e872a2b1 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -38,6 +38,19 @@ type ListItem struct { unitCache map[string]*strset.Set `json:"-"` } +// ListItems is a slice of ListItem. +type ListItems []*ListItem + +// GetIdx return index of item with specified clusterID and taskID, if missing - return '-1'. +func (l ListItems) GetIdx(clusterID, taskID uuid.UUID) int { + for idx := range l { + if l[idx].ClusterID == clusterID && l[idx].TaskID == taskID { + return idx + } + } + return -1 +} + // Target specifies what should be backed up and where. type Target struct { Units []Unit `json:"units,omitempty"` From f3b15a7bf673128c28226f02c3e861b1e06fafe4 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 12:10:09 -0400 Subject: [PATCH 3/9] feat(backup): add auxiliary methods to 'RetentionMap' type --- pkg/service/backup/model.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index d1e872a2b1..2cd6a5cae3 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -333,6 +333,40 @@ func defaultRetentionForDeletedTask() RetentionPolicy { // RetentionMap is a mapping of TaskIDs to retention policies. type RetentionMap map[uuid.UUID]RetentionPolicy +// Contains return true if specified taskID is in map. +func (r RetentionMap) Contains(taskID uuid.UUID) bool { + _, ok := r[taskID] + return ok +} + +// Add RetentionPolicy and return true if specified taskID is not in map. 
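+// Illustrative example (rm and taskID are placeholder names):
+//
+//	rm := RetentionMap{}
+//	_ = rm.Add(taskID, 3, 30) // true, policy stored
+//	_ = rm.Add(taskID, 5, 60) // false, existing policy is kept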
+func (r RetentionMap) Add(taskID uuid.UUID, retention, days int) bool { + if r.Contains(taskID) { + return false + } + + r[taskID] = RetentionPolicy{ + Retention: retention, + RetentionDays: days, + } + + return true +} + +// GetPolicy return retention policy for a given task ID. If missing - return default policy. +func (r RetentionMap) GetPolicy(taskID uuid.UUID) RetentionPolicy { + if policy, ok := r[taskID]; ok { + return policy + } + return defaultRetentionForDeletedTask() +} + +// PolicyExists return if policy exists for such taskID. +func (r RetentionMap) PolicyExists(taskID uuid.UUID) bool { + _, ok := r[taskID] + return ok +} + // ExtractRetention parses properties as task properties and returns "retention". func ExtractRetention(properties json.RawMessage) (RetentionPolicy, error) { var p taskProperties From b6009cecd31524720efd241ef5a32a6e1f5457ef Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 11 Aug 2023 13:12:32 -0400 Subject: [PATCH 4/9] add(slice) added function to get slice with unique values --- pkg/util/slice/contains.go | 4 +++- pkg/util/slice/contains_test.go | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/util/slice/contains.go b/pkg/util/slice/contains.go index b93c6c8ce0..9b4c92aabf 100644 --- a/pkg/util/slice/contains.go +++ b/pkg/util/slice/contains.go @@ -2,7 +2,9 @@ package slice -import "fmt" +import ( + "fmt" +) // Contains is a general purpose function to check if a slice contains element. // It has a linear complexity, and does not assume any structure of data. diff --git a/pkg/util/slice/contains_test.go b/pkg/util/slice/contains_test.go index 5a3b8604f0..69d7321441 100644 --- a/pkg/util/slice/contains_test.go +++ b/pkg/util/slice/contains_test.go @@ -2,7 +2,9 @@ package slice -import "testing" +import ( + "testing" +) func TestContains(t *testing.T) { t.Parallel() @@ -23,4 +25,5 @@ func TestContains(t *testing.T) { check(nil, "a", false) }) + } From 4730e6ca9c9d78b4f99d904db285a2329b29e1b0 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 14:04:57 -0400 Subject: [PATCH 5/9] feat(backupspec): add custom type" 'Manifests' and auxiliary methods --- pkg/service/backup/backupspec/manifest.go | 39 ++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/pkg/service/backup/backupspec/manifest.go b/pkg/service/backup/backupspec/manifest.go index deb8cdc923..61db9834d2 100644 --- a/pkg/service/backup/backupspec/manifest.go +++ b/pkg/service/backup/backupspec/manifest.go @@ -5,6 +5,7 @@ package backupspec import ( "compress/gzip" "encoding/json" + "fmt" "io" "os" "path" @@ -13,10 +14,12 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" + "go.uber.org/multierr" + "github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter" "github.com/scylladb/scylla-manager/v3/pkg/util/pathparser" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - "go.uber.org/multierr" ) // ManifestInfo represents manifest on remote location. @@ -30,6 +33,40 @@ type ManifestInfo struct { Temporary bool } +// Manifests is a slice of ManifestInfo. +type Manifests []*ManifestInfo + +// Len returns slice len. +func (l Manifests) Len() int { + return len(l) +} + +// GetSnapshots returns slice of unique and not empty SnapshotTag. 
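+// For example, manifests tagged ["sm_A", "sm_A", "", "sm_B"] yield
+// ["sm_A", "sm_B"]; ordering follows strset.List and is not guaranteed.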
+func (l Manifests) GetSnapshots() []string { + out := strset.New() + for _, m := range l { + out.Add(m.SnapshotTag) + } + out.Remove("") + return out.List() +} + +// GroupByClusterNodeDC returns slice of Manifests grouped by ClusterID, NodeID, DC. +func (l Manifests) GroupByClusterNodeDC() []Manifests { + tmp := make(map[string]Manifests) + for _, manifest := range l { + group := fmt.Sprint(manifest.ClusterID.String(), manifest.NodeID, manifest.DC) + tmp[group] = append(tmp[group], manifest) + } + out := make([]Manifests, len(tmp)) + idx := 0 + for _, manifests := range tmp { + out[idx] = manifests + idx++ + } + return out +} + // Path returns path to the file that manifest points to. func (m *ManifestInfo) Path() string { f := RemoteManifestFile(m.ClusterID, m.TaskID, m.SnapshotTag, m.DC, m.NodeID) From 915f99b1fb6da4fcef6c539138f1f8440305ec18 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 17:32:19 -0400 Subject: [PATCH 6/9] add(backup): manual PurgeBackups possibility --- pkg/restapi/backup.go | 118 +++++++++++++- pkg/restapi/services.go | 1 + pkg/service/backup/list.go | 13 +- pkg/service/backup/model.go | 100 ++++++++++-- pkg/service/backup/model_test.go | 67 ++++++++ pkg/service/backup/purger.go | 54 ++++--- pkg/service/backup/purger_test.go | 51 ++++-- pkg/service/backup/service.go | 245 ++++++++++++++++++++++------- pkg/service/backup/worker_purge.go | 4 +- pkg/util/parallel/parallel.go | 25 +++ pkg/util/parallel/parallel_test.go | 38 +++++ pkg/util/timeutc/timeutc_test.go | 3 +- 12 files changed, 605 insertions(+), 114 deletions(-) diff --git a/pkg/restapi/backup.go b/pkg/restapi/backup.go index c7c93060ac..ef1a6aa8d1 100644 --- a/pkg/restapi/backup.go +++ b/pkg/restapi/backup.go @@ -5,14 +5,17 @@ package restapi import ( "context" "encoding/json" + "fmt" "net/http" "github.com/go-chi/chi/v5" "github.com/go-chi/render" "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/service/backup" "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/scheduler" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) type backupHandler struct { @@ -27,13 +30,14 @@ func newBackupHandler(services Services) *chi.Mux { schedSvc: services.Scheduler, } - m.Use( + rt := m.With( h.locationsCtx, h.listFilterCtx, ) - m.Get("/", h.list) - m.Delete("/", h.deleteSnapshot) - m.Get("/files", h.listFiles) + rt.Get("/", h.list) + rt.Delete("/", h.deleteSnapshot) + rt.Get("/files", h.listFiles) + m.With(h.orphanedLocationsCtx).Delete("/purge", h.purge) return m } @@ -41,7 +45,7 @@ func newBackupHandler(services Services) *chi.Mux { func (h backupHandler) locationsCtx(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var ( - locations []backupspec.Location + locations backupspec.Locations err error ) @@ -92,8 +96,66 @@ func (h backupHandler) extractLocations(r *http.Request) ([]backupspec.Location, return h.svc.ExtractLocations(r.Context(), properties), nil } +func (h backupHandler) orphanedLocationsCtx(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var locations backupspec.Locations + + // Read locations from the request + if v := r.FormValue("locations"); v != "" { + for _, v := range r.Form["locations"] { + var l backupspec.Location + if err := l.UnmarshalText([]byte(v)); err != nil { + respondBadRequest(w, r, err) + return + } + locations = append(locations, l) + } + } + + // Fallback 
read locations from scheduler + if len(locations) == 0 { + tasksProperties, err := h.getTasksProperties(r.Context(), mustClusterIDFromCtx(r), true, true) + if err != nil { + respondError(w, r, err) + return + } + locations = tasksProperties.GetLocations() + } + + // Report error if no locations can be found + if len(locations) == 0 { + respondBadRequest(w, r, errors.New("missing locations")) + return + } + + ctx := r.Context() + ctx = context.WithValue(ctx, ctxBackupLocations, locations) + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (h backupHandler) getTasksProperties(ctx context.Context, clusterID uuid.UUID, deleted, disabled bool) (backup.TaskPropertiesByUUID, error) { + filter := scheduler.ListFilter{ + TaskType: []scheduler.TaskType{scheduler.BackupTask}, + Deleted: deleted, + Disabled: disabled, + } + tasksItems, err := h.schedSvc.ListTasks(ctx, clusterID, filter) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + tasksProperties, err := backup.GetTasksProperties(tasksItems) + if err != nil { + return nil, err + } + return tasksProperties, nil +} + func (h backupHandler) mustLocationsFromCtx(r *http.Request) []backupspec.Location { - v, ok := r.Context().Value(ctxBackupLocations).([]backupspec.Location) + v, ok := r.Context().Value(ctxBackupLocations).(backupspec.Locations) if !ok { panic("missing locations in context") } @@ -194,3 +256,47 @@ func (h backupHandler) deleteSnapshot(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +func (h backupHandler) purge(w http.ResponseWriter, r *http.Request) { + tasksProp, err := h.getTasksProperties(r.Context(), mustClusterIDFromCtx(r), false, false) + if err != nil { + respondError(w, r, err) + return + } + + dryRun := false + if v := r.FormValue("dry_run"); v == "true" { + dryRun = true + } + + deleted, warnings, err := h.svc.PurgeBackups( + r.Context(), + mustClusterIDFromCtx(r), + h.mustLocationsFromCtx(r), + tasksProp.GetRetentionMap(), + dryRun, + ) + if err != nil { + fmt.Println("") + respondError(w, r, errors.Wrap(err, "manual purge snapshots")) + return + } + render.Respond(w, r, BackupPurgeOut{Deleted: ConvertManifestsToListItems(deleted), Warnings: warnings}) +} + +// ConvertManifestsToListItems converts Manifests to ListItems. +func ConvertManifestsToListItems(deleted backupspec.Manifests) backup.ListItems { + out := &backup.ListItems{} + for _, manifest := range deleted { + item := out.GetOrAppend(manifest.ClusterID, manifest.TaskID) + sInfo := item.SnapshotInfo.GetOrAppend(manifest.SnapshotTag) + sInfo.Nodes++ + } + return *out +} + +// BackupPurgeOut represent response information backup purge. 
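+// It serializes roughly as {"deleted": [...], "warnings": [...]},
+// following the json tags below.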
+type BackupPurgeOut struct { + Deleted backup.ListItems `json:"deleted"` + Warnings []string `json:"warnings"` +} diff --git a/pkg/restapi/services.go b/pkg/restapi/services.go index 31a6d131d3..6b39fb5685 100644 --- a/pkg/restapi/services.go +++ b/pkg/restapi/services.go @@ -65,6 +65,7 @@ type BackupService interface { DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, snapshotTags []string) error GetValidationTarget(_ context.Context, clusterID uuid.UUID, properties json.RawMessage) (backup.ValidationTarget, error) GetValidationProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) ([]backup.ValidationHostProgress, error) + PurgeBackups(ctx context.Context, clusterID uuid.UUID, locations backupspec.Locations, retentionMap backup.RetentionMap, dryRun bool) (backupspec.Manifests, []string, error) } // SchedService service interface for the REST API handlers. diff --git a/pkg/service/backup/list.go b/pkg/service/backup/list.go index b06885fbba..bc91abee4e 100644 --- a/pkg/service/backup/list.go +++ b/pkg/service/backup/list.go @@ -180,14 +180,23 @@ func filterManifests(manifests []*ManifestInfo, filter ListFilter) []*ManifestIn return out } -func groupManifestsByNode(manifests []*ManifestInfo) map[string][]*ManifestInfo { - v := map[string][]*ManifestInfo{} +func groupManifestsByNode(manifests []*ManifestInfo) map[string]Manifests { + v := map[string]Manifests{} for _, m := range manifests { v[m.NodeID] = append(v[m.NodeID], m) } return v } +func groupManifestsByHostIP(manifests []*ManifestInfo, locationHostIP map[Location]string) map[string]Manifests { + out := map[string]Manifests{} + for _, m := range manifests { + hostIP := locationHostIP[m.Location] + out[hostIP] = append(out[hostIP], m) + } + return out +} + func groupManifestsByTask(manifests []*ManifestInfo) map[uuid.UUID][]*ManifestInfo { v := map[uuid.UUID][]*ManifestInfo{} for _, m := range manifests { diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index 2cd6a5cae3..8984c6b2f5 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -4,6 +4,7 @@ package backup import ( "encoding/json" + stdErrors "errors" "fmt" "reflect" "regexp" @@ -18,6 +19,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/service/scheduler" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -28,12 +30,29 @@ type SnapshotInfo struct { Size int64 `json:"size"` } +// SnapshotInfoList slice of SnapshotInfo. +type SnapshotInfoList []SnapshotInfo + +// GetOrAppend returns pointer to the item that has provided snapshotTag, if it is not present adds it to the list. +func (l *SnapshotInfoList) GetOrAppend(snapshotTag string) *SnapshotInfo { + for id := range *l { + item := &(*l)[id] + if item.SnapshotTag == snapshotTag { + return item + } + } + *l = append(*l, SnapshotInfo{ + SnapshotTag: snapshotTag, + }) + return &(*l)[len(*l)-1] +} + // ListItem represents contents of a snapshot within list boundaries. 
type ListItem struct { - ClusterID uuid.UUID `json:"cluster_id"` - TaskID uuid.UUID `json:"task_id"` - Units []Unit `json:"units"` - SnapshotInfo []SnapshotInfo `json:"snapshot_info"` + ClusterID uuid.UUID `json:"cluster_id"` + TaskID uuid.UUID `json:"task_id"` + Units []Unit `json:"units"` + SnapshotInfo SnapshotInfoList `json:"snapshot_info"` unitCache map[string]*strset.Set `json:"-"` } @@ -41,14 +60,21 @@ type ListItem struct { // ListItems is a slice of ListItem. type ListItems []*ListItem -// GetIdx return index of item with specified clusterID and taskID, if missing - return '-1'. -func (l ListItems) GetIdx(clusterID, taskID uuid.UUID) int { - for idx := range l { - if l[idx].ClusterID == clusterID && l[idx].TaskID == taskID { - return idx +// GetOrAppend returns item that has provided clusterID and taskID, if it is not present adds it. +func (l *ListItems) GetOrAppend(clusterID, taskID uuid.UUID) *ListItem { + for _, item := range *l { + if item.ClusterID == clusterID && item.TaskID == taskID { + return item } } - return -1 + item := &ListItem{ + ClusterID: clusterID, + TaskID: taskID, + Units: nil, + SnapshotInfo: []SnapshotInfo{}, + } + *l = append(*l, item) + return item } // Target specifies what should be backed up and where. @@ -249,8 +275,8 @@ func dcLimitDCAtPos(s []DCLimit) func(int) (string, string) { } } -// taskProperties is the main data structure of the runner.Properties blob. -type taskProperties struct { +// TaskProperties is the main data structure of the runner.Properties blob. +type TaskProperties struct { Keyspace []string `json:"keyspace"` DC []string `json:"dc"` Location []Location `json:"location"` @@ -264,7 +290,7 @@ type taskProperties struct { PurgeOnly bool `json:"purge_only"` } -func (p taskProperties) extractRetention() RetentionPolicy { +func (p TaskProperties) extractRetention() RetentionPolicy { if p.Retention == nil && p.RetentionDays == nil { return defaultRetention() } @@ -279,8 +305,8 @@ func (p taskProperties) extractRetention() RetentionPolicy { return r } -func defaultTaskProperties() taskProperties { - return taskProperties{ +func defaultTaskProperties() TaskProperties { + return TaskProperties{ Continue: true, } } @@ -293,7 +319,7 @@ func extractLocations(properties []json.RawMessage) ([]Location, error) { ) for i := range properties { - var p taskProperties + var p TaskProperties if err := json.Unmarshal(properties[i], &p); err != nil { errs = multierr.Append(errs, errors.Wrapf(err, "parse %q", string(properties[i]))) continue @@ -310,6 +336,46 @@ func extractLocations(properties []json.RawMessage) ([]Location, error) { return locations, errs } +// GetTasksProperties extract TaskProperties from specified Tasks array. +func GetTasksProperties(tasks []*scheduler.TaskListItem) (TaskPropertiesByUUID, error) { + var errs []error + propertiesByTaskIds := make(TaskPropertiesByUUID) + for _, task := range tasks { + var properties TaskProperties + if err := json.Unmarshal(task.Properties, &properties); err != nil { + errs = append(errs, err) + continue + } + propertiesByTaskIds[task.ID] = &properties + } + return propertiesByTaskIds, stdErrors.Join(errs...) +} + +// TaskPropertiesByUUID represent map with uuid as key and reference on TaskProperties as value. +type TaskPropertiesByUUID map[uuid.UUID]*TaskProperties + +// GetLocations return all tasks locations exclude duplicates. 
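+// Two entries are treated as the same location when Provider and Path match
+// (DC is ignored), so a bucket referenced by several tasks is returned once.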
+func (p TaskPropertiesByUUID) GetLocations() Locations { + locations := make(Locations, 0) + for _, taskProp := range p { + for _, loc := range taskProp.Location { + if !locations.Contains(loc.Provider, loc.Path) { + locations = append(locations, loc) + } + } + } + return locations +} + +// GetRetentionMap return combined RetentionMap for all tasks. +func (p TaskPropertiesByUUID) GetRetentionMap() RetentionMap { + retentionMap := make(RetentionMap) + for taskID, taskProp := range p { + retentionMap[taskID] = taskProp.RetentionMap[taskID] + } + return retentionMap +} + // RetentionPolicy defines the retention policy for a backup task. type RetentionPolicy struct { RetentionDays int `json:"retention_days"` @@ -369,7 +435,7 @@ func (r RetentionMap) PolicyExists(taskID uuid.UUID) bool { // ExtractRetention parses properties as task properties and returns "retention". func ExtractRetention(properties json.RawMessage) (RetentionPolicy, error) { - var p taskProperties + var p TaskProperties if err := json.Unmarshal(properties, &p); err != nil { return RetentionPolicy{}, err } diff --git a/pkg/service/backup/model_test.go b/pkg/service/backup/model_test.go index f95c7ef462..5f850837d8 100644 --- a/pkg/service/backup/model_test.go +++ b/pkg/service/backup/model_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/google/go-cmp/cmp" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) func TestDCLimitMarshalUnmarshalText(t *testing.T) { @@ -96,3 +98,68 @@ func TestExtractLocations(t *testing.T) { }) } } + +func TestListItems(t *testing.T) { + t.Parallel() + + t.Run("GetOrAppend", func(t *testing.T) { + t.Parallel() + var items [][2]uuid.UUID + l := ListItems{} + for x := 1; x < 10; x++ { + clusterID1, err := uuid.NewRandom() + if err != nil { + t.Fatal(err) + } + taskID1, err := uuid.NewRandom() + if err != nil { + t.Fatal(err) + } + items = append(items, [2]uuid.UUID{clusterID1, taskID1}) + l.GetOrAppend(clusterID1, taskID1) + if len(l) != x { + t.Fatalf("list len %d != %d", len(l), x) + } + } + + outer: + for _, item := range items { + for _, info := range l { + if info.ClusterID == item[0] && info.TaskID == item[1] { + continue outer + } + } + t.Fatalf("can't find record with clusterID=%s and taskID=%s in the list", item[0].String(), item[1].String()) + } + }) +} + +func TestSnapshotInfoList(t *testing.T) { + t.Parallel() + + t.Run("GetOrAppend", func(t *testing.T) { + t.Parallel() + l := SnapshotInfoList{} + var snapshots []string + for x := 1; x < 10; x++ { + snapshotInfo, err := uuid.NewRandom() + if err != nil { + t.Fatal(err) + } + snapshots = append(snapshots, snapshotInfo.String()) + l.GetOrAppend(snapshotInfo.String()) + if len(l) != x { + t.Fatalf("list len %d != %d", len(l), x) + } + } + outer: + for _, snapshot := range snapshots { + for _, info := range l { + if info.SnapshotTag == snapshot { + continue outer + } + } + t.Fatalf("can't find snapshot %s in the list", snapshot) + } + }) +} diff --git a/pkg/service/backup/purger.go b/pkg/service/backup/purger.go index 86fc2e90fc..9d5eb83fdc 100644 --- a/pkg/service/backup/purger.go +++ b/pkg/service/backup/purger.go @@ -4,6 +4,7 @@ package backup import ( "context" + "fmt" "net/http" "path" "sort" @@ -24,14 +25,16 @@ import ( // - temporary manifests, // - manifests over task days retention days policy, // - manifests over task retention policy, -// - manifests older than threshold if retention policy is unknown. 
+// - if cleanup - false - manifests older than threshold if retention policy is unknown. +// - if cleanup - true - manifests with unknown retention policy. // Moreover, it returns the oldest snapshot tag time that remains undeleted by retention policy. -func staleTags(manifests []*ManifestInfo, retentionMap RetentionMap) (*strset.Set, time.Time, error) { +func staleTags(manifests []*ManifestInfo, retentionMap RetentionMap, cleanup bool) (*strset.Set, time.Time, error) { tags := strset.New() var oldest time.Time for taskID, taskManifests := range groupManifestsByTask(manifests) { - taskPolicy := GetRetention(taskID, retentionMap) + taskPolicy := retentionMap.GetPolicy(taskID) + taskTags := strset.New() for _, m := range taskManifests { t, err := SnapshotTagTime(m.SnapshotTag) @@ -40,6 +43,8 @@ func staleTags(manifests []*ManifestInfo, retentionMap RetentionMap) (*strset.Se } switch { + case cleanup && !retentionMap.PolicyExists(taskID): + tags.Add(m.SnapshotTag) case m.Temporary: tags.Add(m.SnapshotTag) // Tasks can have a Retention policy and a RetentionDays policy so fall through if tag is not too old @@ -96,17 +101,19 @@ func newPurger(client *scyllaclient.Client, host string, logger log.Logger) purg // PurgeSnapshotTags removes files that are no longer needed as given snapshot tags are purged. // Oldest represents the time of the oldest backup that we intend to keep - it is used to purge versioned files. -func (p purger) PurgeSnapshotTags(ctx context.Context, manifests []*ManifestInfo, tags *strset.Set, oldest time.Time) (int, error) { +// If dryRun true - no any changes be, results be like usual run. +func (p purger) PurgeSnapshotTags(ctx context.Context, manifests Manifests, tags *strset.Set, oldest time.Time, dryRun bool) (Manifests, []string, error) { if len(manifests) == 0 { - return 0, nil + return nil, nil, nil } var ( // Used to obtain values which are common for all manifests in this function call // (e.g. location, clusterID, nodeID, ...) 
- anyM = manifests[0] - files = make(fileSet) - stale = 0 + anyM = manifests[0] + files = make(fileSet) + stale = 0 + deletedManifests = make(Manifests, 0, len(manifests)) ) for _, m := range manifests { @@ -118,17 +125,17 @@ func (p purger) PurgeSnapshotTags(ctx context.Context, manifests []*ManifestInfo "temporary", m.Temporary, ) if err := p.forEachDirInManifest(ctx, m, files.AddFiles); err != nil { - return 0, errors.Wrapf(err, "load manifest (snapshot) %s", m.Path()) + return nil, nil, errors.Wrapf(err, "load manifest (snapshot) %s", m.Path()) } } } if stale == 0 { - return 0, nil + return nil, nil, nil } for _, m := range manifests { if !tags.Has(m.SnapshotTag) { if err := p.forEachDirInManifest(ctx, m, files.RemoveFiles); err != nil { - return 0, errors.Wrapf(err, "load manifest (no snapshot) %s", m.Path()) + return nil, nil, errors.Wrapf(err, "load manifest (no snapshot) %s", m.Path()) } } } @@ -153,37 +160,48 @@ func (p purger) PurgeSnapshotTags(ctx context.Context, manifests []*ManifestInfo if ok { fileDir := path.Join(nodeDir, path.Dir(item.Path)) files.AddFiles(fileDir, []string{item.Name}) - p.logger.Info(ctx, "Found versioned SSTable to be removed", "dir", fileDir, "file", item.Name, ) } } + if err := p.client.RcloneListDirIter(ctx, p.host, anyM.Location.RemotePath(nodeDir), opts, cb); err != nil { - return 0, errors.Wrapf(err, "find versioned SSTables") + return nil, nil, errors.Wrapf(err, "find versioned SSTables") } } - if _, err := p.deleteFiles(ctx, anyM.Location, files); err != nil { - return 0, errors.Wrapf(err, "delete SSTables") + if dryRun { + for _, m := range manifests { + if tags.Has(m.SnapshotTag) { + deletedManifests = append(deletedManifests, m) + } + } + return deletedManifests, nil, nil } - deletedManifests := 0 + if _, err := p.deleteFiles(ctx, anyM.Location, files); err != nil { + p.logger.Error(ctx, "Failed to delete SSTables", "location", anyM.Location, "error", err) + return nil, nil, errors.Wrapf(err, "delete SSTables") + } + var warnings []string for _, m := range manifests { if tags.Has(m.SnapshotTag) { if _, err := p.deleteFile(ctx, m.Location.RemotePath(m.SchemaPath())); err != nil { p.logger.Info(ctx, "Failed to remove schema file", "path", m.SchemaPath(), "error", err) + warnings = append(warnings, fmt.Sprintf("failed to remove schema file ,path:%s, error:%s", m.SchemaPath(), err.Error())) } if _, err := p.deleteFile(ctx, m.Location.RemotePath(m.Path())); err != nil { p.logger.Info(ctx, "Failed to remove manifest", "path", m.Path(), "error", err) + warnings = append(warnings, fmt.Sprintf("failed to remove manifest , path:%s, error:%s", m.Path(), err.Error())) } else { - deletedManifests++ + deletedManifests = append(deletedManifests, m) } } } - return deletedManifests, nil + return deletedManifests, warnings, nil } // ValidationResult is a summary generated by Validate. diff --git a/pkg/service/backup/purger_test.go b/pkg/service/backup/purger_test.go index 0f73a412c9..3a8acc03d1 100644 --- a/pkg/service/backup/purger_test.go +++ b/pkg/service/backup/purger_test.go @@ -31,7 +31,7 @@ func TestStaleTags(t *testing.T) { task2 = uuid.MustRandom() task3 = uuid.MustRandom() task4 = uuid.MustRandom() - manifests []*ManifestInfo + manifests Manifests ) // Mixed snapshot tags across nodes manifests = append(manifests, gen("a", task0, 0, 7)...) @@ -41,10 +41,11 @@ func TestStaleTags(t *testing.T) { manifests = append(manifests, gen("b", task1, 10, 12)...) 
// Not found in policy delete older than 30 days manifests = append(manifests, gen("c", task2, 20, 22)...) + snapshotMinus15Days := SnapshotTagAt(timeutc.Now().AddDate(0, 0, -15)) manifests = append(manifests, &ManifestInfo{ NodeID: "c", TaskID: task2, - SnapshotTag: SnapshotTagAt(timeutc.Now().AddDate(0, 0, -15)), + SnapshotTag: snapshotMinus15Days, }) // Mixed policy 1 - retention days deletes 2, retention days deletes 1 manifests = append(manifests, gen("c", task3, 30, 32)...) @@ -71,12 +72,41 @@ func TestStaleTags(t *testing.T) { x.Temporary = true manifests = append(manifests, x) - tags, oldest, err := staleTags(manifests, RetentionMap{ - task0: {Retention: 3, RetentionDays: 0}, - task1: {Retention: 2, RetentionDays: 0}, - task3: {Retention: 2, RetentionDays: 10}, - task4: {Retention: 1, RetentionDays: 10}, - }) + retentionMap := make(RetentionMap) + retentionMap.Add(task0, 3, 0) + retentionMap.Add(task1, 2, 0) + retentionMap.Add(task3, 2, 10) + retentionMap.Add(task4, 1, 10) + + // Test without cleanup + tags, oldest, err := staleTags(manifests, retentionMap, false) + if err != nil { + t.Fatal(err) + } + if !oldest.Equal(time.Unix(4, 0)) { + t.Fatal("Validate the time of the oldest, remaining backup") + } + + expected := []string{ + "sm_19700101000000UTC", + "sm_19700101000001UTC", + "sm_19700101000002UTC", + "sm_19700101000003UTC", + "sm_19700101000006UTC", + "sm_19700101000020UTC", + "sm_19700101000021UTC", + "sm_19700101000030UTC", + "sm_19700101000031UTC", + "sm_19700101000040UTC", + deletedByRetentionTag, + } + + if diff := cmp.Diff(tags.List(), expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + t.Fatalf("staleTags() = %s, diff:\n%s", tags.List(), diff) + } + + // Test with cleanup + tags, oldest, err = staleTags(manifests, retentionMap, true) if err != nil { t.Fatal(err) } @@ -84,7 +114,7 @@ func TestStaleTags(t *testing.T) { t.Fatal("Validate the time of the oldest, remaining backup") } - golden := []string{ + expected = []string{ "sm_19700101000000UTC", "sm_19700101000001UTC", "sm_19700101000002UTC", @@ -95,10 +125,11 @@ func TestStaleTags(t *testing.T) { "sm_19700101000030UTC", "sm_19700101000031UTC", "sm_19700101000040UTC", + snapshotMinus15Days, deletedByRetentionTag, } - if diff := cmp.Diff(tags.List(), golden, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + if diff := cmp.Diff(tags.List(), expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { t.Fatalf("staleTags() = %s, diff:\n%s", tags.List(), diff) } } diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index f3b702ebb1..14de4863d3 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -13,6 +13,8 @@ import ( "time" "github.com/pkg/errors" + "go.uber.org/atomic" + "github.com/scylladb/go-log" "github.com/scylladb/go-set/strset" "github.com/scylladb/gocqlx/v2" @@ -31,7 +33,12 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - "go.uber.org/atomic" +) + +var ( + errNoLocation = errors.New("no locations") + errNoStaleSnapshots = errors.New("no stale snapshots") + errorIPsNotResolved = service.ErrValidate(errors.New("can`t resolve host ip")) ) const defaultRateLimit = 100 // 100MiB @@ -105,15 +112,6 @@ func (s *Service) TaskDecorator(schedSvc *scheduler.Service) func(ctx context.Co } } -// GetRetention returns the retention policy for a given task ID. 
-// In case task ID cannot be found in the RetentionMap, default retention for deleted task is returned. -func GetRetention(taskID uuid.UUID, retentionMap RetentionMap) RetentionPolicy { - if r, ok := retentionMap[taskID]; ok { - return r - } - return defaultRetentionForDeletedTask() -} - // GetTarget converts runner properties into backup Target. // It also ensures configuration for the backup providers is registered on the // targeted hosts. @@ -475,24 +473,9 @@ func (s *Service) List(ctx context.Context, clusterID uuid.UUID, locations []Loc ptr = &items[len(items)-1] } - // Find snapshot info or create a new one - var siptr *SnapshotInfo - for i, r := range ptr.SnapshotInfo { - if r.SnapshotTag == mc.SnapshotTag { - siptr = &ptr.SnapshotInfo[i] - break - } - } - if siptr == nil { - ptr.SnapshotInfo = append(ptr.SnapshotInfo, SnapshotInfo{ - SnapshotTag: mc.SnapshotTag, - Nodes: 1, - Size: size, - }) - } else { - siptr.Nodes++ - siptr.Size += size - } + sInfo := ptr.SnapshotInfo.GetOrAppend(mc.SnapshotTag) + sInfo.Nodes++ + sInfo.Size += size // Add unit information from index return mc.ForEachIndexIter(filter.Keyspace, func(u FilesMeta) { @@ -582,16 +565,13 @@ func (s *Service) forEachManifest(ctx context.Context, clusterID uuid.UUID, loca } // Resolve hosts for locations - hosts := make([]hostInfo, len(locations)) - for i := range locations { - hosts[i].Location = locations[i] - } - if err := s.resolveHosts(ctx, client, hosts); err != nil { + hosts, err := s.resolveHosts(ctx, client, locations) + if err != nil { return errors.Wrap(err, "resolve hosts") } locationHost := map[Location]string{} - for _, h := range hosts { - locationHost[h.Location] = h.IP + for _, host := range hosts { + locationHost[host.Location] = host.IP } manifests, err := listManifestsInAllLocations(ctx, client, hosts, filter.ClusterID) @@ -627,27 +607,24 @@ func (s *Service) forEachManifest(ctx context.Context, clusterID uuid.UUID, loca return nil } -func (s *Service) resolveHosts(ctx context.Context, client *scyllaclient.Client, hosts []hostInfo) error { +func (s *Service) resolveHosts(ctx context.Context, client *scyllaclient.Client, locations Locations) ([]hostInfo, error) { s.logger.Debug(ctx, "Resolving hosts for locations") - + hosts := make([]hostInfo, len(locations)) + for i := range locations { + hosts[i].Location = locations[i] + } var ( dcMap map[string][]string err error ) // Check if we need to load DC map - hasDC := false - for i := range hosts { - if hosts[i].Location.DC != "" { - hasDC = true - break - } - } + hasDC := locations.HaveLocationWithDC() // Load DC map if needed if hasDC { dcMap, err = client.Datacenters(ctx) if err != nil { - return errors.Wrap(err, "read datacenters") + return nil, errors.Wrap(err, "read datacenters") } } @@ -663,7 +640,7 @@ func (s *Service) resolveHosts(ctx context.Context, client *scyllaclient.Client, } if len(checklist) == 0 { - return errors.Errorf("no matching hosts found for location %s", l) + return errors.Wrapf(errorIPsNotResolved, " for location %s", l) } for _, h := range checklist { @@ -678,7 +655,7 @@ func (s *Service) resolveHosts(ctx context.Context, client *scyllaclient.Client, } } - return errors.Errorf("no matching hosts found for location %s", l) + return errors.Wrapf(errorIPsNotResolved, " for location %s", l) } notify := func(i int, err error) { @@ -690,7 +667,7 @@ func (s *Service) resolveHosts(ctx context.Context, client *scyllaclient.Client, ) } - return parallel.Run(len(hosts), parallel.NoLimit, f, notify) + return hosts, 
parallel.Run(len(hosts), parallel.NoLimit, f, notify) } // Backup executes a backup on a given target. @@ -1126,11 +1103,8 @@ func (s *Service) DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locat } // Resolve hosts for locations - hosts := make([]hostInfo, len(locations)) - for i := range locations { - hosts[i].Location = locations[i] - } - if err := s.resolveHosts(ctx, client, hosts); err != nil { + hosts, err := s.resolveHosts(ctx, client, locations) + if err != nil { return errors.Wrap(err, "resolve hosts") } @@ -1161,9 +1135,8 @@ func (s *Service) DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locat } } - n, err := p.PurgeSnapshotTags(ctx, manifests, tagS, oldest) - deletedManifests.Add(int32(n)) - + deleted, _, err := p.PurgeSnapshotTags(ctx, manifests, tagS, oldest, false) + deletedManifests.Add(int32(deleted.Len())) if err == nil { s.logger.Info(ctx, "Done purging snapshot data on host", "host", h.IP) } @@ -1177,7 +1150,8 @@ func (s *Service) DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locat ) } - if err := hostsInParallel(hosts, parallel.NoLimit, f, notify); err != nil { + err = hostsInParallel(hosts, parallel.NoLimit, f, notify) + if err != nil { return err } @@ -1187,3 +1161,160 @@ func (s *Service) DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locat return nil } + +// PurgeBackups purge stale backup data and meta files for clusterID in the Locations. +// That is data and files of: +// +// *temporary manifests, +// *manifests over task days retention days policy, +// *manifests over task retention policy, +// *manifests of with unknown retention policy. +// +// It returns deleted Manifests. if dryRun is true nothing is deleted. +func (s *Service) PurgeBackups( + ctx context.Context, + clusterID uuid.UUID, + locations Locations, + retentionMap RetentionMap, + dryRun bool, +) (deleted Manifests, + warnings []string, + err error, +) { + logger := s.logger.Named("PurgeBackups") + if dryRun { + logger = log.NopLogger + } + + logger.Debug(ctx, "Start", "cluster", clusterID, "locations", locations, "retentionMap", retentionMap, "dryRun", dryRun) + start := timeutc.Now() + defer func() { + if err != nil { + logger.Error(ctx, "Got an error", "error", err, "duration", timeutc.Since(start)) + } + logger.Debug(ctx, "Done", "deleted manifests", deleted, "duration", timeutc.Since(start)) + }() + + if locations.Len() == 0 { + return nil, []string{errNoLocation.Error()}, nil + } + locations = locations.Unique() + + deleted = make(Manifests, 0) + + // Get the cluster client + client, err := s.scyllaClient(ctx, clusterID) + if err != nil { + return nil, nil, errors.Wrap(err, "get scylla client") + } + + // Resolve hosts for locations + hosts, err := s.resolveHosts(ctx, client, locations) + if err != nil { + return nil, nil, errors.Wrap(err, "resolve hosts") + } + + locationHostIP := map[Location]string{} + for idx := range hosts { + locationHostIP[hosts[idx].Location] = hosts[idx].IP + } + + hostIPLocations := map[string]Locations{} + for idx := range hosts { + hostIPLocations[hosts[idx].IP] = append(hostIPLocations[hosts[idx].IP], hosts[idx].Location) + } + + // List manifests in all locations + manifests, err := listManifestsInAllLocations(ctx, client, hosts, clusterID) + if err != nil { + return nil, nil, errors.Wrap(err, "list manifests") + } + + // Get a list of stale tags + tags, oldest, err := staleTags(manifests, retentionMap, true) + if err != nil { + return nil, nil, errors.Wrap(err, "get stale snapshot tags") + } + + if tags.IsEmpty() { + 
return nil, []string{errNoStaleSnapshots.Error()}, nil + } + + snapshots := tags.Copy() + hostIPManifests := groupManifestsByHostIP(manifests, locationHostIP) + var mut sync.Mutex + + runFunc := func(hostIP string, m Manifests) error { + if m.Len() == 0 { + return nil + } + + manifestsList := m.GroupByClusterNodeDC() + for idx := range manifestsList { + manifests = manifestsList[idx] + logg := logger.With( + "host", hostIP, + "node", manifests[0].NodeID, + ) + + p := newPurger(client, hostIP, logg) + p.OnDelete = func(total, success int) { + if dryRun { + return + } + s.metrics.Backup.SetPurgeFiles(manifests[0].ClusterID, hostIP, total, success) + } + + logg.Info(ctx, "Purging stale snapshots of node from host") + + del, warn, err := p.PurgeSnapshotTags(ctx, manifests, tags, oldest, dryRun) + if err != nil { + mut.Lock() + warnings = append(warnings, err.Error()) + mut.Unlock() + } + if len(warn) != 0 { + mut.Lock() + warnings = append(warnings, warn...) + mut.Unlock() + } + + if del.Len() != 0 { + mut.Lock() + deleted = append(deleted, del...) + mut.Unlock() + } + + if dryRun { + continue + } + deletedSnapshots := del.GetSnapshots() + for dIdx := range deletedSnapshots { + snapshot := deletedSnapshots[dIdx] + if snapshots.Has(snapshot) { + if err = client.DeleteSnapshot(ctx, hostIP, snapshot); err != nil { + mut.Lock() + warnings = append(warnings, fmt.Sprintf("Failed to delete uploaded snapshot, host:%s, snapshot_tag:%s, error:%s", hostIP, snapshot, err.Error())) + mut.Unlock() + logg.Error(ctx, "Failed to delete uploaded snapshot", + "host", hostIP, + "snapshot_tag", snapshot, + "error", err, + ) + } else { + logg.Info(ctx, "Deleted uploaded snapshot", + "host", hostIP, + "snapshot_tag", snapshot, + ) + } + mut.Lock() + snapshots.Remove(snapshot) + mut.Unlock() + } + } + logg.Info(ctx, "Done purging stale snapshots of node from host") + } + return nil + } + return deleted, warnings, parallel.RunMap[string, Manifests](hostIPManifests, runFunc, nil, 10) +} diff --git a/pkg/service/backup/worker_purge.go b/pkg/service/backup/worker_purge.go index f09eddf6f4..838a49e815 100644 --- a/pkg/service/backup/worker_purge.go +++ b/pkg/service/backup/worker_purge.go @@ -29,7 +29,7 @@ func (w *worker) Purge(ctx context.Context, hosts []hostInfo, retentionMap Reten return errors.Wrap(err, "list manifests") } // Get a list of stale tags - tags, oldest, err := staleTags(manifests, retentionMap) + tags, oldest, err := staleTags(manifests, retentionMap, false) if err != nil { return errors.Wrap(err, "get stale snapshot tags") } @@ -81,7 +81,7 @@ func (w *worker) Purge(ctx context.Context, hosts []hostInfo, retentionMap Reten } p.logger = logger - if _, err := p.PurgeSnapshotTags(ctx, manifests, tags, oldest); err != nil { + if _, _, err := p.PurgeSnapshotTags(ctx, manifests, tags, oldest, false); err != nil { return err } diff --git a/pkg/util/parallel/parallel.go b/pkg/util/parallel/parallel.go index 2754865178..51f39b137b 100644 --- a/pkg/util/parallel/parallel.go +++ b/pkg/util/parallel/parallel.go @@ -93,3 +93,28 @@ func Run(n, limit int, f func(i int) error, notify func(i int, err error)) error } return retErr } + +type ( + // RunMapElem called on each elem of array in RunMap. + RunMapElem[key comparable, val any] func(key key, val val) error + // OnErrorMapFunc called on each error of RunMapElem in RunMap. 
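+	// It is invoked with the map key, its value and the error returned by RunMapElem for that entry.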
+ OnErrorMapFunc[key comparable, val any] func(key key, val val, err error) +) + +// RunMap executes function RunMapElem on each elem in map (list) in parallel, limit - count running goroutine at one time. +// OnErrorMapFunc is called when RunMapElem function encounters error. +func RunMap[key comparable, val any](list map[key]val, elemFunc RunMapElem[key, val], onErrorFunc OnErrorMapFunc[key, val], limit int) error { + keys := make([]key, len(list)) + idx := 0 + for lKey := range list { + keys[idx] = lKey + idx++ + } + elemF := func(i int) error { + return elemFunc(keys[i], list[keys[i]]) + } + onErrorF := func(i int, err error) { + onErrorFunc(keys[i], list[keys[i]], err) + } + return Run(len(list), limit, elemF, onErrorF) +} diff --git a/pkg/util/parallel/parallel_test.go b/pkg/util/parallel/parallel_test.go index 23c78127d0..aaf136fbe5 100644 --- a/pkg/util/parallel/parallel_test.go +++ b/pkg/util/parallel/parallel_test.go @@ -4,6 +4,9 @@ package parallel import ( "errors" + "fmt" + "reflect" + "sync" "testing" "time" @@ -120,3 +123,38 @@ func TestEmpty(t *testing.T) { t.Fatal("Run() error", err) } } + +func TestRunMap(t *testing.T) { + t.Parallel() + testMap1 := map[string]string{"1": "1", "2": "2", "3": "3", "4": "4", "5": "5"} + + testExp1 := map[string]string{"1": "11", "2": "22", "3": "33", "4": "44", "5": "55"} + + result := map[string]string{} + mut := sync.Mutex{} + onEachElem := func(key, val string) error { + mut.Lock() + result[key] = key + val + mut.Unlock() + return nil + } + + err := RunMap[string, string](testMap1, onEachElem, nil, 2) + if err != nil { + t.Fatalf("wrong work RunMap function, error:%s", err) + } + if !reflect.DeepEqual(testExp1, result) { + t.Fatal("wrong work RunMap function") + } + result = map[string]string{} + testMap2 := map[string]string{"1": "1"} + testExp2 := map[string]string{"1": "11"} + + err = RunMap[string, string](testMap2, onEachElem, nil, 1) + if err != nil { + t.Fatalf("wrong work RunMap function, error:%s", err) + } + if !reflect.DeepEqual(testExp2, result) { + t.Fatal(fmt.Sprintf("wrong work RunMap function, \nexpected:%v\nrecieved:%v\n", testExp2, result)) + } +} diff --git a/pkg/util/timeutc/timeutc_test.go b/pkg/util/timeutc/timeutc_test.go index 2679062476..df5b364197 100644 --- a/pkg/util/timeutc/timeutc_test.go +++ b/pkg/util/timeutc/timeutc_test.go @@ -4,11 +4,10 @@ package timeutc import ( "testing" - "time" ) func TestTodayMidnight(t *testing.T) { - l := TodayMidnight().In(time.Local) + l := TodayMidnight().Local() if l.Hour() != 0 { t.Error("invalid hour", l) } From dfd26149e6596516136149ba4bad76d3ebd8f095 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 15:49:15 -0400 Subject: [PATCH 7/9] add(scylla-manager API): add PurgeBackups support --- pkg/managerclient/client.go | 24 +++ ...ter_cluster_id_backups_purge_parameters.go | 184 ++++++++++++++++++ ...ster_cluster_id_backups_purge_responses.go | 119 +++++++++++ .../client/operations/operations_client.go | 35 ++++ .../scylla-manager/models/backup_purge_res.go | 83 ++++++++ swagger/scylla-manager.json | 56 ++++++ 6 files changed, 501 insertions(+) create mode 100644 swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_parameters.go create mode 100644 swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_responses.go create mode 100644 swagger/gen/scylla-manager/models/backup_purge_res.go diff --git a/pkg/managerclient/client.go b/pkg/managerclient/client.go index bbbfc96900..937b881fcb 100644 --- 
a/pkg/managerclient/client.go +++ b/pkg/managerclient/client.go @@ -570,6 +570,30 @@ func (c *Client) ListBackups(ctx context.Context, clusterID string, return BackupListItems{items: resp.Payload}, nil } +// PurgeBackups purge stale snapshots for clusterID and locations. +// +// - locations, list of locations, if empty it is populated from existing,deleted and disabled tasks. +// - dryRun, if true nothing is deleted. +func (c *Client) PurgeBackups(ctx context.Context, clusterID string, locations []string, dryRun bool) (BackupListItems, []string, error) { + p := &operations.DeleteClusterClusterIDBackupsPurgeParams{ + Context: ctx, + ClusterID: clusterID, + Locations: locations, + DryRun: &dryRun, + } + + resp, err := c.operations.DeleteClusterClusterIDBackupsPurge(p) + if err != nil { + return BackupListItems{}, nil, err + } + out := BackupListItems{ + items: resp.Payload.Deleted, + AllClusters: false, + ShowTables: 0, + } + return out, resp.Payload.Warnings, nil +} + // ListBackupFiles returns a listing of available backup files. func (c *Client) ListBackupFiles(ctx context.Context, clusterID string, locations []string, allClusters bool, keyspace []string, snapshotTag string, diff --git a/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_parameters.go b/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_parameters.go new file mode 100644 index 0000000000..c87e49e9a6 --- /dev/null +++ b/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_parameters.go @@ -0,0 +1,184 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "net/http" + "time" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime" + cr "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" +) + +// NewDeleteClusterClusterIDBackupsPurgeParams creates a new DeleteClusterClusterIDBackupsPurgeParams object +// with the default values initialized. 
+func NewDeleteClusterClusterIDBackupsPurgeParams() *DeleteClusterClusterIDBackupsPurgeParams { + var () + return &DeleteClusterClusterIDBackupsPurgeParams{ + + timeout: cr.DefaultTimeout, + } +} + +// NewDeleteClusterClusterIDBackupsPurgeParamsWithTimeout creates a new DeleteClusterClusterIDBackupsPurgeParams object +// with the default values initialized, and the ability to set a timeout on a request +func NewDeleteClusterClusterIDBackupsPurgeParamsWithTimeout(timeout time.Duration) *DeleteClusterClusterIDBackupsPurgeParams { + var () + return &DeleteClusterClusterIDBackupsPurgeParams{ + + timeout: timeout, + } +} + +// NewDeleteClusterClusterIDBackupsPurgeParamsWithContext creates a new DeleteClusterClusterIDBackupsPurgeParams object +// with the default values initialized, and the ability to set a context for a request +func NewDeleteClusterClusterIDBackupsPurgeParamsWithContext(ctx context.Context) *DeleteClusterClusterIDBackupsPurgeParams { + var () + return &DeleteClusterClusterIDBackupsPurgeParams{ + + Context: ctx, + } +} + +// NewDeleteClusterClusterIDBackupsPurgeParamsWithHTTPClient creates a new DeleteClusterClusterIDBackupsPurgeParams object +// with the default values initialized, and the ability to set a custom HTTPClient for a request +func NewDeleteClusterClusterIDBackupsPurgeParamsWithHTTPClient(client *http.Client) *DeleteClusterClusterIDBackupsPurgeParams { + var () + return &DeleteClusterClusterIDBackupsPurgeParams{ + HTTPClient: client, + } +} + +/* +DeleteClusterClusterIDBackupsPurgeParams contains all the parameters to send to the API endpoint +for the delete cluster cluster ID backups purge operation typically these are written to a http.Request +*/ +type DeleteClusterClusterIDBackupsPurgeParams struct { + + /*ClusterID*/ + ClusterID string + /*DryRun*/ + DryRun *bool + /*Locations*/ + Locations []string + + timeout time.Duration + Context context.Context + HTTPClient *http.Client +} + +// WithTimeout adds the timeout to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithTimeout(timeout time.Duration) *DeleteClusterClusterIDBackupsPurgeParams { + o.SetTimeout(timeout) + return o +} + +// SetTimeout adds the timeout to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetTimeout(timeout time.Duration) { + o.timeout = timeout +} + +// WithContext adds the context to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithContext(ctx context.Context) *DeleteClusterClusterIDBackupsPurgeParams { + o.SetContext(ctx) + return o +} + +// SetContext adds the context to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetContext(ctx context.Context) { + o.Context = ctx +} + +// WithHTTPClient adds the HTTPClient to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithHTTPClient(client *http.Client) *DeleteClusterClusterIDBackupsPurgeParams { + o.SetHTTPClient(client) + return o +} + +// SetHTTPClient adds the HTTPClient to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetHTTPClient(client *http.Client) { + o.HTTPClient = client +} + +// WithClusterID adds the clusterID to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithClusterID(clusterID string) *DeleteClusterClusterIDBackupsPurgeParams { + 
o.SetClusterID(clusterID) + return o +} + +// SetClusterID adds the clusterId to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetClusterID(clusterID string) { + o.ClusterID = clusterID +} + +// WithDryRun adds the dryRun to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithDryRun(dryRun *bool) *DeleteClusterClusterIDBackupsPurgeParams { + o.SetDryRun(dryRun) + return o +} + +// SetDryRun adds the dryRun to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetDryRun(dryRun *bool) { + o.DryRun = dryRun +} + +// WithLocations adds the locations to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) WithLocations(locations []string) *DeleteClusterClusterIDBackupsPurgeParams { + o.SetLocations(locations) + return o +} + +// SetLocations adds the locations to the delete cluster cluster ID backups purge params +func (o *DeleteClusterClusterIDBackupsPurgeParams) SetLocations(locations []string) { + o.Locations = locations +} + +// WriteToRequest writes these params to a swagger request +func (o *DeleteClusterClusterIDBackupsPurgeParams) WriteToRequest(r runtime.ClientRequest, reg strfmt.Registry) error { + + if err := r.SetTimeout(o.timeout); err != nil { + return err + } + var res []error + + // path param cluster_id + if err := r.SetPathParam("cluster_id", o.ClusterID); err != nil { + return err + } + + if o.DryRun != nil { + + // query param dry_run + var qrDryRun bool + if o.DryRun != nil { + qrDryRun = *o.DryRun + } + qDryRun := swag.FormatBool(qrDryRun) + if qDryRun != "" { + if err := r.SetQueryParam("dry_run", qDryRun); err != nil { + return err + } + } + + } + + valuesLocations := o.Locations + + joinedLocations := swag.JoinByFormat(valuesLocations, "") + // query array param locations + if err := r.SetQueryParam("locations", joinedLocations...); err != nil { + return err + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_responses.go b/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_responses.go new file mode 100644 index 0000000000..4ae4bdc2f2 --- /dev/null +++ b/swagger/gen/scylla-manager/client/operations/delete_cluster_cluster_id_backups_purge_responses.go @@ -0,0 +1,119 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "fmt" + "io" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/strfmt" + + "github.com/scylladb/scylla-manager/v3/swagger/gen/scylla-manager/models" +) + +// DeleteClusterClusterIDBackupsPurgeReader is a Reader for the DeleteClusterClusterIDBackupsPurge structure. +type DeleteClusterClusterIDBackupsPurgeReader struct { + formats strfmt.Registry +} + +// ReadResponse reads a server response into the received o. 
+func (o *DeleteClusterClusterIDBackupsPurgeReader) ReadResponse(response runtime.ClientResponse, consumer runtime.Consumer) (interface{}, error) { + switch response.Code() { + case 200: + result := NewDeleteClusterClusterIDBackupsPurgeOK() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return result, nil + default: + result := NewDeleteClusterClusterIDBackupsPurgeDefault(response.Code()) + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + if response.Code()/100 == 2 { + return result, nil + } + return nil, result + } +} + +// NewDeleteClusterClusterIDBackupsPurgeOK creates a DeleteClusterClusterIDBackupsPurgeOK with default headers values +func NewDeleteClusterClusterIDBackupsPurgeOK() *DeleteClusterClusterIDBackupsPurgeOK { + return &DeleteClusterClusterIDBackupsPurgeOK{} +} + +/* +DeleteClusterClusterIDBackupsPurgeOK handles this case with default header values. + +Backup purge response +*/ +type DeleteClusterClusterIDBackupsPurgeOK struct { + Payload *models.BackupPurgeResp +} + +func (o *DeleteClusterClusterIDBackupsPurgeOK) Error() string { + return fmt.Sprintf("[DELETE /cluster/{cluster_id}/backups/purge][%d] deleteClusterClusterIdBackupsPurgeOK %+v", 200, o.Payload) +} + +func (o *DeleteClusterClusterIDBackupsPurgeOK) GetPayload() *models.BackupPurgeResp { + return o.Payload +} + +func (o *DeleteClusterClusterIDBackupsPurgeOK) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.BackupPurgeResp) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + return nil +} + +// NewDeleteClusterClusterIDBackupsPurgeDefault creates a DeleteClusterClusterIDBackupsPurgeDefault with default headers values +func NewDeleteClusterClusterIDBackupsPurgeDefault(code int) *DeleteClusterClusterIDBackupsPurgeDefault { + return &DeleteClusterClusterIDBackupsPurgeDefault{ + _statusCode: code, + } +} + +/* +DeleteClusterClusterIDBackupsPurgeDefault handles this case with default header values. 
+ +Error +*/ +type DeleteClusterClusterIDBackupsPurgeDefault struct { + _statusCode int + + Payload *models.ErrorResponse +} + +// Code gets the status code for the delete cluster cluster ID backups purge default response +func (o *DeleteClusterClusterIDBackupsPurgeDefault) Code() int { + return o._statusCode +} + +func (o *DeleteClusterClusterIDBackupsPurgeDefault) Error() string { + return fmt.Sprintf("[DELETE /cluster/{cluster_id}/backups/purge][%d] DeleteClusterClusterIDBackupsPurge default %+v", o._statusCode, o.Payload) +} + +func (o *DeleteClusterClusterIDBackupsPurgeDefault) GetPayload() *models.ErrorResponse { + return o.Payload +} + +func (o *DeleteClusterClusterIDBackupsPurgeDefault) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.ErrorResponse) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + return nil +} diff --git a/swagger/gen/scylla-manager/client/operations/operations_client.go b/swagger/gen/scylla-manager/client/operations/operations_client.go index 3b1f7da660..ddc40e1184 100644 --- a/swagger/gen/scylla-manager/client/operations/operations_client.go +++ b/swagger/gen/scylla-manager/client/operations/operations_client.go @@ -29,6 +29,8 @@ type ClientService interface { DeleteClusterClusterIDBackups(params *DeleteClusterClusterIDBackupsParams) (*DeleteClusterClusterIDBackupsOK, error) + DeleteClusterClusterIDBackupsPurge(params *DeleteClusterClusterIDBackupsPurgeParams) (*DeleteClusterClusterIDBackupsPurgeOK, error) + DeleteClusterClusterIDTaskTaskTypeTaskID(params *DeleteClusterClusterIDTaskTaskTypeTaskIDParams) (*DeleteClusterClusterIDTaskTaskTypeTaskIDOK, error) GetClusterClusterID(params *GetClusterClusterIDParams) (*GetClusterClusterIDOK, error) @@ -152,6 +154,39 @@ func (a *Client) DeleteClusterClusterIDBackups(params *DeleteClusterClusterIDBac return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) } +/* +DeleteClusterClusterIDBackupsPurge delete cluster cluster ID backups purge API +*/ +func (a *Client) DeleteClusterClusterIDBackupsPurge(params *DeleteClusterClusterIDBackupsPurgeParams) (*DeleteClusterClusterIDBackupsPurgeOK, error) { + // TODO: Validate the params before sending + if params == nil { + params = NewDeleteClusterClusterIDBackupsPurgeParams() + } + + result, err := a.transport.Submit(&runtime.ClientOperation{ + ID: "DeleteClusterClusterIDBackupsPurge", + Method: "DELETE", + PathPattern: "/cluster/{cluster_id}/backups/purge", + ProducesMediaTypes: []string{"application/json"}, + ConsumesMediaTypes: []string{"application/json"}, + Schemes: []string{"http"}, + Params: params, + Reader: &DeleteClusterClusterIDBackupsPurgeReader{formats: a.formats}, + Context: params.Context, + Client: params.HTTPClient, + }) + if err != nil { + return nil, err + } + success, ok := result.(*DeleteClusterClusterIDBackupsPurgeOK) + if ok { + return success, nil + } + // unexpected success response + unexpectedSuccess := result.(*DeleteClusterClusterIDBackupsPurgeDefault) + return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) +} + /* DeleteClusterClusterIDTaskTaskTypeTaskID delete cluster cluster ID task task type task ID API */ diff --git a/swagger/gen/scylla-manager/models/backup_purge_res.go 
b/swagger/gen/scylla-manager/models/backup_purge_res.go new file mode 100644 index 0000000000..fc5fb8c2ef --- /dev/null +++ b/swagger/gen/scylla-manager/models/backup_purge_res.go @@ -0,0 +1,83 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "strconv" + + "github.com/go-openapi/errors" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" +) + +// BackupPurgeResp backup purge resp +// +// swagger:model BackupPurgeResp +type BackupPurgeResp struct { + + // deleted + Deleted []*BackupListItem `json:"Deleted"` + + // warnings + Warnings []string `json:"Warnings"` +} + +// Validate validates this backup purge resp +func (m *BackupPurgeResp) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateDeleted(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *BackupPurgeResp) validateDeleted(formats strfmt.Registry) error { + + if swag.IsZero(m.Deleted) { // not required + return nil + } + + for i := 0; i < len(m.Deleted); i++ { + if swag.IsZero(m.Deleted[i]) { // not required + continue + } + + if m.Deleted[i] != nil { + if err := m.Deleted[i].Validate(formats); err != nil { + if ve, ok := err.(*errors.Validation); ok { + return ve.ValidateName("Deleted" + "." + strconv.Itoa(i)) + } + return err + } + } + + } + + return nil +} + +// MarshalBinary interface implementation +func (m *BackupPurgeResp) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *BackupPurgeResp) UnmarshalBinary(b []byte) error { + var res BackupPurgeResp + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/swagger/scylla-manager.json b/swagger/scylla-manager.json index a9e9fbe1c8..3272bb4e84 100644 --- a/swagger/scylla-manager.json +++ b/swagger/scylla-manager.json @@ -485,6 +485,23 @@ } } }, + "BackupPurgeResp": { + "type": "object", + "properties": { + "Deleted": { + "type": "array", + "items": { + "$ref": "#/definitions/BackupListItem" + } + }, + "Warnings": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, "BackupListItem": { "type": "object", "properties": { @@ -2008,6 +2025,45 @@ } } }, + "/cluster/{cluster_id}/backups/purge": { + "delete": { + "parameters": [ + { + "name": "cluster_id", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "dry_run", + "in": "query", + "type": "boolean" + }, + { + "name": "locations", + "in": "query", + "type": "array", + "items": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Backup purge response", + "schema": { + "$ref": "#/definitions/BackupPurgeResp" + } + }, + "default": { + "description": "Error", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + } + } + } + }, "/cluster/{cluster_id}/repairs/intensity": { "put": { "parameters": [ From dce61ad649473792790d29376117fdc008e3bc39 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 14:13:39 -0400 Subject: [PATCH 8/9] add(sctool CLI): add BackupPurge support --- pkg/cmd/sctool/sctool.go | 2 + pkg/command/backup/backuppurge/cmd.go | 88 +++++++++++++++++++++++++ pkg/command/backup/backuppurge/res.yaml | 15 +++++ pkg/command/flag/flag.go | 4 ++ pkg/command/flag/usage.yaml | 3 + 
5 files changed, 112 insertions(+) create mode 100644 pkg/command/backup/backuppurge/cmd.go create mode 100644 pkg/command/backup/backuppurge/res.yaml diff --git a/pkg/cmd/sctool/sctool.go b/pkg/cmd/sctool/sctool.go index 11167dfe0f..129266683f 100644 --- a/pkg/cmd/sctool/sctool.go +++ b/pkg/cmd/sctool/sctool.go @@ -10,6 +10,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/command/backup" "github.com/scylladb/scylla-manager/v3/pkg/command/backup/backupdelete" "github.com/scylladb/scylla-manager/v3/pkg/command/backup/backuplist" + "github.com/scylladb/scylla-manager/v3/pkg/command/backup/backuppurge" "github.com/scylladb/scylla-manager/v3/pkg/command/backup/backupvalidate" "github.com/scylladb/scylla-manager/v3/pkg/command/cluster/clusteradd" "github.com/scylladb/scylla-manager/v3/pkg/command/cluster/clusterdelete" @@ -56,6 +57,7 @@ func buildCommand() *cobra.Command { backupCmd := backup.NewCommand(&client) backupCmd.AddCommand( + backuppurge.NewCommand(&client), backupdelete.NewCommand(&client), backupfiles.NewCommand(&client), backuplist.NewCommand(&client), diff --git a/pkg/command/backup/backuppurge/cmd.go b/pkg/command/backup/backuppurge/cmd.go new file mode 100644 index 0000000000..9386feaf5f --- /dev/null +++ b/pkg/command/backup/backuppurge/cmd.go @@ -0,0 +1,88 @@ +// Copyright (C) 2017 ScyllaDB + +package backuppurge + +import ( + _ "embed" + "fmt" + "io" + "time" + + "github.com/spf13/cobra" + "go.uber.org/atomic" + "gopkg.in/yaml.v2" + + "github.com/scylladb/scylla-manager/v3/pkg/command/flag" + "github.com/scylladb/scylla-manager/v3/pkg/managerclient" +) + +//go:embed res.yaml +var res []byte + +type command struct { + cobra.Command + client *managerclient.Client + dryRun bool + cluster string + location []string +} + +func NewCommand(client *managerclient.Client) *cobra.Command { + cmd := &command{ + client: client, + } + if err := yaml.Unmarshal(res, &cmd.Command); err != nil { + panic(err) + } + + defer flag.MustSetUsages(&cmd.Command, res, "cluster") + cmd.init() + cmd.RunE = func(_ *cobra.Command, args []string) error { + return cmd.run() + } + return &cmd.Command +} + +func (cmd *command) init() { + w := flag.Wrap(cmd.Flags()) + w.Cluster(&cmd.cluster) + w.Location(&cmd.location) + w.DryRun(&cmd.dryRun) +} + +func (cmd *command) run() error { + stillWaiting := atomic.NewBool(true) + time.AfterFunc(5*time.Second, func() { + if stillWaiting.Load() { + fmt.Fprintf(cmd.OutOrStderr(), "NOTICE: this may take a while, we are reading metadata from backup location(s)\n") + } + }) + var warnings Warnings + resp, warnings, err := cmd.client.PurgeBackups(cmd.Context(), cmd.cluster, cmd.location, cmd.dryRun) + if err != nil { + return err + } + err = resp.Render(cmd.OutOrStdout()) + if err != nil { + return err + } + return warnings.Render(cmd.OutOrStdout()) +} + +// Warnings represent warnings. +type Warnings []string + +// Render render Warnings to io.Writer. 
+func (w Warnings) Render(writer io.Writer) error {
+	if len(w) == 0 {
+		_, err := fmt.Fprintln(writer, "no warnings")
+		return err
+	}
+	for idx := range w {
+		_, err := fmt.Fprintf(writer, "warning #%d: %s\n", idx, w[idx])
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/pkg/command/backup/backuppurge/res.yaml b/pkg/command/backup/backuppurge/res.yaml
new file mode 100644
index 0000000000..0eced7d93e
--- /dev/null
+++ b/pkg/command/backup/backuppurge/res.yaml
@@ -0,0 +1,15 @@
+use: purge --cluster <id|name> [--location [<dc>:]<provider>:<name>] [--dry-run] [flags]
+
+short: Purge backup files in remote locations
+
+long: |
+  Purge stale backup data and metadata files from remote locations for the provided cluster.
+  This removes the data and files of:
+  - temporary manifests,
+  - manifests exceeding the task's retention days policy,
+  - manifests exceeding the task's retention (number of backups) policy,
+  - manifests of deleted tasks.
+  It returns information about the deleted files, taken from the purged manifests.
+
+location: |
+  If no location is provided, locations are taken from the cluster's backup tasks.
\ No newline at end of file
diff --git a/pkg/command/flag/flag.go b/pkg/command/flag/flag.go
index feb60ba623..371259607d 100644
--- a/pkg/command/flag/flag.go
+++ b/pkg/command/flag/flag.go
@@ -115,6 +115,10 @@ func (w Wrapper) Location(p *[]string) {
 	w.fs.StringSliceVarP(p, "location", "L", nil, usage["location"])
 }
 
+func (w Wrapper) DryRun(p *bool) {
+	w.fs.BoolVarP(p, "dry-run", "d", false, usage["dry-run"])
+}
+
 //
 // Task schedule flags
 //
diff --git a/pkg/command/flag/usage.yaml b/pkg/command/flag/usage.yaml
index 4227102887..5b2bf5d1d8 100644
--- a/pkg/command/flag/usage.yaml
+++ b/pkg/command/flag/usage.yaml
@@ -19,6 +19,9 @@ dc: |
 fail-fast: |
   Stops the task run on the first error.
 
+dry-run: |
+  In dry-run mode no changes are made; the command reports what would be purged as if the purge had been executed.
+
 keyspace: |
   A list of `glob` patterns separated by a comma used to include or exclude tables. The patterns match keyspaces and tables, separate the keyspace name from the table name with a dot e.g. ``keyspace,!keyspace.table_prefix_*``.
 
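For reference, a minimal sketch of how the new command added in this patch could be invoked; the cluster name and bucket below are illustrative only, and the actual report depends on the retention policies of the cluster's backup tasks:

    # Preview what would be purged without deleting anything.
    sctool backup purge --cluster prod-cluster --location s3:backup-bucket --dry-run

    # Perform the purge, taking locations from the cluster's backup tasks.
    sctool backup purge --cluster prod-cluster

Dropping --dry-run performs the actual deletion; omitting --location makes the command read locations from the cluster's backup tasks, as described in res.yaml above.
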
From 8b42f3e63dfd6804d9226452e22912c50a15b3b9 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Wed, 9 Aug 2023 14:24:39 -0400 Subject: [PATCH 9/9] add(restapi): add 'PurgeBackups' func test and some fixes --- pkg/restapi/backup_test.go | 123 ++++++++++++++++++++++++- pkg/restapi/main_test.go | 2 + pkg/restapi/mock_backupservice_test.go | 16 ++++ pkg/restapi/mock_schedservice_test.go | 94 ++++++++++--------- 4 files changed, 184 insertions(+), 51 deletions(-) diff --git a/pkg/restapi/backup_test.go b/pkg/restapi/backup_test.go index 544f3302e7..cfc8a0b99c 100644 --- a/pkg/restapi/backup_test.go +++ b/pkg/restapi/backup_test.go @@ -3,32 +3,40 @@ package restapi_test import ( + "encoding/json" "fmt" "net/http" "net/http/httptest" "net/url" "testing" + "time" "github.com/golang/mock/gomock" "github.com/scylladb/go-log" "github.com/scylladb/scylla-manager/v3/pkg/restapi" "github.com/scylladb/scylla-manager/v3/pkg/service/backup" "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/service/scheduler" "github.com/scylladb/scylla-manager/v3/pkg/util/timeutc" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) //go:generate mockgen -destination mock_backupservice_test.go -mock_names BackupService=MockBackupService -package restapi github.com/scylladb/scylla-manager/v3/pkg/restapi BackupService +//go:generate mockgen -destination mock_schedservice_test.go -mock_names SchedService=MockSchedService -package restapi github.com/scylladb/scylla-manager/v3/pkg/restapi SchedService func listBackupsRequest(clusterID uuid.UUID) *http.Request { return httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/v1/cluster/%s/backups", clusterID.String()), nil) } +func listBackupsPurgeRequest(clusterID uuid.UUID) *http.Request { + return httptest.NewRequest(http.MethodDelete, fmt.Sprintf("/api/v1/cluster/%s/backups/purge", clusterID.String()), nil) +} + func listBackupFilesRequest(clusterID uuid.UUID) *http.Request { return httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/v1/cluster/%s/backups/files", clusterID.String()), nil) } -func withForm(r *http.Request, locations []backupspec.Location, filter backup.ListFilter, query string) *http.Request { +func withForm(r *http.Request, dryRun bool, locations []backupspec.Location, filter backup.ListFilter, query string) *http.Request { r.Form = url.Values{} for _, l := range locations { r.Form.Add("locations", l.String()) @@ -41,6 +49,7 @@ func withForm(r *http.Request, locations []backupspec.Location, filter backup.Li r.Form.Add("min_date", string(a)) b, _ := filter.MaxDate.MarshalText() r.Form.Add("max_date", string(b)) + r.Form.Add("dry_run", fmt.Sprintf("%v", dryRun)) return r } @@ -91,15 +100,119 @@ func TestBackupList(t *testing.T) { cm.EXPECT().GetCluster(gomock.Any(), cluster.ID.String()).Return(cluster, nil) bm.EXPECT().List(gomock.Any(), cluster.ID, locations, filter).Return(golden, nil) - r := withForm(listBackupsRequest(cluster.ID), locations, filter, cluster.Name) + r := withForm(listBackupsRequest(cluster.ID), false, locations, filter, cluster.Name) w := httptest.NewRecorder() h.ServeHTTP(w, r) assertJsonBody(t, w, golden) } -func TestBackupListAllClusters(t *testing.T) { +func TestBackupPurge(t *testing.T) { t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + cm := restapi.NewMockClusterService(ctrl) + bm := restapi.NewMockBackupService(ctrl) + sm := restapi.NewMockSchedService(ctrl) + + services := restapi.Services{ + Cluster: cm, + Backup: bm, + Scheduler: sm, + } 
+ + h := restapi.New(services, log.NewDevelopment()) + + var ( + cluster = givenCluster() + + locations = backupspec.Locations{ + {Provider: backupspec.S3, Path: "foo"}, + {Provider: backupspec.S3, Path: "bar"}, + } + + taskID1 = uuid.MustRandom() + taskID2 = uuid.MustRandom() + + snapshotNow1 = backupspec.SnapshotTagAt(time.Now()) + snapshotNow2 = backupspec.SnapshotTagAt(time.Now().Add(time.Minute)) + + filter = backup.ListFilter{ + ClusterID: cluster.ID, + } + + manifests = backupspec.Manifests{ + { + Location: locations[0], + DC: "", + ClusterID: cluster.ID, + NodeID: "", + TaskID: taskID1, + SnapshotTag: snapshotNow1, + Temporary: false, + }, + { + Location: locations[1], + DC: "", + ClusterID: cluster.ID, + NodeID: "", + TaskID: taskID2, + SnapshotTag: snapshotNow2, + Temporary: false, + }, + } + expected = restapi.BackupPurgeOut{ + Warnings: make([]string, 0), + } + ) + + task1 := makeTaskItem(cluster.ID, taskID1, locations, 3, 30) + task2 := makeTaskItem(cluster.ID, taskID2, locations, 3, 30) + tasks := []*scheduler.TaskListItem{ + task1, task2, + } + cm.EXPECT().GetCluster(gomock.Any(), cluster.ID.String()).Return(cluster, nil) + sm.EXPECT().ListTasks(gomock.Any(), cluster.ID, scheduler.ListFilter{TaskType: []scheduler.TaskType{scheduler.BackupTask}}).Return(tasks, nil) + bm.EXPECT().PurgeBackups(gomock.Any(), cluster.ID, locations, gomock.Any(), gomock.Any()).Return(manifests, expected.Warnings, nil) + + expected.Deleted = restapi.ConvertManifestsToListItems(manifests) + r := withForm(listBackupsPurgeRequest(cluster.ID), true, locations, filter, cluster.Name) + w := httptest.NewRecorder() + h.ServeHTTP(w, r) + + assertJsonBody(t, w, expected) +} + +func makeTaskItem(clusterID, taskID uuid.UUID, locations backupspec.Locations, retention, day int) *scheduler.TaskListItem { + taskProp := backup.TaskProperties{ + Location: locations, + RetentionMap: make(backup.RetentionMap, 0), + } + taskProp.RetentionMap.Add(taskID, retention, day) + + prop, err := json.Marshal(taskProp) + if err != nil { + return nil + } + + return &scheduler.TaskListItem{ + Task: scheduler.Task{ + ClusterID: clusterID, + ID: taskID, + Type: scheduler.BackupTask, + Enabled: true, + Sched: scheduler.Schedule{StartDate: time.Now()}, + Properties: prop, + }, + Suspended: false, + NextActivation: nil, + Retry: 0, + } + +} + +func TestBackupListAllClusters(t *testing.T) { + ctrl := gomock.NewController(t) defer ctrl.Finish() cm := restapi.NewMockClusterService(ctrl) @@ -142,7 +255,7 @@ func TestBackupListAllClusters(t *testing.T) { cm.EXPECT().GetCluster(gomock.Any(), cluster.ID.String()).Return(cluster, nil) bm.EXPECT().List(gomock.Any(), cluster.ID, locations, filter).Return(golden, nil) - r := withForm(listBackupsRequest(cluster.ID), locations, filter, "") + r := withForm(listBackupsRequest(cluster.ID), false, locations, filter, "") w := httptest.NewRecorder() h.ServeHTTP(w, r) assertJsonBody(t, w, golden) @@ -191,7 +304,7 @@ func TestBackupListFiles(t *testing.T) { cm.EXPECT().GetCluster(gomock.Any(), cluster.ID.String()).Return(cluster, nil) bm.EXPECT().ListFiles(gomock.Any(), cluster.ID, locations, filter).Return(golden, nil) - r := withForm(listBackupFilesRequest(cluster.ID), locations, filter, cluster.ID.String()) + r := withForm(listBackupFilesRequest(cluster.ID), false, locations, filter, cluster.ID.String()) w := httptest.NewRecorder() h.ServeHTTP(w, r) assertJsonBody(t, w, golden) diff --git a/pkg/restapi/main_test.go b/pkg/restapi/main_test.go index e7f915da83..dbe8be09a3 100644 --- 
a/pkg/restapi/main_test.go +++ b/pkg/restapi/main_test.go @@ -30,6 +30,8 @@ func jsonBody(t testing.TB, v interface{}) *bytes.Reader { } func assertJsonBody(t testing.TB, w *httptest.ResponseRecorder, expected interface{}) { + t.Helper() + b, err := json.Marshal(expected) if err != nil { t.Fatal(err) diff --git a/pkg/restapi/mock_backupservice_test.go b/pkg/restapi/mock_backupservice_test.go index 48ecf6f07b..c53201c988 100644 --- a/pkg/restapi/mock_backupservice_test.go +++ b/pkg/restapi/mock_backupservice_test.go @@ -230,3 +230,19 @@ func (mr *MockBackupServiceMockRecorder) ListFiles(arg0, arg1, arg2, arg3 interf mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListFiles", reflect.TypeOf((*MockBackupService)(nil).ListFiles), arg0, arg1, arg2, arg3) } + +// PurgeBackups mocks base method. +func (m *MockBackupService) PurgeBackups(arg0 context.Context, arg1 uuid.UUID, arg2 backupspec.Locations, arg3 backup.RetentionMap, arg4 bool) (backupspec.Manifests, []string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PurgeBackups", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(backupspec.Manifests) + ret1, _ := ret[1].([]string) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// PurgeBackups indicates an expected call of PurgeBackups. +func (mr *MockBackupServiceMockRecorder) PurgeBackups(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PurgeBackups", reflect.TypeOf((*MockBackupService)(nil).PurgeBackups), arg0, arg1, arg2, arg3, arg4) +} diff --git a/pkg/restapi/mock_schedservice_test.go b/pkg/restapi/mock_schedservice_test.go index aa8af3c836..63cdd41bd6 100644 --- a/pkg/restapi/mock_schedservice_test.go +++ b/pkg/restapi/mock_schedservice_test.go @@ -1,41 +1,42 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/scylladb/scylla-manager/pkg/restapi (interfaces: SchedService) +// Source: github.com/scylladb/scylla-manager/v3/pkg/restapi (interfaces: SchedService) // Package restapi is a generated GoMock package. package restapi import ( context "context" + reflect "reflect" + gomock "github.com/golang/mock/gomock" scheduler "github.com/scylladb/scylla-manager/v3/pkg/service/scheduler" uuid "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - reflect "reflect" ) -// MockSchedService is a mock of SchedService interface +// MockSchedService is a mock of SchedService interface. type MockSchedService struct { ctrl *gomock.Controller recorder *MockSchedServiceMockRecorder } -// MockSchedServiceMockRecorder is the mock recorder for MockSchedService +// MockSchedServiceMockRecorder is the mock recorder for MockSchedService. type MockSchedServiceMockRecorder struct { mock *MockSchedService } -// NewMockSchedService creates a new mock instance +// NewMockSchedService creates a new mock instance. func NewMockSchedService(ctrl *gomock.Controller) *MockSchedService { mock := &MockSchedService{ctrl: ctrl} mock.recorder = &MockSchedServiceMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockSchedService) EXPECT() *MockSchedServiceMockRecorder { return m.recorder } -// DeleteTask mocks base method +// DeleteTask mocks base method. 
func (m *MockSchedService) DeleteTask(arg0 context.Context, arg1 *scheduler.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteTask", arg0, arg1) @@ -43,13 +44,13 @@ func (m *MockSchedService) DeleteTask(arg0 context.Context, arg1 *scheduler.Task return ret0 } -// DeleteTask indicates an expected call of DeleteTask +// DeleteTask indicates an expected call of DeleteTask. func (mr *MockSchedServiceMockRecorder) DeleteTask(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockSchedService)(nil).DeleteTask), arg0, arg1) } -// GetLastRuns mocks base method +// GetLastRuns mocks base method. func (m *MockSchedService) GetLastRuns(arg0 context.Context, arg1 *scheduler.Task, arg2 int) ([]*scheduler.Run, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetLastRuns", arg0, arg1, arg2) @@ -58,13 +59,28 @@ func (m *MockSchedService) GetLastRuns(arg0 context.Context, arg1 *scheduler.Tas return ret0, ret1 } -// GetLastRuns indicates an expected call of GetLastRuns +// GetLastRuns indicates an expected call of GetLastRuns. func (mr *MockSchedServiceMockRecorder) GetLastRuns(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastRuns", reflect.TypeOf((*MockSchedService)(nil).GetLastRuns), arg0, arg1, arg2) } -// GetRun mocks base method +// GetNthLastRun mocks base method. +func (m *MockSchedService) GetNthLastRun(arg0 context.Context, arg1 *scheduler.Task, arg2 int) (*scheduler.Run, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNthLastRun", arg0, arg1, arg2) + ret0, _ := ret[0].(*scheduler.Run) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetNthLastRun indicates an expected call of GetNthLastRun. +func (mr *MockSchedServiceMockRecorder) GetNthLastRun(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNthLastRun", reflect.TypeOf((*MockSchedService)(nil).GetNthLastRun), arg0, arg1, arg2) +} + +// GetRun mocks base method. func (m *MockSchedService) GetRun(arg0 context.Context, arg1 *scheduler.Task, arg2 uuid.UUID) (*scheduler.Run, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetRun", arg0, arg1, arg2) @@ -73,13 +89,13 @@ func (m *MockSchedService) GetRun(arg0 context.Context, arg1 *scheduler.Task, ar return ret0, ret1 } -// GetRun indicates an expected call of GetRun +// GetRun indicates an expected call of GetRun. func (mr *MockSchedServiceMockRecorder) GetRun(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRun", reflect.TypeOf((*MockSchedService)(nil).GetRun), arg0, arg1, arg2) } -// GetTaskByID mocks base method +// GetTaskByID mocks base method. func (m *MockSchedService) GetTaskByID(arg0 context.Context, arg1 uuid.UUID, arg2 scheduler.TaskType, arg3 uuid.UUID) (*scheduler.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTaskByID", arg0, arg1, arg2, arg3) @@ -88,13 +104,13 @@ func (m *MockSchedService) GetTaskByID(arg0 context.Context, arg1 uuid.UUID, arg return ret0, ret1 } -// GetTaskByID indicates an expected call of GetTaskByID +// GetTaskByID indicates an expected call of GetTaskByID. 
func (mr *MockSchedServiceMockRecorder) GetTaskByID(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTaskByID", reflect.TypeOf((*MockSchedService)(nil).GetTaskByID), arg0, arg1, arg2, arg3) } -// IsSuspended mocks base method +// IsSuspended mocks base method. func (m *MockSchedService) IsSuspended(arg0 context.Context, arg1 uuid.UUID) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IsSuspended", arg0, arg1) @@ -102,13 +118,13 @@ func (m *MockSchedService) IsSuspended(arg0 context.Context, arg1 uuid.UUID) boo return ret0 } -// IsSuspended indicates an expected call of IsSuspended +// IsSuspended indicates an expected call of IsSuspended. func (mr *MockSchedServiceMockRecorder) IsSuspended(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSuspended", reflect.TypeOf((*MockSchedService)(nil).IsSuspended), arg0, arg1) } -// ListTasks mocks base method +// ListTasks mocks base method. func (m *MockSchedService) ListTasks(arg0 context.Context, arg1 uuid.UUID, arg2 scheduler.ListFilter) ([]*scheduler.TaskListItem, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListTasks", arg0, arg1, arg2) @@ -117,13 +133,13 @@ func (m *MockSchedService) ListTasks(arg0 context.Context, arg1 uuid.UUID, arg2 return ret0, ret1 } -// ListTasks indicates an expected call of ListTasks +// ListTasks indicates an expected call of ListTasks. func (mr *MockSchedServiceMockRecorder) ListTasks(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTasks", reflect.TypeOf((*MockSchedService)(nil).ListTasks), arg0, arg1, arg2) } -// PropertiesDecorator mocks base method +// PropertiesDecorator mocks base method. func (m *MockSchedService) PropertiesDecorator(arg0 scheduler.TaskType) scheduler.PropertiesDecorator { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PropertiesDecorator", arg0) @@ -131,13 +147,13 @@ func (m *MockSchedService) PropertiesDecorator(arg0 scheduler.TaskType) schedule return ret0 } -// PropertiesDecorator indicates an expected call of PropertiesDecorator +// PropertiesDecorator indicates an expected call of PropertiesDecorator. func (mr *MockSchedServiceMockRecorder) PropertiesDecorator(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PropertiesDecorator", reflect.TypeOf((*MockSchedService)(nil).PropertiesDecorator), arg0) } -// PutTask mocks base method +// PutTask mocks base method. func (m *MockSchedService) PutTask(arg0 context.Context, arg1 *scheduler.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutTask", arg0, arg1) @@ -145,27 +161,13 @@ func (m *MockSchedService) PutTask(arg0 context.Context, arg1 *scheduler.Task) e return ret0 } -// PutTask indicates an expected call of PutTask +// PutTask indicates an expected call of PutTask. 
func (mr *MockSchedServiceMockRecorder) PutTask(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutTask", reflect.TypeOf((*MockSchedService)(nil).PutTask), arg0, arg1) } -// PutTaskOnce mocks base method -func (m *MockSchedService) PutTaskOnce(arg0 context.Context, arg1 *scheduler.Task) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutTaskOnce", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// PutTaskOnce indicates an expected call of PutTaskOnce -func (mr *MockSchedServiceMockRecorder) PutTaskOnce(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutTaskOnce", reflect.TypeOf((*MockSchedService)(nil).PutTaskOnce), arg0, arg1) -} - -// Resume mocks base method +// Resume mocks base method. func (m *MockSchedService) Resume(arg0 context.Context, arg1 uuid.UUID, arg2 bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Resume", arg0, arg1, arg2) @@ -173,13 +175,13 @@ func (m *MockSchedService) Resume(arg0 context.Context, arg1 uuid.UUID, arg2 boo return ret0 } -// Resume indicates an expected call of Resume +// Resume indicates an expected call of Resume. func (mr *MockSchedServiceMockRecorder) Resume(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resume", reflect.TypeOf((*MockSchedService)(nil).Resume), arg0, arg1, arg2) } -// StartTask mocks base method +// StartTask mocks base method. func (m *MockSchedService) StartTask(arg0 context.Context, arg1 *scheduler.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartTask", arg0, arg1) @@ -187,13 +189,13 @@ func (m *MockSchedService) StartTask(arg0 context.Context, arg1 *scheduler.Task) return ret0 } -// StartTask indicates an expected call of StartTask +// StartTask indicates an expected call of StartTask. func (mr *MockSchedServiceMockRecorder) StartTask(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTask", reflect.TypeOf((*MockSchedService)(nil).StartTask), arg0, arg1) } -// StartTaskNoContinue mocks base method +// StartTaskNoContinue mocks base method. func (m *MockSchedService) StartTaskNoContinue(arg0 context.Context, arg1 *scheduler.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartTaskNoContinue", arg0, arg1) @@ -201,13 +203,13 @@ func (m *MockSchedService) StartTaskNoContinue(arg0 context.Context, arg1 *sched return ret0 } -// StartTaskNoContinue indicates an expected call of StartTaskNoContinue +// StartTaskNoContinue indicates an expected call of StartTaskNoContinue. func (mr *MockSchedServiceMockRecorder) StartTaskNoContinue(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTaskNoContinue", reflect.TypeOf((*MockSchedService)(nil).StartTaskNoContinue), arg0, arg1) } -// StopTask mocks base method +// StopTask mocks base method. func (m *MockSchedService) StopTask(arg0 context.Context, arg1 *scheduler.Task) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StopTask", arg0, arg1) @@ -215,13 +217,13 @@ func (m *MockSchedService) StopTask(arg0 context.Context, arg1 *scheduler.Task) return ret0 } -// StopTask indicates an expected call of StopTask +// StopTask indicates an expected call of StopTask. 
func (mr *MockSchedServiceMockRecorder) StopTask(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopTask", reflect.TypeOf((*MockSchedService)(nil).StopTask), arg0, arg1) } -// Suspend mocks base method +// Suspend mocks base method. func (m *MockSchedService) Suspend(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Suspend", arg0, arg1) @@ -229,7 +231,7 @@ func (m *MockSchedService) Suspend(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// Suspend indicates an expected call of Suspend +// Suspend indicates an expected call of Suspend. func (mr *MockSchedServiceMockRecorder) Suspend(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Suspend", reflect.TypeOf((*MockSchedService)(nil).Suspend), arg0, arg1)