Skip to content

Commit 255c213

Browse files
committed
add: 延时任务区分最近时间任务
1 parent cdf80ec commit 255c213

1 file changed

Lines changed: 88 additions & 30 deletions

File tree

bootstrap/servers/queue_delay.go

Lines changed: 88 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/sirupsen/logrus"
1111
"gorm.io/gorm"
1212
"reflect"
13+
"sync/atomic"
1314
"time"
1415
)
1516

@@ -27,19 +28,21 @@ CREATE TABLE ` + "`" + `delay_queue` + "`" + ` (
2728
` + "`" + `job` + "`" + ` json NOT NULL COMMENT '任务信息',
2829
` + "`" + `run_at` + "`" + ` timestamp NOT NULL DEFAULT '2022-08-25 00:00:00' COMMENT '执行时间点',
2930
` + "`" + `created_at` + "`" + ` timestamp NOT NULL DEFAULT '2022-08-25 00:00:00' COMMENT '创建时间',
31+
` + "`" + `in_cache` + "`" + ` tinyint(4) unsigned NOT NULL DEFAULT '0' COMMENT '是否加入缓存',
3032
PRIMARY KEY (` + "`" + `id` + "`" + `),
3133
KEY ` + "`" + `idx_delay_queue_run_at` + "`" + ` (` + "`" + `run_at` + "`" + `)
3234
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='延时队列存储';
3335
`
3436

3537
// OrmDelayQueue @Bean
3638
type OrmDelayQueue struct {
37-
Id string `gorm:"column:id;type:varchar(64);primaryKey"` //
38-
Fail uint64 `gorm:"column:fail;type:bigint(20) unsigned;default:0;comment:'失败次数'"` // 失败次数
39-
Route string `gorm:"column:route;type:varchar(254);comment:'路由'"` // 路由
40-
Job database.JSON `gorm:"column:job;type:json;comment:'任务信息'"` // 任务信息
41-
RunAt database.Time `gorm:"column:run_at;type:timestamp;index:idx_delay_queue_run_at,class:BTREE;default:CURRENT_TIMESTAMP;comment:'执行时间点'"` // 执行时间点
42-
CreatedAt database.Time `gorm:"column:created_at;type:timestamp;default:2022-08-25 00:00:00;comment:'创建时间'"` // 创建时间
39+
Id string `gorm:"column:id;type:varchar(64);primaryKey"` //
40+
Fail uint64 `gorm:"column:fail;type:bigint(20) unsigned;default:0;comment:'失败次数'"` // 失败次数
41+
Route string `gorm:"column:route;type:varchar(254);comment:'路由'"` // 路由
42+
Job database.JSON `gorm:"column:job;type:json;comment:'任务信息'"` // 任务信息
43+
RunAt database.Time `gorm:"column:run_at;type:timestamp;index:idx_delay_queue_run_at,class:BTREE;default:2022-08-25 00:00:00;comment:'执行时间点'"` // 执行时间点
44+
CreatedAt database.Time `gorm:"column:created_at;type:timestamp;default:2022-08-25 00:00:00;comment:'创建时间'"` // 创建时间
45+
InCache uint32 `gorm:"column:in_cache;type:tinyint(4) unsigned;default:0;comment:'是否加入缓存'"` // 是否加入缓存
4346
}
4447

4548
func (receiver *OrmDelayQueue) TableName() string {
@@ -50,9 +53,13 @@ func (receiver *OrmDelayQueue) TableName() string {
5053
type DelayQueueForMysql struct {
5154
mysql *gorm.DB `inject:"database, @config(queue.delay.connect)"`
5255
queue *Queue `inject:""`
56+
57+
RunAfterFuncLimit int64
5358
}
5459

5560
func (d *DelayQueueForMysql) Init() {
61+
d.RunAfterFuncLimit = 500
62+
5663
if app2.Config("queue.delay.auth_migrate", true) {
5764
logrus.Info("你可以修改queue.delay.auth_migrate = false; 关闭自动迁移delay_queue表")
5865
err := d.mysql.Exec(createTable)
@@ -75,61 +82,112 @@ func (d *DelayQueueForMysql) Run() {
7582
// Loop TODO 待优化,如果启动了广播,可以内存维护多个节点的最近任务,可以去掉定时查询
7683
func (d *DelayQueueForMysql) Loop() {
7784
interval := app2.Config("queue.delay.interval", 60)
85+
86+
// 从把in_cache全部加入缓存
87+
list := make([]*OrmDelayQueue, 0)
88+
d.mysql.Model(&OrmDelayQueue{}).Where("in_cache = 1", time.Now()).
89+
Limit(int(d.RunAfterFuncLimit)).Order("id desc").
90+
Find(&list)
91+
for _, queue := range list {
92+
d.RunAfterFunc(queue)
93+
}
7894
for {
7995
list := make([]*OrmDelayQueue, 0)
80-
dbRet := d.mysql.Model(&OrmDelayQueue{}).Where("run_at <= ? and fail = 0", time.Now()).Limit(100).Order("id desc").Find(&list)
96+
dbRet := d.mysql.Model(&OrmDelayQueue{}).Where("run_at <= ? and fail = 0 and in_cache = 0", time.Now().Add(time.Duration(60))).
97+
Limit(100).
98+
Find(&list)
8199

82100
if dbRet.Error != nil {
83101
logrus.Error(dbRet.Error)
84102
} else if len(list) != 0 {
85103
delIds := make([]string, 0)
86104
for _, delayMsg := range list {
87-
handle, ok := d.queue.dispatch.Load(delayMsg.Route)
88-
if !ok {
89-
logrus.Errorf("无法处理的route: %v", delayMsg.Route)
90-
d.mysql.Model(&OrmDelayQueue{}).Where("id <= ?", delayMsg.Id).Update("fail", "1")
91-
continue
92-
}
93-
94-
event := delayMsg.Job
95-
job := handle.(reflect.Value)
96-
v := job.Interface()
97-
newJob, ok := v.(constraint.Job)
98-
if ok {
99-
by, _ := event.MarshalJSON()
100-
err := json.Unmarshal(by, newJob)
101-
if err == nil {
102-
newJob.Handler()
105+
// 还没有到时间就读出来, 设置到系统定时执行
106+
if delayMsg.RunAt.Time.Before(time.Now()) {
107+
if d.RunAfterFuncLimit > 0 {
108+
d.RunAfterFunc(delayMsg)
109+
d.mysql.Model(&OrmDelayQueue{}).Where("id = ?", delayMsg.Id).Update("in_cache", "1")
110+
}
111+
} else {
112+
if d.RunDelayJob(delayMsg) {
103113
delIds = append(delIds, delayMsg.Id)
104-
} else {
105-
logrus.Errorf("run delay job, json.Unmarshal data err = %v", err)
106-
d.mysql.Model(&OrmDelayQueue{}).Where("id <= ?", delayMsg.Id).Update("fail", "1")
107114
}
108115
}
109116
}
110-
d.mysql.Where("id in ?", delIds).Delete(&OrmDelayQueue{})
117+
if len(delIds) != 0 {
118+
d.mysql.Where("id in ?", delIds).Delete(&OrmDelayQueue{})
119+
}
111120
}
112121

113122
time.Sleep(time.Duration(interval) * time.Second)
114123
}
115124
}
116125

126+
func (d *DelayQueueForMysql) RunDelayJob(delayMsg *OrmDelayQueue) bool {
127+
handle, ok := d.queue.dispatch.Load(delayMsg.Route)
128+
if !ok {
129+
logrus.Errorf("无法处理的route: %v", delayMsg.Route)
130+
d.mysql.Model(&OrmDelayQueue{}).Where("id = ?", delayMsg.Id).Update("fail", "1")
131+
return false
132+
}
133+
134+
event := delayMsg.Job
135+
job := handle.(reflect.Value)
136+
v := job.Interface()
137+
newJob, ok := v.(constraint.Job)
138+
if ok {
139+
by, _ := event.MarshalJSON()
140+
err := json.Unmarshal(by, newJob)
141+
if err == nil {
142+
newJob.Handler()
143+
return true
144+
} else {
145+
logrus.Errorf("run delay job, json.Unmarshal data err = %v", err)
146+
d.mysql.Model(&OrmDelayQueue{}).Where("id = ?", delayMsg.Id).Update("fail", "1")
147+
return false
148+
}
149+
}
150+
logrus.Error("run delay job, v.(constraint.Job) not ok", v)
151+
return false
152+
}
153+
117154
func (d *DelayQueueForMysql) Push(task DelayTask) string {
118155
uid := uuid.NewV4().String()
119-
120-
d.mysql.Model(&OrmDelayQueue{}).Create(&OrmDelayQueue{
156+
delayMsg := &OrmDelayQueue{
121157
Id: uid,
122158
Fail: 0,
123159
Route: jobToRoute(task.message),
124160
Job: database.NewJSON(task.message),
125161
RunAt: database.Now().Add(task.interval),
126162
CreatedAt: database.Now(),
127-
})
163+
}
164+
165+
// 60 秒内直接设置定时齐
166+
if delayMsg.RunAt.Time.Before(time.Now().Add(60*time.Second)) && d.RunAfterFuncLimit > 0 {
167+
delayMsg.InCache = 1
168+
}
128169

170+
ret := d.mysql.Model(&OrmDelayQueue{}).Create(delayMsg)
171+
if ret.Error != nil {
172+
if delayMsg.InCache == 1 {
173+
d.RunAfterFunc(delayMsg)
174+
}
175+
}
129176
return uid
130177
}
131178

132179
func (d *DelayQueueForMysql) Del(id string) bool {
133180
ret := d.mysql.Where("id = ?", id).Delete(&OrmDelayQueue{})
134181
return ret.Error == nil
135182
}
183+
184+
func (d *DelayQueueForMysql) RunAfterFunc(delayMsg *OrmDelayQueue) {
185+
atomic.AddInt64(&d.RunAfterFuncLimit, -1)
186+
dTime := delayMsg.RunAt.Unix() - time.Now().Unix()
187+
time.AfterFunc(time.Duration(dTime)*time.Second, func() {
188+
atomic.AddInt64(&d.RunAfterFuncLimit, 1)
189+
if d.RunDelayJob(delayMsg) {
190+
d.mysql.Where("id = ?", delayMsg.Id).Delete(&OrmDelayQueue{})
191+
}
192+
})
193+
}

0 commit comments

Comments
 (0)