Skip to content

Commit 7ac806c

Browse files
committed
build: upgrade to core v0.9.0
1 parent f2c6db5 commit 7ac806c

File tree

5 files changed

+172
-136
lines changed

5 files changed

+172
-136
lines changed

dependency.go

Lines changed: 104 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import (
66
"runtime"
77
"time"
88

9-
"github.com/DoNewsCode/core/events"
10-
"github.com/DoNewsCode/core/otredis"
11-
129
"github.com/DoNewsCode/core/config"
1310
"github.com/DoNewsCode/core/contract"
1411
"github.com/DoNewsCode/core/di"
12+
"github.com/DoNewsCode/core/events"
13+
"github.com/DoNewsCode/core/otredis"
1514
"github.com/go-kit/kit/log"
1615
"github.com/go-kit/kit/log/level"
1716
"github.com/go-kit/kit/metrics"
1817
"github.com/oklog/run"
18+
"github.com/pkg/errors"
1919
)
2020

2121
/*
@@ -36,8 +36,17 @@ DispatcherMaker, the JobDispatcher and the exported configs.
3636
JobDispatcher
3737
*Queue
3838
*/
39-
func Providers() di.Deps {
40-
return []interface{}{provideDispatcherFactory, provideConfig, provideDispatcher}
39+
func Providers(optionFunc ...ProvidersOptionFunc) di.Deps {
40+
option := &providersOption{}
41+
for _, f := range optionFunc {
42+
f(option)
43+
}
44+
return []interface{}{
45+
provideDispatcherFactory(option),
46+
provideConfig,
47+
provideDispatcher,
48+
di.Bind(new(DispatcherFactory), new(DispatcherMaker)),
49+
}
4150
}
4251

4352
// Gauge is an alias used for dependency injection
@@ -62,21 +71,17 @@ type makerIn struct {
6271
Conf contract.ConfigAccessor
6372
JobDispatcher JobDispatcher `optional:"true"`
6473
EventDispatcher contract.Dispatcher `optional:"true"`
65-
Driver Driver `optional:"true"`
66-
RedisMaker otredis.Maker `optional:"true"`
6774
Logger log.Logger
6875
AppName contract.AppName
6976
Env contract.Env
70-
Gauge Gauge `optional:"true"`
77+
Gauge Gauge `optional:"true"`
78+
Populator contract.DIPopulator `optional:"true"`
7179
}
7280

7381
// makerOut is the di output JobFrom provideDispatcherFactory
7482
type makerOut struct {
7583
di.Out
76-
77-
DispatcherMaker DispatcherMaker
7884
DispatcherFactory DispatcherFactory
79-
ExportedConfig []config.ExportedConfig `group:"config,flatten"`
8085
}
8186

8287
func (d makerOut) ModuleSentinel() {}
@@ -85,86 +90,81 @@ func (m makerOut) Module() interface{} { return m }
8590

8691
// provideDispatcherFactory is a provider for *DispatcherFactory and *Queue.
8792
// It also provides an interface for each.
88-
func provideDispatcherFactory(p makerIn) (makerOut, error) {
89-
var (
90-
err error
91-
queueConfs map[string]configuration
92-
)
93-
err = p.Conf.Unmarshal("queue", &queueConfs)
94-
if err != nil {
95-
level.Warn(p.Logger).Log("err", err)
93+
func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut, error) {
94+
if option.driverConstructor == nil {
95+
option.driverConstructor = newDefaultDriver
9696
}
97-
factory := di.NewFactory(func(name string) (di.Pair, error) {
97+
return func(p makerIn) (makerOut, error) {
9898
var (
99-
ok bool
100-
conf configuration
99+
err error
100+
queueConfs map[string]configuration
101101
)
102-
p := p
103-
if conf, ok = queueConfs[name]; !ok {
104-
if name != "default" {
105-
return di.Pair{}, fmt.Errorf("queue configuration %s not found", name)
106-
}
107-
conf = configuration{Parallelism: runtime.NumCPU(), CheckQueueLengthIntervalSecond: 0}
108-
}
109-
110-
if p.JobDispatcher == nil {
111-
p.JobDispatcher = &SyncDispatcher{}
112-
}
113-
if p.EventDispatcher == nil {
114-
p.EventDispatcher = &events.SyncDispatcher{}
115-
}
116-
117-
if p.Gauge != nil {
118-
p.Gauge = p.Gauge.With("queue", name)
102+
err = p.Conf.Unmarshal("queue", &queueConfs)
103+
if err != nil {
104+
level.Warn(p.Logger).Log("err", err)
119105
}
106+
factory := di.NewFactory(func(name string) (di.Pair, error) {
107+
var (
108+
ok bool
109+
conf configuration
110+
)
111+
p := p
112+
if conf, ok = queueConfs[name]; !ok {
113+
if name != "default" {
114+
return di.Pair{}, fmt.Errorf("queue configuration %s not found", name)
115+
}
116+
conf = configuration{Parallelism: runtime.NumCPU(), CheckQueueLengthIntervalSecond: 0}
117+
}
120118

121-
if p.Driver == nil {
122-
if p.RedisMaker == nil {
123-
return di.Pair{}, fmt.Errorf("default redis client not found, please provide it or provide a queue.Driver")
119+
if p.JobDispatcher == nil {
120+
p.JobDispatcher = &SyncDispatcher{}
124121
}
125-
if conf.RedisName == "" {
126-
conf.RedisName = "default"
122+
if p.EventDispatcher == nil {
123+
p.EventDispatcher = &events.SyncDispatcher{}
127124
}
128-
redisClient, err := p.RedisMaker.Make(conf.RedisName)
129-
if err != nil {
130-
return di.Pair{}, fmt.Errorf("failed to initiate redis driver: %w", err)
125+
126+
if p.Gauge != nil {
127+
p.Gauge = p.Gauge.With("queue", name)
131128
}
132-
p.Driver = &RedisDriver{
133-
Logger: p.Logger,
134-
RedisClient: redisClient,
135-
ChannelConfig: ChannelConfig{
136-
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", p.AppName.String(), p.Env.String(), name),
137-
Failed: fmt.Sprintf("{%s:%s:%s}:failed", p.AppName.String(), p.Env.String(), name),
138-
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", p.AppName.String(), p.Env.String(), name),
139-
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", p.AppName.String(), p.Env.String(), name),
140-
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", p.AppName.String(), p.Env.String(), name),
141-
},
129+
130+
var driver = option.driver
131+
if option.driver == nil {
132+
driver, err = option.driverConstructor(DriverConstructorArgs{
133+
Name: "name",
134+
Conf: conf,
135+
Logger: p.Logger,
136+
AppName: p.AppName,
137+
Env: p.Env,
138+
Populator: p.Populator,
139+
})
140+
if err != nil {
141+
return di.Pair{}, err
142+
}
142143
}
144+
queuedDispatcher := NewQueue(
145+
driver,
146+
UseLogger(p.Logger),
147+
UseParallelism(conf.Parallelism),
148+
UseGauge(p.Gauge, time.Duration(conf.CheckQueueLengthIntervalSecond)*time.Second),
149+
UseJobDispatcher(p.JobDispatcher),
150+
UseEventDispatcher(p.EventDispatcher),
151+
)
152+
return di.Pair{
153+
Closer: nil,
154+
Conn: queuedDispatcher,
155+
}, nil
156+
})
157+
158+
// Queue must be created eagerly, so that the consumer goroutines can start on boot up.
159+
for name := range queueConfs {
160+
factory.Make(name)
143161
}
144-
queuedDispatcher := NewQueue(
145-
p.Driver,
146-
UseLogger(p.Logger),
147-
UseParallelism(conf.Parallelism),
148-
UseGauge(p.Gauge, time.Duration(conf.CheckQueueLengthIntervalSecond)*time.Second),
149-
UseJobDispatcher(p.JobDispatcher),
150-
UseEventDispatcher(p.EventDispatcher),
151-
)
152-
return di.Pair{
153-
Closer: nil,
154-
Conn: queuedDispatcher,
155-
}, nil
156-
})
157162

158-
// Queue must be created eagerly, so that the consumer goroutines can start on boot up.
159-
for name := range queueConfs {
160-
factory.Make(name)
163+
dispatcherFactory := DispatcherFactory{Factory: factory}
164+
return makerOut{
165+
DispatcherFactory: dispatcherFactory,
166+
}, nil
161167
}
162-
163-
dispatcherFactory := DispatcherFactory{Factory: factory}
164-
return makerOut{
165-
DispatcherFactory: dispatcherFactory,
166-
DispatcherMaker: dispatcherFactory,
167-
}, nil
168168
}
169169

170170
// ProvideRunGroup implements container.RunProvider.
@@ -184,6 +184,31 @@ func (d makerOut) ProvideRunGroup(group *run.Group) {
184184
}
185185
}
186186

187+
func newDefaultDriver(args DriverConstructorArgs) (Driver, error) {
188+
var maker otredis.Maker
189+
if args.Populator == nil {
190+
return nil, errors.New("the default driver requires setting the populator in DI container")
191+
}
192+
if err := args.Populator.Populate(&maker); err != nil {
193+
return nil, fmt.Errorf("the default driver requires an otredis.Maker in DI container: %w", err)
194+
}
195+
client, err := maker.Make(args.Conf.RedisName)
196+
if err != nil {
197+
return nil, fmt.Errorf("the default driver requires the redis client called %s: %w", args.Conf.RedisName, err)
198+
}
199+
return &RedisDriver{
200+
Logger: args.Logger,
201+
RedisClient: client,
202+
ChannelConfig: ChannelConfig{
203+
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", args.AppName.String(), args.Env.String(), args.Name),
204+
Failed: fmt.Sprintf("{%s:%s:%s}:failed", args.AppName.String(), args.Env.String(), args.Name),
205+
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", args.AppName.String(), args.Env.String(), args.Name),
206+
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", args.AppName.String(), args.Env.String(), args.Name),
207+
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", args.AppName.String(), args.Env.String(), args.Name),
208+
},
209+
}, nil
210+
}
211+
187212
type dispatcherOut struct {
188213
di.Out
189214

dependency_options.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package queue
2+
3+
import (
4+
"github.com/DoNewsCode/core/contract"
5+
"github.com/go-kit/kit/log"
6+
)
7+
8+
type providersOption struct {
9+
driver Driver
10+
driverConstructor func(args DriverConstructorArgs) (Driver, error)
11+
}
12+
13+
// ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.
14+
type ProvidersOptionFunc func(options *providersOption)
15+
16+
// WithDriver instructs the Providers to accept a queue driver
17+
// different from the default one. This option supersedes the
18+
// WithDriverConstructor option.
19+
func WithDriver(driver Driver) ProvidersOptionFunc {
20+
return func(options *providersOption) {
21+
options.driver = driver
22+
}
23+
}
24+
25+
// WithDriverConstructor instructs the Providers to accept an alternative constructor for queue driver.
26+
// If the WithDriver option is set, this option becomes an no-op.
27+
func WithDriverConstructor(f func(args DriverConstructorArgs) (Driver, error)) ProvidersOptionFunc {
28+
return func(options *providersOption) {
29+
options.driverConstructor = f
30+
}
31+
}
32+
33+
// DriverConstructorArgs are arguments to construct the driver. See WithDriverConstructor.
34+
type DriverConstructorArgs struct {
35+
Name string
36+
Conf configuration
37+
Logger log.Logger
38+
AppName contract.AppName
39+
Env contract.Env
40+
Populator contract.DIPopulator
41+
}

dependency_test.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/DoNewsCode/core/config"
99
"github.com/DoNewsCode/core/di"
10+
"github.com/DoNewsCode/core/otredis"
1011
"github.com/go-kit/kit/log"
1112
"github.com/go-redis/redis/v8"
1213
"github.com/stretchr/testify/assert"
@@ -19,9 +20,16 @@ func (m maker) Make(name string) (redis.UniversalClient, error) {
1920
return redis.NewUniversalClient(&redis.UniversalOptions{Addrs: envDefaultRedisAddrs}), nil
2021
}
2122

23+
type populator struct{}
24+
25+
func (p populator) Populate(target interface{}) error {
26+
*(target.(*otredis.Maker)) = maker{}
27+
return nil
28+
}
29+
2230
func TestProvideDispatcher(t *testing.T) {
23-
out, err := provideDispatcherFactory(makerIn{
24-
Conf: config.MapAdapter{"queue": map[string]configuration{
31+
out, err := provideDispatcherFactory(&providersOption{})(makerIn{
32+
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
2533
"default": {
2634
"default",
2735
1,
@@ -32,20 +40,19 @@ func TestProvideDispatcher(t *testing.T) {
3240
3,
3341
5,
3442
},
35-
}},
43+
}}),
3644
JobDispatcher: &SyncDispatcher{},
37-
RedisMaker: maker{},
45+
Populator: populator{},
3846
Logger: log.NewNopLogger(),
3947
AppName: config.AppName("test"),
4048
Env: config.EnvTesting,
4149
})
4250
assert.NoError(t, err)
4351
assert.NotNil(t, out.DispatcherFactory)
44-
assert.NotNil(t, out.DispatcherMaker)
45-
def, err := out.DispatcherMaker.Make("alternative")
52+
def, err := out.DispatcherFactory.Make("alternative")
4653
assert.NoError(t, err)
4754
assert.NotNil(t, def)
48-
assert.Implements(t, (*di.Module)(nil), out)
55+
assert.Implements(t, (*di.Modular)(nil), out)
4956
}
5057

5158
type mockDriver struct {
@@ -84,8 +91,8 @@ func (m mockDriver) Retry(ctx context.Context, message *PersistedJob) error {
8491
}
8592

8693
func TestProvideDispatcher_withDriver(t *testing.T) {
87-
out, err := provideDispatcherFactory(makerIn{
88-
Conf: config.MapAdapter{"queue": map[string]configuration{
94+
out, err := provideDispatcherFactory(&providersOption{driver: mockDriver{}})(makerIn{
95+
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
8996
"default": {
9097
"default",
9198
1,
@@ -96,20 +103,18 @@ func TestProvideDispatcher_withDriver(t *testing.T) {
96103
3,
97104
5,
98105
},
99-
}},
106+
}}),
100107
JobDispatcher: &SyncDispatcher{},
101-
Driver: mockDriver{},
102108
Logger: log.NewNopLogger(),
103109
AppName: config.AppName("test"),
104110
Env: config.EnvTesting,
105111
})
106112
assert.NoError(t, err)
107113
assert.NotNil(t, out.DispatcherFactory)
108-
assert.NotNil(t, out.DispatcherMaker)
109-
def, err := out.DispatcherMaker.Make("alternative")
114+
def, err := out.DispatcherFactory.Make("alternative")
110115
assert.NoError(t, err)
111116
assert.NotNil(t, def)
112-
assert.Implements(t, (*di.Module)(nil), out)
117+
assert.Implements(t, (*di.Modular)(nil), out)
113118
}
114119

115120
func TestProvideConfigs(t *testing.T) {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/DoNewsCode/core-queue
33
go 1.16
44

55
require (
6-
github.com/DoNewsCode/core v0.8.0
6+
github.com/DoNewsCode/core v0.9.0
77
github.com/go-kit/kit v0.11.0
88
github.com/go-redis/redis/v8 v8.11.3
99
github.com/knadh/koanf v1.2.1

0 commit comments

Comments
 (0)