Skip to content

Commit 10eae83

Browse files
committed
fix: cronjob not running
1 parent b480fe9 commit 10eae83

5 files changed

Lines changed: 86 additions & 47 deletions

File tree

broker/cron_job.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by
5353
})
5454
}
5555

56-
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), value, 0).Err()
57-
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), value, 0).Err()
56+
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId), value, 0).Err()
57+
client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, *message.D.Name), timeVal, 0).Err()
5858

5959
if message.D.Route != nil {
6060
err = client.Set(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId), *message.D.Route, 0).Err()
@@ -64,14 +64,14 @@ func CronJob(client redis.UniversalClient, broker Broker, message Message) ([]by
6464
}
6565
}
6666

67-
client.SAdd(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, *message.D.Name), fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err()
67+
client.SAdd(context.Background(), constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, *message.D.Name)).Err()
6868

6969
log.Infof("Added task with ID %s to run every %v.\n", taskId, timeVal)
7070

7171
processor.ProcessCronJob(client, *broker.Channel, *message.D.Name, taskId)
7272

7373
return json.Marshal(map[string]interface{}{
74-
"t": constants.TASK_DELAY,
74+
"t": constants.TASK_CRON,
7575
"d": map[string]interface{}{
7676
"taskId": taskId,
7777
},

go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ require (
1010
github.com/redis/go-redis/v9 v9.0.4
1111
)
1212

13-
require github.com/robfig/cron v1.2.0
13+
require github.com/go-co-op/gocron v1.27.1
14+
15+
require (
16+
github.com/robfig/cron/v3 v3.0.1 // indirect
17+
go.uber.org/atomic v1.9.0 // indirect
18+
)
1419

1520
require (
1621
github.com/cespare/xxhash/v2 v2.2.0 // indirect

go.sum

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,53 @@ github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0
44
github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo=
55
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
66
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
78
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
810
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
911
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
1012
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
1113
github.com/disgoorg/log v1.2.0 h1:sqlXnu/ZKAlIlHV9IO+dbMto7/hCQ474vlIdMWk8QKo=
1214
github.com/disgoorg/log v1.2.0/go.mod h1:3x1KDG6DI1CE2pDwi3qlwT3wlXpeHW/5rVay+1qDqOo=
1315
github.com/disgoorg/snowflake/v2 v2.0.1 h1:CuUxGLwggUxEswZOmZ+mZ5i0xSumQdXW9tXW7uGqe+0=
1416
github.com/disgoorg/snowflake/v2 v2.0.1/go.mod h1:SPU9c2CNn5DSyb86QcKtdZgix9osEtKrHLW4rMhfLCs=
17+
github.com/go-co-op/gocron v1.27.1 h1:fYmK6COvF3rdFBbB4yQGWaf6TKIMjPv+1oaFrVx9bl8=
18+
github.com/go-co-op/gocron v1.27.1/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y=
1519
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
1620
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
1721
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
22+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
23+
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
1824
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1925
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
26+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
27+
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
28+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2029
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
21-
github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM=
22-
github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
2330
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
2431
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
2532
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
2633
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
27-
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
28-
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
34+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
35+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
36+
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
37+
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
2938
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
3039
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
40+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
41+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
3142
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
3243
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
44+
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
45+
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
46+
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
47+
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
3348
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
3449
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
3550
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3651
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
52+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
53+
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
3754
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
55+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
3856
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

lib/task.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,25 @@ func InitTask(conf *config.Config) *Task {
3737
log.Fatalf("Unable to declare exchange due to: %v", err)
3838
}
3939

40-
processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker)
40+
go func () {
41+
Delays := task.Redis.ZCard(context.Background(), constants.TASK_REDIS_KEY).Val()
4142

42-
Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val()
43+
log.Infof("Found %d delayed jobs", Delays)
44+
processor.ProcessJob(task.Redis, *task.Broker.Channel); broker.HandleReceive(task.Redis, *task.Broker)
45+
}()
4346

44-
for _, member := range Members {
45-
taskId := strings.Split(member, ":")[0]
46-
name := strings.Split(member, ":")[1]
47+
go func () {
48+
Members := task.Redis.SMembers(context.Background(), constants.TASK_REDIS_CRON_SETS).Val()
49+
50+
log.Infof("Found %d cron jobs", len(Members))
51+
52+
for _, member := range Members {
53+
taskId := strings.Split(member, ":")[0]
54+
name := strings.Split(member, ":")[1]
55+
56+
processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId)
57+
}
58+
}()
4759

48-
processor.ProcessCronJob(task.Redis, *task.Broker.Channel, name, taskId)
49-
}
5060
return &task
5161
}

processor/cron_processor.go

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package processor
33
import (
44
"context"
55
"fmt"
6+
"time"
67

8+
"github.com/disgoorg/log"
9+
"github.com/go-co-op/gocron"
710
"github.com/nezuchan/scheduled-tasks/constants"
811
"github.com/rabbitmq/amqp091-go"
912
"github.com/redis/go-redis/v9"
10-
"github.com/robfig/cron"
1113
)
1214

1315
func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name string, taskId string) {
@@ -17,36 +19,40 @@ func ProcessCronJob(client redis.UniversalClient, broker amqp091.Channel, name s
1719
if CronValue == 1 && TaskKey == 1 {
1820
Cron := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name)).Val()
1921

20-
c := cron.New()
21-
22-
c.AddFunc(Cron, func() {
23-
Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()
24-
25-
if Value == "" {
26-
c.Stop()
27-
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
28-
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))
29-
client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err()
30-
return
31-
}
32-
33-
Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()
34-
35-
if Route != "" {
36-
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{
37-
ContentType: "text/plain",
38-
Body: []byte(Value),
39-
})
40-
} else {
41-
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{
42-
ContentType: "text/plain",
43-
Body: []byte(Value),
44-
})
45-
}
46-
})
47-
48-
c.Start()
49-
22+
c := gocron.NewScheduler(time.UTC)
23+
24+
c.Cron(Cron).Do(
25+
func(){
26+
log.Infof("Sending cron job %s to client", name)
27+
Value := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId)).Val()
28+
29+
if Value == "" {
30+
c.Stop()
31+
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_VALUE, taskId))
32+
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
33+
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))
34+
client.SRem(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_SETS, fmt.Sprintf("%s:%s", taskId, name))).Err()
35+
c.Clear()
36+
return
37+
}
38+
39+
Route := client.Get(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId)).Val()
40+
41+
if Route != "" {
42+
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, Route, false, false, amqp091.Publishing{
43+
ContentType: "text/plain",
44+
Body: []byte(Value),
45+
})
46+
} else {
47+
broker.PublishWithContext(context.Background(), constants.TASKER_EXCHANGE, "*", false, false, amqp091.Publishing{
48+
ContentType: "text/plain",
49+
Body: []byte(Value),
50+
})
51+
}
52+
},
53+
)
54+
55+
c.StartAsync()
5056
} else {
5157
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_KEY_ROUTE, taskId))
5258
client.Unlink(context.Background(), fmt.Sprintf("%s:%s", constants.TASK_REDIS_CRON_VALUE, name))

0 commit comments

Comments
 (0)