Skip to content

Commit a834c17

Browse files
committed
fix: reduce clustering
1 parent ff42ae3 commit a834c17

File tree

4 files changed

+60
-46
lines changed

4 files changed

+60
-46
lines changed

dependency.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ type ConsumableDispatcher interface {
5858
Consume(ctx context.Context) error
5959
}
6060

61-
// Configuration is the struct for queue configs.
62-
type Configuration struct {
61+
// configuration is the struct for queue configs.
62+
type configuration struct {
6363
RedisName string `yaml:"redisName" json:"redisName"`
6464
Parallelism int `yaml:"parallelism" json:"parallelism"`
6565
CheckQueueLengthIntervalSecond int `yaml:"checkQueueLengthIntervalSecond" json:"checkQueueLengthIntervalSecond"`
@@ -73,11 +73,8 @@ type makerIn struct {
7373
JobDispatcher JobDispatcher `optional:"true"`
7474
EventDispatcher contract.Dispatcher `optional:"true"`
7575
Logger log.Logger
76-
AppName contract.AppName
77-
Env contract.Env
7876
Gauge Gauge `optional:"true"`
7977
Populator contract.DIPopulator `optional:"true"`
80-
Driver Driver `optional:"true"`
8178
}
8279

8380
// makerOut is the di output JobFrom provideDispatcherFactory
@@ -99,7 +96,7 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
9996
return func(p makerIn) (makerOut, error) {
10097
var (
10198
err error
102-
queueConfs map[string]Configuration
99+
queueConfs map[string]configuration
103100
)
104101
err = p.Conf.Unmarshal("queue", &queueConfs)
105102
if err != nil {
@@ -108,14 +105,14 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
108105
factory := di.NewFactory(func(name string) (di.Pair, error) {
109106
var (
110107
ok bool
111-
conf Configuration
108+
conf configuration
112109
)
113110
p := p
114111
if conf, ok = queueConfs[name]; !ok {
115112
if name != "default" {
116-
return di.Pair{}, fmt.Errorf("queue Configuration %s not found", name)
113+
return di.Pair{}, fmt.Errorf("queue configuration %s not found", name)
117114
}
118-
conf = Configuration{
115+
conf = configuration{
119116
Parallelism: runtime.NumCPU(),
120117
CheckQueueLengthIntervalSecond: 0,
121118
}
@@ -135,12 +132,8 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
135132
var driver = option.driver
136133
if option.driver == nil {
137134
driver, err = option.driverConstructor(
138-
DriverConstructorArgs{
135+
DriverArgs{
139136
Name: name,
140-
Conf: conf,
141-
Logger: p.Logger,
142-
AppName: p.AppName,
143-
Env: p.Env,
144137
Populator: p.Populator,
145138
},
146139
)
@@ -191,27 +184,41 @@ func (d makerOut) ProvideRunGroup(group *run.Group) {
191184
}
192185
}
193186

194-
func newDefaultDriver(args DriverConstructorArgs) (Driver, error) {
195-
var maker otredis.Maker
187+
func newDefaultDriver(args DriverArgs) (Driver, error) {
188+
var injected struct {
189+
di.In
190+
191+
contract.AppName
192+
contract.Env
193+
contract.Logger
194+
otredis.Maker
195+
contract.ConfigUnmarshaler
196+
}
197+
196198
if args.Populator == nil {
197199
return nil, errors.New("the default driver requires setting the populator in DI container")
198200
}
199-
if err := args.Populator.Populate(&maker); err != nil {
200-
return nil, fmt.Errorf("the default driver requires an otredis.Maker in DI container: %w", err)
201+
if err := args.Populator.Populate(&injected); err != nil {
202+
return nil, fmt.Errorf("missing dependency for the default queue driver: %w", err)
203+
}
204+
var redisName string
205+
if err := injected.ConfigUnmarshaler.Unmarshal(fmt.Sprintf("queue.%s.redisName", injected.AppName), &redisName); err != nil {
206+
return nil, fmt.Errorf("bad configuration: %w", err)
201207
}
202-
client, err := maker.Make(args.Conf.RedisName)
208+
209+
client, err := injected.Maker.Make(redisName)
203210
if err != nil {
204-
return nil, fmt.Errorf("the default driver requires the redis client called %s: %w", args.Conf.RedisName, err)
211+
return nil, fmt.Errorf("the default driver requires the redis client called %s: %w", redisName, err)
205212
}
206213
return &RedisDriver{
207-
Logger: args.Logger,
214+
Logger: injected.Logger,
208215
RedisClient: client,
209216
ChannelConfig: ChannelConfig{
210-
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", args.AppName.String(), args.Env.String(), args.Name),
211-
Failed: fmt.Sprintf("{%s:%s:%s}:failed", args.AppName.String(), args.Env.String(), args.Name),
212-
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", args.AppName.String(), args.Env.String(), args.Name),
213-
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", args.AppName.String(), args.Env.String(), args.Name),
214-
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", args.AppName.String(), args.Env.String(), args.Name),
217+
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", injected.AppName.String(), injected.Env.String(), args.Name),
218+
Failed: fmt.Sprintf("{%s:%s:%s}:failed", injected.AppName.String(), injected.Env.String(), args.Name),
219+
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", injected.AppName.String(), injected.Env.String(), args.Name),
220+
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", injected.AppName.String(), injected.Env.String(), args.Name),
221+
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", injected.AppName.String(), injected.Env.String(), args.Name),
215222
},
216223
}, nil
217224
}
@@ -239,7 +246,7 @@ func provideConfig() configOut {
239246
configs := []config.ExportedConfig{{
240247
Owner: "queue",
241248
Data: map[string]interface{}{
242-
"queue": map[string]Configuration{
249+
"queue": map[string]configuration{
243250
"default": {
244251
RedisName: "default",
245252
Parallelism: runtime.NumCPU(),

dependency_options.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ package queue
22

33
import (
44
"github.com/DoNewsCode/core/contract"
5-
"github.com/go-kit/kit/log"
65
)
76

87
type providersOption struct {
98
driver Driver
10-
driverConstructor func(args DriverConstructorArgs) (Driver, error)
9+
driverConstructor func(args DriverArgs) (Driver, error)
1110
}
1211

1312
// ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.
@@ -24,18 +23,14 @@ func WithDriver(driver Driver) ProvidersOptionFunc {
2423

2524
// WithDriverConstructor instructs the Providers to accept an alternative constructor for queue driver.
2625
// If the WithDriver option is set, this option becomes an no-op.
27-
func WithDriverConstructor(f func(args DriverConstructorArgs) (Driver, error)) ProvidersOptionFunc {
26+
func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc {
2827
return func(options *providersOption) {
2928
options.driverConstructor = f
3029
}
3130
}
3231

33-
// DriverConstructorArgs are arguments to construct the driver. See WithDriverConstructor.
34-
type DriverConstructorArgs struct {
32+
// DriverArgs are arguments to construct the driver. See WithDriverConstructor.
33+
type DriverArgs struct {
3534
Name string
36-
Conf Configuration
37-
Logger log.Logger
38-
AppName contract.AppName
39-
Env contract.Env
4035
Populator contract.DIPopulator
4136
}

dependency_test.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/DoNewsCode/core/config"
9+
"github.com/DoNewsCode/core/contract"
910
"github.com/DoNewsCode/core/di"
1011
"github.com/DoNewsCode/core/otredis"
1112
"github.com/go-kit/kit/log"
@@ -23,13 +24,28 @@ func (m maker) Make(name string) (redis.UniversalClient, error) {
2324
type populator struct{}
2425

2526
func (p populator) Populate(target interface{}) error {
26-
*(target.(*otredis.Maker)) = maker{}
27-
return nil
27+
g := di.NewGraph()
28+
g.Provide(func() contract.AppName {
29+
return config.AppName("test")
30+
})
31+
g.Provide(func() contract.Env {
32+
return config.Env("test")
33+
})
34+
g.Provide(func() log.Logger {
35+
return log.NewNopLogger()
36+
})
37+
g.Provide(func() otredis.Maker {
38+
return maker{}
39+
})
40+
g.Provide(func() contract.ConfigUnmarshaler {
41+
return config.MapAdapter{"queue": map[string]interface{}{"default": map[string]interface{}{"redisName": "default"}}}
42+
})
43+
return di.IntoPopulator(g).Populate(target)
2844
}
2945

3046
func TestProvideDispatcher(t *testing.T) {
3147
out, err := provideDispatcherFactory(&providersOption{})(makerIn{
32-
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]Configuration{
48+
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
3349
"default": {
3450
"default",
3551
1,
@@ -44,8 +60,6 @@ func TestProvideDispatcher(t *testing.T) {
4460
JobDispatcher: &SyncDispatcher{},
4561
Populator: populator{},
4662
Logger: log.NewNopLogger(),
47-
AppName: config.AppName("test"),
48-
Env: config.EnvTesting,
4963
})
5064
assert.NoError(t, err)
5165
assert.NotNil(t, out.DispatcherFactory)
@@ -92,7 +106,7 @@ func (m mockDriver) Retry(ctx context.Context, message *PersistedJob) error {
92106

93107
func TestProvideDispatcher_withDriver(t *testing.T) {
94108
out, err := provideDispatcherFactory(&providersOption{driver: mockDriver{}})(makerIn{
95-
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]Configuration{
109+
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
96110
"default": {
97111
"default",
98112
1,
@@ -106,8 +120,6 @@ func TestProvideDispatcher_withDriver(t *testing.T) {
106120
}}),
107121
JobDispatcher: &SyncDispatcher{},
108122
Logger: log.NewNopLogger(),
109-
AppName: config.AppName("test"),
110-
Env: config.EnvTesting,
111123
})
112124
assert.NoError(t, err)
113125
assert.NotNil(t, out.DispatcherFactory)

doc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
// Decorate(s *PersistedJob)
3838
// }
3939
//
40-
// The PersistentJob passed to the Decorate method contains the tunable Configuration such as maximum retries.
40+
// The PersistentJob passed to the Decorate method contains the tunable configuration such as maximum retries.
4141
//
4242
// No matter how you create a persisted Job, to fire it, send it though a dispatcher. The normal dispatcher in the
4343
// Jobs package won't work, as a queue implementation is required. Luckily, it is deadly simple to convert a standard
@@ -61,7 +61,7 @@
6161
//
6262
// Integrate
6363
//
64-
// The queue package exports Configuration in this format:
64+
// The queue package exports configuration in this format:
6565
//
6666
// queue:
6767
// default:

0 commit comments

Comments
 (0)