@@ -54,19 +54,36 @@ const defaultHeartbeatInterval = 10 * time.Second
54
54
// NewScheduler returns a new Scheduler instance given the redis connection option.
55
55
// The parameter opts is optional, defaults will be used if opts is set to nil
56
56
func NewScheduler (r RedisConnOpt , opts * SchedulerOpts ) * Scheduler {
57
+ scheduler := newScheduler (opts )
58
+
57
59
redisClient , ok := r .MakeRedisClient ().(redis.UniversalClient )
58
60
if ! ok {
59
61
panic (fmt .Sprintf ("asynq: unsupported RedisConnOpt type %T" , r ))
60
62
}
61
- scheduler := NewSchedulerFromRedisClient (redisClient , opts )
63
+
64
+ rdb := rdb .NewRDB (redisClient )
65
+
66
+ scheduler .rdb = rdb
67
+ scheduler .client = & Client {broker : rdb , sharedConnection : false }
62
68
scheduler .sharedConnection = false
69
+
63
70
return scheduler
64
71
}
65
72
66
73
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
67
74
// The parameter opts is optional, defaults will be used if opts is set to nil.
68
75
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
69
76
func NewSchedulerFromRedisClient (c redis.UniversalClient , opts * SchedulerOpts ) * Scheduler {
77
+ scheduler := newScheduler (opts )
78
+
79
+ scheduler .rdb = rdb .NewRDB (c )
80
+ scheduler .client = NewClientFromRedisClient (c )
81
+ scheduler .sharedConnection = true
82
+
83
+ return scheduler
84
+ }
85
+
86
+ func newScheduler (opts * SchedulerOpts ) * Scheduler {
70
87
if opts == nil {
71
88
opts = & SchedulerOpts {}
72
89
}
@@ -93,8 +110,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
93
110
state : & serverState {value : srvStateNew },
94
111
heartbeatInterval : heartbeatInterval ,
95
112
logger : logger ,
96
- client : NewClientFromRedisClient (c ),
97
- rdb : rdb .NewRDB (c ),
98
113
cron : cron .New (cron .WithLocation (loc )),
99
114
location : loc ,
100
115
done : make (chan struct {}),
0 commit comments