diff --git a/internal/job/constants.go b/internal/job/constants.go
index 5689163a47c..0c580f1b83e 100644
--- a/internal/job/constants.go
+++ b/internal/job/constants.go
@@ -35,6 +35,9 @@ const (
 
 	// DeleteTaskJob is the name of deleting task job.
 	DeleteTaskJob = "delete_task"
+
+	// GCJob is the name of gc job.
+	GCJob = "gc"
 )
 
 // Machinery server configuration.
diff --git a/manager/gc/audit.go b/manager/gc/audit.go
index 5ef5a052e9f..a3ffa910713 100644
--- a/manager/gc/audit.go
+++ b/manager/gc/audit.go
@@ -65,10 +65,25 @@ func (a *audit) RunGC(ctx context.Context) error {
 		return err
 	}
 
-	if err := a.recorder.Init(AuditGCTaskID, models.JSONMap{
+	args := models.JSONMap{
 		"ttl":        ttl,
 		"batch_size": DefaultAuditGCBatchSize,
-	}); err != nil {
+	}
+
+	var userID uint
+	if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
+		userID = id
+	}
+
+	var taskID string
+	if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
+		taskID = id
+	} else {
+		// Use the default task ID if taskID is not provided. (applied to background periodic execution scenarios)
+		taskID = AuditGCTaskID
+	}
+
+	if err := a.recorder.Init(userID, taskID, args); err != nil {
 		return err
 	}
 
diff --git a/manager/gc/job.go b/manager/gc/job.go
index 93b3df984c0..6ed3cbd4bc8 100644
--- a/manager/gc/job.go
+++ b/manager/gc/job.go
@@ -64,10 +64,25 @@ func (j *job) RunGC(ctx context.Context) error {
 		return err
 	}
 
-	if err = j.recorder.Init(JobGCTaskID, models.JSONMap{
+	args := models.JSONMap{
 		"ttl":        ttl,
 		"batch_size": DefaultJobGCBatchSize,
-	}); err != nil {
+	}
+
+	var userID uint
+	if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
+		userID = id
+	}
+
+	var taskID string
+	if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
+		taskID = id
+	} else {
+		// Use the default job gc task ID if taskID is not provided (applies to background periodic execution scenarios).
+		taskID = JobGCTaskID
+	}
+
+	if err = j.recorder.Init(userID, taskID, args); err != nil {
 		return err
 	}
 
diff --git a/manager/gc/recorder.go b/manager/gc/recorder.go
index 45c15f4216c..7eb66be7a73 100644
--- a/manager/gc/recorder.go
+++ b/manager/gc/recorder.go
@@ -51,10 +51,11 @@ func newJobRecorder(db *gorm.DB) *jobRecorder {
 	}
 }
 
-func (jb *jobRecorder) Init(taskID string, args models.JSONMap) error {
+func (jb *jobRecorder) Init(userID uint, taskID string, args models.JSONMap) error {
 	job := models.Job{
 		Type:   GCJobType,
 		TaskID: taskID,
+		UserID: userID,
 		Args:   args,
 	}
 
diff --git a/manager/handlers/job.go b/manager/handlers/job.go
index 72b0706e0b0..545f7a9b23b 100644
--- a/manager/handlers/job.go
+++ b/manager/handlers/job.go
@@ -117,6 +117,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
 			return
 		}
 
+		ctx.JSON(http.StatusOK, job)
+	case job.GCJob:
+		var json types.CreateGCJobRequest
+		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
+			ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
+			return
+		}
+
+		job, err := h.service.CreateGCJob(ctx.Request.Context(), json)
+		if err != nil {
+			ctx.Error(err) // nolint: errcheck
+			return
+		}
+
 		ctx.JSON(http.StatusOK, job)
 	default:
 		ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
diff --git a/manager/manager.go b/manager/manager.go
index ddd2600e5f7..a4f911cf98f 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -34,7 +34,7 @@ import (
 	"d7y.io/dragonfly/v2/manager/cache"
 	"d7y.io/dragonfly/v2/manager/config"
 	"d7y.io/dragonfly/v2/manager/database"
-	"d7y.io/dragonfly/v2/manager/gc"
+	managergc "d7y.io/dragonfly/v2/manager/gc"
 	"d7y.io/dragonfly/v2/manager/job"
 	"d7y.io/dragonfly/v2/manager/metrics"
 	"d7y.io/dragonfly/v2/manager/permission/rbac"
@@ -167,20 +167,21 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
 	}
 
 	// Initialize garbage collector.
-	s.gc = pkggc.New()
-
+	gc := pkggc.New()
 	// Register job gc task.
-	if err := s.gc.Add(gc.NewJobGCTask(db.DB)); err != nil {
+	if err := gc.Add(managergc.NewJobGCTask(db.DB)); err != nil {
 		return nil, err
 	}
 
 	// Register audit gc task.
-	if err := s.gc.Add(gc.NewAuditGCTask(db.DB)); err != nil {
+	if err := gc.Add(managergc.NewAuditGCTask(db.DB)); err != nil {
 		return nil, err
 	}
 
+	s.gc = gc
+
 	// Initialize REST server.
-	restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
+	restService := service.New(cfg, db, cache, job, gc, enforcer, objectStorage)
 	router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, s.jobRateLimiter, EmbedFolder(assets, assetsTargetPath))
 	if err != nil {
 		return nil, err
diff --git a/manager/service/job.go b/manager/service/job.go
index e4ec0350275..3550bb90960 100644
--- a/manager/service/job.go
+++ b/manager/service/job.go
@@ -20,20 +20,80 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
+	"github.com/google/uuid"
+	"gorm.io/gorm"
 
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	internaljob "d7y.io/dragonfly/v2/internal/job"
 	"d7y.io/dragonfly/v2/manager/metrics"
 	"d7y.io/dragonfly/v2/manager/models"
 	"d7y.io/dragonfly/v2/manager/types"
+	pkggc "d7y.io/dragonfly/v2/pkg/gc"
 	"d7y.io/dragonfly/v2/pkg/net/http"
 	"d7y.io/dragonfly/v2/pkg/retry"
 	"d7y.io/dragonfly/v2/pkg/slices"
 	"d7y.io/dragonfly/v2/pkg/structure"
 )
 
+const (
+	// DefaultGCJobPollingTimeout is the default timeout for polling GC job.
+	DefaultGCJobPollingTimeout = 30 * time.Minute
+
+	// DefaultGCJobPollingInterval is the default interval for polling GC job.
+	DefaultGCJobPollingInterval = 5 * time.Second
+)
+
+func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) {
+	taskID := uuid.NewString()
+	ctx = context.WithValue(ctx, pkggc.ContextKeyTaskID, taskID)
+	ctx = context.WithValue(ctx, pkggc.ContextKeyUserID, json.UserID)
+
+	// Run is non-blocking: it starts the gc task asynchronously in the background, so the job record is polled for below.
+	if err := s.gc.Run(ctx, json.Args.Type); err != nil {
+		return nil, err
+	}
+
+	return s.pollingGCJob(ctx, json.Type, json.UserID, taskID)
+}
+
+func (s *service) pollingGCJob(ctx context.Context, jobType string, userID uint, taskID string) (*models.Job, error) {
+	ctx, cancel := context.WithTimeout(ctx, DefaultGCJobPollingTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(DefaultGCJobPollingInterval)
+	defer ticker.Stop()
+
+	job := models.Job{}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, fmt.Errorf("context done: %w", ctx.Err())
+
+		case <-ticker.C:
+			if err := s.db.WithContext(ctx).First(&job, models.Job{
+				Type:   jobType,
+				UserID: userID,
+				TaskID: taskID,
+			}).Error; err != nil {
+				if errors.Is(err, gorm.ErrRecordNotFound) {
+					continue
+				}
+
+				return nil, err
+			}
+
+			// Return the job if the job is in success or failure state, otherwise continue polling.
+			if job.State == machineryv1tasks.StateSuccess || job.State == machineryv1tasks.StateFailure {
+				return &job, nil
+			}
+		}
+	}
+}
+
 func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
 	schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs)
 	if err != nil {
diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go
index 656a860fc60..570ebcf2b2b 100644
--- a/manager/service/mocks/service_mock.go
+++ b/manager/service/mocks/service_mock.go
@@ -205,6 +205,21 @@ func (mr *MockServiceMockRecorder) CreateDeleteTaskJob(arg0, arg1 any) *gomock.C
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTaskJob", reflect.TypeOf((*MockService)(nil).CreateDeleteTaskJob), arg0, arg1)
 }
 
+// CreateGCJob mocks base method.
+func (m *MockService) CreateGCJob(arg0 context.Context, arg1 types.CreateGCJobRequest) (*models.Job, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateGCJob", arg0, arg1)
+	ret0, _ := ret[0].(*models.Job)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// CreateGCJob indicates an expected call of CreateGCJob.
+func (mr *MockServiceMockRecorder) CreateGCJob(arg0, arg1 any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGCJob", reflect.TypeOf((*MockService)(nil).CreateGCJob), arg0, arg1)
+}
+
 // CreateGetTaskJob mocks base method.
 func (m *MockService) CreateGetTaskJob(arg0 context.Context, arg1 types.CreateGetTaskJobRequest) (*models.Job, error) {
 	m.ctrl.T.Helper()
diff --git a/manager/service/service.go b/manager/service/service.go
index 378c973898a..0813d458e15 100644
--- a/manager/service/service.go
+++ b/manager/service/service.go
@@ -33,6 +33,7 @@ import (
 	"d7y.io/dragonfly/v2/manager/models"
 	"d7y.io/dragonfly/v2/manager/permission/rbac"
 	"d7y.io/dragonfly/v2/manager/types"
+	pkggc "d7y.io/dragonfly/v2/pkg/gc"
 	"d7y.io/dragonfly/v2/pkg/objectstorage"
 )
 
@@ -118,6 +119,7 @@ type Service interface {
 	CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
 	CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
 	CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
+	CreateGCJob(context.Context, types.CreateGCJobRequest) (*models.Job, error)
 	DestroyJob(context.Context, uint) error
 	UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)
 	GetJob(context.Context, uint) (*models.Job, error)
@@ -152,18 +154,20 @@ type service struct {
 	rdb           redis.UniversalClient
 	cache         *cache.Cache
 	job           *job.Job
+	gc            pkggc.GC
 	enforcer      *casbin.Enforcer
 	objectStorage objectstorage.ObjectStorage
 }
 
 // NewREST returns a new REST instance
-func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
+func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, gc pkggc.GC, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
 	return &service{
 		config:        cfg,
 		db:            database.DB,
 		rdb:           database.RDB,
 		cache:         cache,
 		job:           job,
+		gc:            gc,
 		enforcer:      enforcer,
 		objectStorage: objectStorage,
 	}
diff --git a/manager/types/job.go b/manager/types/job.go
index a350c9ea0da..dc13d2e010c 100644
--- a/manager/types/job.go
+++ b/manager/types/job.go
@@ -276,3 +276,22 @@ type DeleteTaskArgs struct {
 	// Otherwise, calculate the task ID based on url, piece_length, tag, application, and filtered_query_params.
 	ContentForCalculatingTaskID *string `json:"content_for_calculating_task_id" binding:"omitempty"`
 }
+
+type CreateGCJobRequest struct {
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the type of the job.
+	Type string `json:"type" binding:"required"`
+
+	// Args is the arguments of the gc.
+	Args GCArgs `json:"args" binding:"required"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+}
+
+type GCArgs struct {
+	// Type is the type of the gc task to run, one of "audit" or "job".
+	Type string `json:"type" binding:"required,oneof=audit job"`
+}
diff --git a/pkg/gc/gc.go b/pkg/gc/gc.go
index 7396e99b262..92288560458 100644
--- a/pkg/gc/gc.go
+++ b/pkg/gc/gc.go
@@ -25,6 +25,20 @@ import (
 	"time"
 )
 
+type ContextKey string
+
+func (ck ContextKey) String() string {
+	return string(ck)
+}
+
+var (
+	// ContextKeyUserID is the key for user ID in context.
+	ContextKeyUserID ContextKey = "user_id"
+
+	// ContextKeyTaskID is the key for task ID in context.
+	ContextKeyTaskID ContextKey = "task_id"
+)
+
 // GC is the interface used for release resource.
 type GC interface {
 	// Add adds GC task.