-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathinformer_concurrent.go
More file actions
132 lines (113 loc) · 4.34 KB
/
informer_concurrent.go
File metadata and controls
132 lines (113 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package operator
import (
"context"
"sync"
"github.com/grafana/grafana-app-sdk/health"
)
// Compile-time assertions that *ConcurrentInformer satisfies the Informer and health.Checker interfaces.
var (
	_ Informer       = (*ConcurrentInformer)(nil)
	_ health.Checker = (*ConcurrentInformer)(nil)
)
// ConcurrentInformer implements the Informer interface, wrapping another Informer implementation
// to provide concurrent handling of events.
// Events will still be emitted sequentially, but the event handler methods on added ResourceWatchers
// (ie the business logic) will be processed by concurrent workers. Events for an object will be assigned
// to the same worker to preserve the per-object in-order guarantee provided by K8s client tooling.
type ConcurrentInformer struct {
	// errorHandler is handed to every concurrentWatcher created by AddEventHandler.
	// The constructors set it to DefaultErrorHandler unless the options supply one.
	errorHandler func(context.Context, error)
	// informer is the wrapped Informer; AddEventHandler, Run and WaitForSync delegate to it.
	informer Informer
	// watchers holds the wrappers created by AddEventHandler; Run starts each of them. Guarded by mtx.
	watchers []*concurrentWatcher
	// maxConcurrentWorkers is passed to newConcurrentWatcher for every added handler.
	maxConcurrentWorkers uint64
	// mtx guards watchers: write-locked when appending, read-locked while Run iterates.
	mtx sync.RWMutex
}
// ConcurrentInformerOptions are options for the ConcurrentInformer.
type ConcurrentInformerOptions struct {
	// ErrorHandler is a user-specified error handling function. If left nil, DefaultErrorHandler will be used.
	ErrorHandler func(context.Context, error)
	// MaxConcurrentWorkers is a limit on the number of workers to run concurrently for each ResourceWatcher. Each
	// worker maintains a queue of events which are processed sequentially. Events for a particular object are assigned
	// to the same worker, as to maintain the guarantee of in-order delivery of events per object.
	// If left at zero, NewConcurrentInformer falls back to a default of 10 workers.
	MaxConcurrentWorkers uint64
}
// NewConcurrentInformer builds a ConcurrentInformer around inf.
// A nil opts.ErrorHandler is replaced by DefaultErrorHandler, and a zero opts.MaxConcurrentWorkers
// is replaced by a default of 10. The returned error is always nil.
//
// Deprecated: Use NewConcurrentInformerFromOptions instead, which accepts InformerOptions.
func NewConcurrentInformer(inf Informer, opts ConcurrentInformerOptions) (
	*ConcurrentInformer, error) {
	// Worker count applied when the caller leaves MaxConcurrentWorkers at zero.
	const fallbackWorkers = 10

	onError := opts.ErrorHandler
	if onError == nil {
		onError = DefaultErrorHandler
	}

	workerLimit := opts.MaxConcurrentWorkers
	if workerLimit == 0 {
		workerLimit = fallbackWorkers
	}

	return &ConcurrentInformer{
		errorHandler:         onError,
		informer:             inf,
		watchers:             []*concurrentWatcher{},
		maxConcurrentWorkers: workerLimit,
	}, nil
}
// NewConcurrentInformerFromOptions builds a ConcurrentInformer around inf, configured from InformerOptions.
// Only opts.ErrorHandler and opts.MaxConcurrentWorkers are consulted: a nil handler is replaced by
// DefaultErrorHandler and a zero worker count by a default of 10. The returned error is always nil.
func NewConcurrentInformerFromOptions(inf Informer, opts InformerOptions) (
	*ConcurrentInformer, error) {
	// Start from the defaults, then overlay whatever the caller explicitly set.
	wrapped := new(ConcurrentInformer)
	wrapped.informer = inf
	wrapped.watchers = []*concurrentWatcher{}
	wrapped.errorHandler = DefaultErrorHandler
	wrapped.maxConcurrentWorkers = 10

	if handler := opts.ErrorHandler; handler != nil {
		wrapped.errorHandler = handler
	}
	if limit := opts.MaxConcurrentWorkers; limit > 0 {
		wrapped.maxConcurrentWorkers = limit
	}
	return wrapped, nil
}
// AddEventHandler adds a ResourceWatcher as an event handler for watch events from the informer.
// The ResourceWatcher is wrapped before adding it to the underlying Informer, to allow concurrent
// handling of the events.
// If the wrapper cannot be created, or the underlying Informer refuses the handler, the error is returned
// and the wrapper is not kept in the set of watchers started by Run.
// Event handlers are not guaranteed to be executed in parallel or in any particular order by the underlying
// Informer. If you want to coordinate between ResourceWatchers, use an InformerController.
// nolint:dupl
func (ci *ConcurrentInformer) AddEventHandler(handler ResourceWatcher) error {
	cw, err := newConcurrentWatcher(handler, ci.maxConcurrentWorkers, ci.errorHandler)
	if err != nil {
		return err
	}

	// Track the wrapper before registering it, so a Run call racing with this one
	// still sees (and starts) every wrapper the underlying informer knows about.
	ci.mtx.Lock()
	ci.watchers = append(ci.watchers, cw)
	ci.mtx.Unlock()

	regErr := ci.informer.AddEventHandler(cw)
	if regErr == nil {
		return nil
	}

	// Registration failed: forget the wrapper so a later Run does not start workers
	// for a handler the underlying informer never accepted.
	ci.mtx.Lock()
	for i := range ci.watchers {
		if ci.watchers[i] != cw {
			continue
		}
		last := len(ci.watchers) - 1
		copy(ci.watchers[i:], ci.watchers[i+1:])
		ci.watchers[last] = nil // drop the stale pointer left in the backing array
		ci.watchers = ci.watchers[:last]
		break
	}
	ci.mtx.Unlock()
	return regErr
}
// Run starts every watcher registered so far in its own goroutine, then runs the wrapped Informer
// and returns whatever error it returns.
// All of them receive a context derived from ctx, which is cancelled when Run returns.
// Watchers added via AddEventHandler after this loop has executed are not started by this call.
func (ci *ConcurrentInformer) Run(ctx context.Context) error {
	runCtx, stop := context.WithCancel(ctx)
	defer stop()

	// Hold the read lock only for as long as it takes to launch the watcher goroutines.
	func() {
		ci.mtx.RLock()
		defer ci.mtx.RUnlock()
		for i := range ci.watchers {
			go ci.watchers[i].Run(runCtx)
		}
	}()

	return ci.informer.Run(runCtx)
}
// HealthChecks returns the health checks contributed by the wrapped Informer.
// If the wrapped Informer is itself a health.Check it is included; if it is a health.Checker,
// the checks it reports are appended after it. The two cases are independent, so both may apply.
// The result is an empty, non-nil slice when neither interface is implemented.
func (ci *ConcurrentInformer) HealthChecks() []health.Check {
	inner := ci.informer
	collected := []health.Check{}

	if single, isCheck := inner.(health.Check); isCheck {
		collected = append(collected, single)
	}
	if provider, isChecker := inner.(health.Checker); isChecker {
		collected = append(collected, provider.HealthChecks()...)
	}

	return collected
}
// WaitForSync delegates to the wrapped Informer's WaitForSync with the given context
// and returns its result unchanged.
func (ci *ConcurrentInformer) WaitForSync(ctx context.Context) error {
	wrapped := ci.informer
	return wrapped.WaitForSync(ctx)
}