Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (

// DeleteTaskJob is the name of deleting task job.
DeleteTaskJob = "delete_task"

// GCJob is the name of gc job.
GCJob = "gc"
)

// Machinery server configuration.
Expand Down
19 changes: 17 additions & 2 deletions manager/gc/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,25 @@ func (a *audit) RunGC(ctx context.Context) error {
return err
}

if err := a.recorder.Init(AuditGCTaskID, models.JSONMap{
args := models.JSONMap{
"ttl": ttl,
"batch_size": DefaultAuditGCBatchSize,
}); err != nil {
}

var userID uint
if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
userID = id
}

var taskID string
if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
taskID = id
} else {
// Use the default task ID if taskID is not provided. (applied to background periodic execution scenarios)
taskID = AuditGCTaskID
}

if err := a.recorder.Init(userID, taskID, args); err != nil {
return err
}

Expand Down
19 changes: 17 additions & 2 deletions manager/gc/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,25 @@ func (j *job) RunGC(ctx context.Context) error {
return err
}

if err = j.recorder.Init(JobGCTaskID, models.JSONMap{
args := models.JSONMap{
"ttl": ttl,
"batch_size": DefaultJobGCBatchSize,
}); err != nil {
}

var userID uint
if id, ok := ctx.Value(pkggc.ContextKeyUserID).(uint); ok {
userID = id
}

var taskID string
if id, ok := ctx.Value(pkggc.ContextKeyTaskID).(string); ok {
taskID = id
} else {
// Use the default task ID if taskID is not provided. (applied to background periodic execution scenarios)
taskID = AuditGCTaskID
}

if err = j.recorder.Init(userID, taskID, args); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion manager/gc/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func newJobRecorder(db *gorm.DB) *jobRecorder {
}
}

func (jb *jobRecorder) Init(taskID string, args models.JSONMap) error {
func (jb *jobRecorder) Init(userID uint, taskID string, args models.JSONMap) error {
job := models.Job{
Type: GCJobType,
TaskID: taskID,
UserID: userID,
Args: args,
}

Expand Down
14 changes: 14 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, job)
case job.GCJob:
var json types.CreateGCJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateGCJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
default:
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
Expand Down
13 changes: 7 additions & 6 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"d7y.io/dragonfly/v2/manager/cache"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/database"
"d7y.io/dragonfly/v2/manager/gc"
managergc "d7y.io/dragonfly/v2/manager/gc"
"d7y.io/dragonfly/v2/manager/job"
"d7y.io/dragonfly/v2/manager/metrics"
"d7y.io/dragonfly/v2/manager/permission/rbac"
Expand Down Expand Up @@ -167,20 +167,21 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
}

// Initialize garbage collector.
s.gc = pkggc.New()

gc := pkggc.New()
// Register job gc task.
if err := s.gc.Add(gc.NewJobGCTask(db.DB)); err != nil {
if err := gc.Add(managergc.NewJobGCTask(db.DB)); err != nil {
return nil, err
}

// Register audit gc task.
if err := s.gc.Add(gc.NewAuditGCTask(db.DB)); err != nil {
if err := gc.Add(managergc.NewAuditGCTask(db.DB)); err != nil {
return nil, err
}

s.gc = gc

// Initialize REST server.
restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
restService := service.New(cfg, db, cache, job, gc, enforcer, objectStorage)
router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, s.jobRateLimiter, EmbedFolder(assets, assetsTargetPath))
if err != nil {
return nil, err
Expand Down
60 changes: 60 additions & 0 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,80 @@ import (
"context"
"errors"
"fmt"
"time"

machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
"github.com/google/uuid"
"gorm.io/gorm"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/metrics"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/retry"
"d7y.io/dragonfly/v2/pkg/slices"
"d7y.io/dragonfly/v2/pkg/structure"
)

const (
// DefaultGCJobPollingTimeout is the default timeout for polling GC job.
DefaultGCJobPollingTimeout = 30 * time.Minute

// DefaultGCJobPollingInterval is the default interval for polling GC job.
DefaultGCJobPollingInterval = 5 * time.Second
)

func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) {
taskID := uuid.NewString()
ctx = context.WithValue(ctx, pkggc.ContextKeyTaskID, taskID)
ctx = context.WithValue(ctx, pkggc.ContextKeyUserID, json.UserID)

// This is a non-block function to run the gc task, which will run the task asynchronously in the backend.
if err := s.gc.Run(ctx, json.Args.Type); err != nil {
return nil, err
}

return s.pollingGCJob(ctx, json.Type, json.UserID, taskID)
}

func (s *service) pollingGCJob(ctx context.Context, jobType string, userID uint, taskID string) (*models.Job, error) {
ctx, cancel := context.WithTimeout(ctx, DefaultGCJobPollingTimeout)
defer cancel()

ticker := time.NewTicker(DefaultGCJobPollingInterval)
defer ticker.Stop()

job := models.Job{}

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context done: %w", ctx.Err())

case <-ticker.C:
if err := s.db.WithContext(ctx).First(&job, models.Job{
Type: jobType,
UserID: userID,
TaskID: taskID,
}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
continue
}

return nil, err
}

// Return the job if the job is in success or failure state, otherwise continue polling.
if job.State == machineryv1tasks.StateSuccess || job.State == machineryv1tasks.StateFailure {
return &job, nil
}
}
}
}

func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions manager/service/mocks/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion manager/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/permission/rbac"
"d7y.io/dragonfly/v2/manager/types"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/objectstorage"
)

Expand Down Expand Up @@ -118,6 +119,7 @@ type Service interface {
CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
CreateGCJob(context.Context, types.CreateGCJobRequest) (*models.Job, error)
DestroyJob(context.Context, uint) error
UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)
GetJob(context.Context, uint) (*models.Job, error)
Expand Down Expand Up @@ -152,18 +154,20 @@ type service struct {
rdb redis.UniversalClient
cache *cache.Cache
job *job.Job
gc pkggc.GC
enforcer *casbin.Enforcer
objectStorage objectstorage.ObjectStorage
}

// NewREST returns a new REST instance
func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
func New(cfg *config.Config, database *database.Database, cache *cache.Cache, job *job.Job, gc pkggc.GC, enforcer *casbin.Enforcer, objectStorage objectstorage.ObjectStorage) Service {
return &service{
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
job: job,
gc: gc,
enforcer: enforcer,
objectStorage: objectStorage,
}
Expand Down
19 changes: 19 additions & 0 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,22 @@ type DeleteTaskArgs struct {
// Otherwise, calculate the task ID based on url, piece_length, tag, application, and filtered_query_params.
ContentForCalculatingTaskID *string `json:"content_for_calculating_task_id" binding:"omitempty"`
}

type CreateGCJobRequest struct {
// BIO is the description of the job.
BIO string `json:"bio" binding:"omitempty"`

// Type is the type of the job.
Type string `json:"type" binding:"required"`

// Args is the arguments of the gc.
Args GCArgs `json:"args" binding:"required"`

// UserID is the user id of the job.
UserID uint `json:"user_id" binding:"omitempty"`
}

type GCArgs struct {
// Type is the type of the job.
Type string `json:"type" binding:"required,oneof=audit job"`
}
14 changes: 14 additions & 0 deletions pkg/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ import (
"time"
)

type ContextKey string

func (ck ContextKey) String() string {
return string(ck)
}

var (
// ContextKeyUserID is the key for user ID in context.
ContextKeyUserID ContextKey = "user_id"

// ContextKeyTaskID is the key for task ID in context.
ContextKeyTaskID ContextKey = "task_id"
)

// GC is the interface used for release resource.
type GC interface {
// Add adds GC task.
Expand Down