@@ -62,6 +62,77 @@ type scheduler struct {
6262
6363// RunGC implements the gc Runner interface.
6464func (s * scheduler ) RunGC (ctx context.Context ) error {
65+ if err := s .mark (ctx ); err != nil {
66+ // Only log error if failed in mark phase,
67+ // because this can wait to be executed in the next gc window.
68+ logger .Errorf ("failed to mark inactive schedulers: %v" , err )
69+ }
70+
71+ return s .sweep (ctx )
72+ }
73+
74+ // mark running the mark operation for marking inactive schedulers.
75+ func (s * scheduler ) mark (_ context.Context ) error {
76+ logger .Info ("running scheduler GC mark" )
77+
78+ var schedulers []* models.Scheduler
79+ for {
80+ if err := s .db .Model (& models.Scheduler {}).
81+ Where ("state = ?" , models .SchedulerStateActive ).
82+ // As there is no configuration for the old scheduler, so exclude these schedulers.
83+ Where ("config IS NOT NULL AND config != ''" ).
84+ Limit (DefaultSchedulerGCBatchSize ).
85+ Find (& schedulers ).Error ; err != nil {
86+ return err
87+ }
88+ if len (schedulers ) == 0 {
89+ break
90+ }
91+
92+ now := time .Now ()
93+ schedulerIDs := make ([]uint , 0 , len (schedulers ))
94+ for _ , scheduler := range schedulers {
95+ if scheduler .Config == nil {
96+ continue
97+ }
98+
99+ // Retrieve the keep alive interval from the scheduler's configuration.
100+ keepAliveInterval , ok := scheduler .Config ["manager_keep_alive_interval" ].(float64 )
101+ if ! ok {
102+ continue
103+ }
104+
105+ // Check whether the last keep alive time is greater than 3x keep alive interval,
106+ // indicating that the scheduler is inactive.
107+ if now .Sub (scheduler .LastKeepAliveAt ) > time .Duration (keepAliveInterval )* 3 {
108+ schedulerIDs = append (schedulerIDs , scheduler .ID )
109+ }
110+ }
111+
112+ if len (schedulerIDs ) > 0 {
113+ if err := s .db .Model (& models.Scheduler {}).
114+ Where ("id IN (?)" , schedulerIDs ).
115+ Update ("state" , models .SchedulerStateInactive ).
116+ Error ; err != nil {
117+ return err
118+ }
119+ }
120+
121+ logger .Infof ("scheduler GC marks %d schedulers to inactive" , len (schedulerIDs ))
122+
123+ // If this batch is not full, break the loop as it indicates that this is the last page.
124+ if len (schedulers ) < DefaultSchedulerGCBatchSize {
125+ break
126+ }
127+ }
128+
129+ return nil
130+ }
131+
132+ // sweep running the sweep operation for cleaning up inactive schedulers.
133+ func (s * scheduler ) sweep (ctx context.Context ) error {
134+ logger .Info ("running scheduler GC sweep" )
135+
65136 args := models.JSONMap {
66137 "type" : SchedulerGCTaskID ,
67138 "ttl" : DefaultSchedulerGCTTL ,
@@ -93,7 +164,12 @@ func (s *scheduler) RunGC(ctx context.Context) error {
93164 }()
94165
95166 for {
96- result := s .db .Where ("updated_at < ?" , time .Now ().Add (- DefaultSchedulerGCTTL )).Where ("state = ?" , models .SchedulerStateInactive ).Limit (DefaultSchedulerGCBatchSize ).Unscoped ().Delete (& models.Scheduler {})
167+ result := s .db .Where ("updated_at < ?" , time .Now ().
168+ Add (- DefaultSchedulerGCTTL )).
169+ Where ("state = ?" , models .SchedulerStateInactive ).
170+ Limit (DefaultSchedulerGCBatchSize ).
171+ Unscoped ().
172+ Delete (& models.Scheduler {})
97173 if result .Error != nil {
98174 gcResult .Error = result .Error
99175 return result .Error
@@ -104,7 +180,7 @@ func (s *scheduler) RunGC(ctx context.Context) error {
104180 }
105181
106182 gcResult .Purged += result .RowsAffected
107- logger .Infof ("gc scheduler deleted %d inactive schedulers" , result .RowsAffected )
183+ logger .Infof ("scheduler GC deleted %d inactive schedulers" , result .RowsAffected )
108184 }
109185
110186 return nil
0 commit comments