Skip to content

Commit 4e3024f

Browse files
chore: ci & review bugfix
--story=120702860
1 parent 93ebe26 commit 4e3024f

File tree

35 files changed

+1035
-1372
lines changed

35 files changed

+1035
-1372
lines changed

src/source_controller/cacheservice/app/server.go

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOptio
103103
}
104104
select {
105105
case <-ctx.Done():
106+
cacheService.Scheduler().Stop()
106107
}
107108
return nil
108109
}

src/source_controller/cacheservice/cache/biz-topo/topo.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"configcenter/src/source_controller/cacheservice/cache/custom/cache"
3838
watchcli "configcenter/src/source_controller/cacheservice/event/watch"
3939
"configcenter/src/storage/driver/mongodb"
40-
"configcenter/src/storage/stream/task"
4140
)
4241

4342
// Topo defines the business topology caching logics
@@ -47,14 +46,14 @@ type Topo struct {
4746
}
4847

4948
// New Topo
50-
func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, cacheSet *cache.CacheSet,
51-
watchCli *watchcli.Client) (*Topo, error) {
49+
func New(isMaster discovery.ServiceManageInterface, cacheSet *cache.CacheSet, watchCli *watchcli.Client) (*Topo,
50+
error) {
5251

5352
t := &Topo{
5453
isMaster: isMaster,
5554
}
5655

57-
watcher, err := watch.New(isMaster, watchTask, cacheSet, watchCli)
56+
watcher, err := watch.New(isMaster, cacheSet, watchCli)
5857
if err != nil {
5958
return nil, fmt.Errorf("new watcher failed, err: %v", err)
6059
}
@@ -67,6 +66,11 @@ func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, cacheS
6766
return t, nil
6867
}
6968

69+
// Watcher returns the business topology event watcher
70+
func (t *Topo) Watcher() *watch.Watcher {
71+
return t.watcher
72+
}
73+
7074
// loopBizTopoCache launch the task to loop business's brief topology every interval minutes.
7175
func (t *Topo) loopBizTopoCache(topoKey key.Key) {
7276
for {

src/source_controller/cacheservice/cache/biz-topo/watch/kube.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
tokenhandler "configcenter/src/source_controller/cacheservice/cache/token-handler"
3737
dbtypes "configcenter/src/storage/dal/types"
3838
"configcenter/src/storage/driver/mongodb"
39+
"configcenter/src/storage/stream/task"
3940
streamtypes "configcenter/src/storage/stream/types"
4041

4142
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -113,11 +114,12 @@ func (w *kubeWatcher) watchTopo(obj string, doBatch func(*streamtypes.DBInfo, []
113114
opts.WatchTaskOptions.CollOpts.EventStruct = new(kubetypes.Pod)
114115
}
115116

116-
err = w.watcher.task.AddLoopBatchTask(opts)
117+
watchTask, err := task.NewLoopBatchTask(opts)
117118
if err != nil {
118-
blog.Errorf("watch kube biz topo collection %s failed, err: %v", collection, err)
119+
blog.Errorf("generate kube biz topo collection %s watch task failed, err: %v", collection, err)
119120
return err
120121
}
122+
w.watcher.tasks = append(w.watcher.tasks, watchTask)
121123
}
122124

123125
return nil

src/source_controller/cacheservice/cache/biz-topo/watch/watch.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,20 @@ import (
2828
// Watcher defines mongodb event watcher for biz topology
2929
type Watcher struct {
3030
isMaster discovery.ServiceManageInterface
31-
task *task.Task
3231
cacheSet *cache.CacheSet
3332
watchCli *watchcli.Client
33+
tasks []*task.Task
3434
}
3535

3636
// New biz topology mongodb event watcher
37-
func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, cacheSet *cache.CacheSet,
38-
watchCli *watchcli.Client) (*Watcher, error) {
37+
func New(isMaster discovery.ServiceManageInterface, cacheSet *cache.CacheSet, watchCli *watchcli.Client) (*Watcher,
38+
error) {
3939

4040
watcher := &Watcher{
4141
isMaster: isMaster,
42-
task: watchTask,
4342
cacheSet: cacheSet,
4443
watchCli: watchCli,
44+
tasks: make([]*task.Task, 0),
4545
}
4646

4747
if err := watcher.watchKube(); err != nil {
@@ -54,3 +54,8 @@ func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, cacheS
5454

5555
return watcher, nil
5656
}
57+
58+
// GetWatchTasks returns the event watch tasks
59+
func (w *Watcher) GetWatchTasks() []*task.Task {
60+
return w.tasks
61+
}

src/source_controller/cacheservice/cache/cache.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,24 @@ import (
2929
)
3030

3131
// NewCache new cache service
32-
func NewCache(watchTask *task.Task, isMaster discovery.ServiceManageInterface) (*ClientSet, error) {
32+
func NewCache(isMaster discovery.ServiceManageInterface) (*ClientSet, error) {
3333
if err := mainline.NewMainlineCache(isMaster); err != nil {
3434
return nil, fmt.Errorf("new mainline cache failed, err: %v", err)
3535
}
3636

37-
customCache, err := custom.New(isMaster, watchTask)
37+
customCache, err := custom.New(isMaster)
3838
if err != nil {
3939
return nil, fmt.Errorf("new custom resource cache failed, err: %v", err)
4040
}
4141

4242
watchCli := watch.NewClient(mongodb.Dal("watch"), mongodb.Dal(), redis.Client())
4343

44-
generalCache, err := general.New(isMaster, watchTask, watchCli)
44+
generalCache, err := general.New(isMaster, watchCli)
4545
if err != nil {
4646
return nil, fmt.Errorf("new general resource cache failed, err: %v", err)
4747
}
4848

49-
topoTreeClient, err := biztopo.New(isMaster, watchTask, customCache.CacheSet(), watchCli)
49+
topoTreeClient, err := biztopo.New(isMaster, customCache.CacheSet(), watchCli)
5050
if err != nil {
5151
return nil, fmt.Errorf("new common topo cache failed, err: %v", err)
5252
}
@@ -73,3 +73,11 @@ type ClientSet struct {
7373
Custom *custom.Cache
7474
General *general.Cache
7575
}
76+
77+
// GetWatchTasks returns the event watch tasks
78+
func (c *ClientSet) GetWatchTasks() []*task.Task {
79+
tasks := c.Topo.Watcher().GetWatchTasks()
80+
tasks = append(tasks, c.Custom.Watcher().GetWatchTasks()...)
81+
tasks = append(tasks, c.General.FullSyncCond().GetWatchTasks()...)
82+
return tasks
83+
}

src/source_controller/cacheservice/cache/custom/cache.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,25 @@ import (
2424
"configcenter/src/apimachinery/discovery"
2525
"configcenter/src/source_controller/cacheservice/cache/custom/cache"
2626
"configcenter/src/source_controller/cacheservice/cache/custom/watch"
27-
"configcenter/src/storage/stream/task"
2827
)
2928

3029
// Cache defines the custom resource caching logics
3130
type Cache struct {
3231
cacheSet *cache.CacheSet
32+
watcher *watch.Watcher
3333
}
3434

3535
// New Cache
36-
func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task) (*Cache, error) {
36+
func New(isMaster discovery.ServiceManageInterface) (*Cache, error) {
3737
t := &Cache{
3838
cacheSet: cache.New(isMaster),
3939
}
4040

41-
if err := watch.Init(watchTask, t.cacheSet); err != nil {
41+
watcher, err := watch.Init(t.cacheSet)
42+
if err != nil {
4243
return nil, fmt.Errorf("initialize custom resource watcher failed, err: %v", err)
4344
}
45+
t.watcher = watcher
4446

4547
t.cacheSet.LoopRefreshCache()
4648
return t, nil
@@ -50,3 +52,8 @@ func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task) (*Cach
5052
func (c *Cache) CacheSet() *cache.CacheSet {
5153
return c.cacheSet
5254
}
55+
56+
// Watcher returns custom resource event watcher
57+
func (c *Cache) Watcher() *watch.Watcher {
58+
return c.watcher
59+
}

src/source_controller/cacheservice/cache/custom/watch/watch.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,31 @@ import (
3535

3636
// Watcher defines mongodb event watcher for custom resource
3737
type Watcher struct {
38-
task *task.Task
3938
cacheSet *cache.CacheSet
39+
tasks []*task.Task
4040
}
4141

4242
// Init custom resource mongodb event watcher
43-
func Init(watchTask *task.Task, cacheSet *cache.CacheSet) error {
43+
func Init(cacheSet *cache.CacheSet) (*Watcher, error) {
4444
watcher := &Watcher{
45-
task: watchTask,
4645
cacheSet: cacheSet,
46+
tasks: make([]*task.Task, 0),
4747
}
4848

4949
if err := watcher.watchPodLabel(); err != nil {
50-
return err
50+
return nil, err
5151
}
5252

5353
if err := watcher.watchSharedNsRel(); err != nil {
54-
return err
54+
return nil, err
5555
}
5656

57-
return nil
57+
return watcher, nil
58+
}
59+
60+
// GetWatchTasks returns the event watch tasks
61+
func (w *Watcher) GetWatchTasks() []*task.Task {
62+
return w.tasks
5863
}
5964

6065
type watchOptions struct {
@@ -102,11 +107,12 @@ func (w *Watcher) watchCustomResource(opt *watchOptions) (bool, error) {
102107
BatchSize: 200,
103108
}
104109

105-
err = w.task.AddLoopBatchTask(opts)
110+
watchTask, err := task.NewLoopBatchTask(opts)
106111
if err != nil {
107-
blog.Errorf("watch custom resource %s, but add loop batch task failed, err: %v", name, err)
112+
blog.Errorf("watch custom resource %s, but generate loop batch task failed, err: %v", name, err)
108113
return false, err
109114
}
115+
w.tasks = append(w.tasks, watchTask)
110116

111117
return exists, nil
112118
}

src/source_controller/cacheservice/cache/general/cache.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"configcenter/src/source_controller/cacheservice/cache/general/types"
2929
"configcenter/src/source_controller/cacheservice/cache/general/watch"
3030
watchcli "configcenter/src/source_controller/cacheservice/event/watch"
31-
"configcenter/src/storage/stream/task"
3231
)
3332

3433
// Cache defines the general resource caching logics
@@ -38,7 +37,7 @@ type Cache struct {
3837
}
3938

4039
// New Cache
41-
func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, watchCli *watchcli.Client) (*Cache, error) {
40+
func New(isMaster discovery.ServiceManageInterface, watchCli *watchcli.Client) (*Cache, error) {
4241
cacheSet := cache.GetAllCache()
4342

4443
fullSyncCondChMap := make(map[general.ResType]chan<- types.FullSyncCondEvent)
@@ -49,7 +48,7 @@ func New(isMaster discovery.ServiceManageInterface, watchTask *task.Task, watchC
4948
fullSyncCondChMap[resType] = cacheInst.FullSyncCondCh()
5049
}
5150

52-
fullSyncCondCli, err := fullsynccond.New(watchTask, fullSyncCondChMap)
51+
fullSyncCondCli, err := fullsynccond.New(fullSyncCondChMap)
5352
if err != nil {
5453
return nil, fmt.Errorf("init full sync cond failed, err: %v", err)
5554
}

src/source_controller/cacheservice/cache/general/full-sync-cond/full_sync_cond.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@ import (
2828

2929
// FullSyncCond defines the full sync cond related logics
3030
type FullSyncCond struct {
31-
task *task.Task
3231
chMap map[general.ResType]chan<- types.FullSyncCondEvent
32+
tasks []*task.Task
3333
}
3434

3535
// New FullSyncCond
36-
func New(watchTask *task.Task, chMap map[general.ResType]chan<- types.FullSyncCondEvent) (*FullSyncCond, error) {
36+
func New(chMap map[general.ResType]chan<- types.FullSyncCondEvent) (*FullSyncCond, error) {
3737
f := &FullSyncCond{
38-
task: watchTask,
3938
chMap: chMap,
4039
}
4140

@@ -45,3 +44,8 @@ func New(watchTask *task.Task, chMap map[general.ResType]chan<- types.FullSyncCo
4544

4645
return f, nil
4746
}
47+
48+
// GetWatchTasks returns the event watch tasks
49+
func (f *FullSyncCond) GetWatchTasks() []*task.Task {
50+
return f.tasks
51+
}

src/source_controller/cacheservice/cache/general/full-sync-cond/watch.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
tokenhandler "configcenter/src/source_controller/cacheservice/cache/token-handler"
3434
"configcenter/src/storage/dal/mongo/local"
3535
"configcenter/src/storage/driver/mongodb"
36+
"configcenter/src/storage/stream/task"
3637
"configcenter/src/storage/stream/types"
3738
)
3839

@@ -67,11 +68,12 @@ func (f *FullSyncCond) Watch() error {
6768
BatchSize: 200,
6869
}
6970

70-
err := f.task.AddLoopBatchTask(opts)
71+
watchTask, err := task.NewLoopBatchTask(opts)
7172
if err != nil {
72-
blog.Errorf("add watch full sync cond task failed, err: %v", err)
73+
blog.Errorf("generate watch full sync cond task failed, err: %v", err)
7374
return err
7475
}
76+
f.tasks = append(f.tasks, watchTask)
7577

7678
return nil
7779
}

src/source_controller/cacheservice/event/bsrelation/bsrelation.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package bsrelation
1515

1616
import (
17-
"context"
1817
"time"
1918

2019
"configcenter/src/common"
@@ -29,11 +28,22 @@ const (
2928
bizSetRelationLockTTL = 1 * time.Minute
3029
)
3130

31+
// BizSetRelation is the biz set relation event flow struct
32+
type BizSetRelation struct {
33+
tasks []*task.Task
34+
}
35+
36+
// GetWatchTasks returns the event watch tasks
37+
func (b *BizSetRelation) GetWatchTasks() []*task.Task {
38+
return b.tasks
39+
}
40+
3241
// NewBizSetRelation init and run biz set relation event watch
33-
func NewBizSetRelation(task *task.Task) error {
42+
func NewBizSetRelation() (*BizSetRelation, error) {
43+
bizSetRel := &BizSetRelation{tasks: make([]*task.Task, 0)}
44+
3445
base := mixevent.MixEventFlowOptions{
3546
MixKey: event.BizSetRelationKey,
36-
Task: task,
3747
EventLockKey: bizSetRelationLockKey,
3848
EventLockTTL: bizSetRelationLockTTL,
3949
}
@@ -42,20 +52,20 @@ func NewBizSetRelation(task *task.Task) error {
4252
bizSet := base
4353
bizSet.Key = event.BizSetKey
4454
bizSet.WatchFields = []string{common.BKBizSetIDField, common.BKBizSetScopeField}
45-
if err := newBizSetRelation(context.Background(), bizSet); err != nil {
55+
if err := bizSetRel.addWatchTask(bizSet); err != nil {
4656
blog.Errorf("watch biz set event for biz set relation failed, err: %v", err)
47-
return err
57+
return nil, err
4858
}
4959
blog.Info("watch biz set relation events, watch biz set success")
5060

5161
// watch biz event
5262
biz := base
5363
biz.Key = event.BizKey
54-
if err := newBizSetRelation(context.Background(), biz); err != nil {
64+
if err := bizSetRel.addWatchTask(biz); err != nil {
5565
blog.Errorf("watch biz event for biz set relation failed, err: %v", err)
56-
return err
66+
return nil, err
5767
}
5868
blog.Info("watch biz set relation events, watch biz success")
5969

60-
return nil
70+
return bizSetRel, nil
6171
}

src/source_controller/cacheservice/event/bsrelation/event.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333
"configcenter/src/storage/stream/types"
3434
)
3535

36-
// newBizSetRelation init and run biz set relation event watch with sub event key
37-
func newBizSetRelation(ctx context.Context, opts mixevent.MixEventFlowOptions) error {
36+
// addWatchTask add biz set relation event watch task with sub event key
37+
func (b *BizSetRelation) addWatchTask(opts mixevent.MixEventFlowOptions) error {
3838
relation := bizSetRelation{
3939
mixKey: opts.MixKey,
4040
key: opts.Key,
@@ -66,7 +66,13 @@ func newBizSetRelation(ctx context.Context, opts mixevent.MixEventFlowOptions) e
6666
return err
6767
}
6868

69-
return flow.RunFlow(ctx)
69+
flowTask, err := flow.GenWatchTask()
70+
if err != nil {
71+
return err
72+
}
73+
74+
b.tasks = append(b.tasks, flowTask)
75+
return nil
7076
}
7177

7278
// bizSetRelation biz set relation event watch logic struct

0 commit comments

Comments
 (0)