-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathemitter.go
380 lines (330 loc) · 8.38 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
/*
Package emitter implements a channel-based pubsub pattern.
The design goals are:
- fully functional and safe
- simple to understand and use
- readable, maintainable and minimalistic code
*/
package emitter
import (
"path"
"sync"
)
// Flag is a bit set describing how an event should be delivered
// to a listener.
type Flag int

// Delivery flags. FlagReset is the empty set; each of the other flags
// occupies its own bit, so they can be combined with bitwise OR.
const (
	// FlagReset carries no bits and is only used to clear previously
	// defined flags.
	// Example:
	// ee.Use("*", Reset) // clears flags for this pattern
	FlagReset Flag = 0
	// FlagOnce indicates to remove the listener after first sending.
	FlagOnce Flag = 1 << 1
	// FlagVoid indicates to skip sending.
	FlagVoid Flag = 1 << 2
	// FlagSkip indicates to skip sending if channel is blocked.
	FlagSkip Flag = 1 << 3
	// FlagClose indicates to drop listener if channel is blocked.
	FlagClose Flag = 1 << 4
	// FlagSync indicates to send an event synchronously.
	FlagSync Flag = 1 << 5
)
// Middlewares.

// Reset middleware clears every flag on the event.
func Reset(e *Event) {
	e.Flags = FlagReset
}

// Once middleware adds the FlagOnce flag to the event.
func Once(e *Event) {
	e.Flags |= FlagOnce
}

// Void middleware adds the FlagVoid flag to the event.
func Void(e *Event) {
	e.Flags |= FlagVoid
}

// Skip middleware adds the FlagSkip flag to the event.
func Skip(e *Event) {
	e.Flags |= FlagSkip
}

// Close middleware adds the FlagClose flag to the event.
func Close(e *Event) {
	e.Flags |= FlagClose
}

// Sync middleware adds the FlagSync flag to the event.
func Sync(e *Event) {
	e.Flags |= FlagSync
}
// New creates an initialized Emitter. The capacity argument is stored as
// the default buffer size that On uses for listener channels; OnWithCap
// can be used to pick a different capacity per listener.
func New(capacity uint) *Emitter {
	em := new(Emitter)
	em.Cap = capacity
	em.listeners = make(map[string][]listener)
	em.middlewares = make(map[string][]func(*Event))
	em.isInit = true
	return em
}
// Emitter is a struct that allows callers to emit and receive
// events, close receiver channels, and get info
// about topics and listeners.
type Emitter struct {
// Cap is the channel capacity that On passes to OnWithCap for new listeners.
Cap uint
// mu is held by Use, OnWithCap, Off, Listeners, Topics and Emit while
// they touch the fields below.
mu sync.Mutex
// listeners maps a topic to the listeners subscribed to it.
listeners map[string][]listener
// isInit records whether the maps have been allocated. New sets it;
// otherwise init allocates the maps on first use.
isInit bool
// middlewares maps a pattern to the middlewares registered for it via Use.
middlewares map[string][]func(*Event)
}
// newListener builds a listener whose channel is buffered to the given
// capacity and which carries the given middlewares.
func newListener(capacity uint, middlewares ...func(*Event)) listener {
	var l listener
	l.ch = make(chan Event, capacity)
	l.middlewares = middlewares
	return l
}
// listener is a single subscription: the channel events are pushed to and
// the middlewares applied to each event before it is pushed.
type listener struct {
// ch is the channel handed back to the subscriber; Off closes it.
ch chan Event
// middlewares are applied by Emit to this listener's copy of the event.
middlewares []func(*Event)
}
// init allocates the internal maps the first time it is called on an
// Emitter that was not built by New; afterwards it is a no-op.
// The methods in this file call it with e.mu held.
func (e *Emitter) init() {
	if e.isInit {
		return
	}
	e.isInit = true
	e.listeners = make(map[string][]listener)
	e.middlewares = make(map[string][]func(*Event))
}
// Use registers middlewares for the pattern, replacing whatever was
// registered for it before. Calling Use with no middlewares removes the
// pattern's entry.
func (e *Emitter) Use(pattern string, middlewares ...func(*Event)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.init()

	if len(middlewares) == 0 {
		delete(e.middlewares, pattern)
		return
	}
	e.middlewares[pattern] = middlewares
}
// On subscribes to the topic and returns a receive-only channel of events.
// The channel is created with the emitter's default capacity (Cap). Any
// middlewares given are attached to this listener.
func (e *Emitter) On(topic string, middlewares ...func(*Event)) <-chan Event {
	capacity := e.Cap
	return e.OnWithCap(topic, capacity, middlewares...)
}
// OnWithCap subscribes to the topic and returns a receive-only channel of
// events whose buffer has the given capacity. Any middlewares given are
// attached to this listener.
func (e *Emitter) OnWithCap(topic string, capacity uint, middlewares ...func(*Event)) <-chan Event {
	e.mu.Lock()
	e.init()
	sub := newListener(capacity, middlewares...)
	// A topic without listeners reads as a nil slice, which append grows
	// into a one-element slice.
	e.listeners[topic] = append(e.listeners[topic], sub)
	e.mu.Unlock()
	return sub.ch
}
// Once works like On (see above) but adds the Once middleware to the
// listener, so the listener is removed after the first event sent to it.
// Once is placed after the caller's middlewares and therefore runs last.
func (e *Emitter) Once(topic string, middlewares ...func(*Event)) <-chan Event {
	// Build a fresh slice instead of appending to the caller's. When the
	// variadic argument is passed as s... and s has spare capacity, append
	// would write Once into the caller's backing array, and a later append
	// by the caller would overwrite it in the slice stored on the listener.
	mws := make([]func(*Event), 0, len(middlewares)+1)
	mws = append(mws, middlewares...)
	mws = append(mws, Once)
	return e.On(topic, mws...)
}
// Off unsubscribes listeners from every registered topic covered by topic,
// which may also be a pattern. With no channels given, all listeners of the
// matched topics are closed and removed; otherwise only the listeners that
// own one of the given channels are. Topics left without listeners are
// deleted.
func (e *Emitter) Off(topic string, channels ...<-chan Event) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.init()

	names, _ := e.matched(topic)
	removeAll := len(channels) == 0
	for _, name := range names {
		subs, found := e.listeners[name]
		switch {
		case !found:
			// Nothing registered under this name.
		case removeAll:
			// Close from the back, then truncate in one step.
			for idx := len(subs) - 1; idx >= 0; idx-- {
				close(subs[idx].ch)
			}
			subs = subs[:0]
			e.listeners[name] = subs
		default:
			for _, target := range channels {
				for idx := len(subs) - 1; idx >= 0; idx-- {
					if subs[idx].ch == target {
						close(subs[idx].ch)
						subs = append(subs[:idx], subs[idx+1:]...)
					}
				}
			}
			e.listeners[name] = subs
		}
		if len(e.listeners[name]) == 0 {
			delete(e.listeners, name)
		}
	}
}
// Listeners returns the channels of all listeners covered by topic, which
// may also be a pattern. The error produced while matching is discarded,
// so no error is reported to the caller. The result is nil when nothing
// matches.
func (e *Emitter) Listeners(topic string) []<-chan Event {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.init()

	names, _ := e.matched(topic)
	var out []<-chan Event
	for _, name := range names {
		for _, sub := range e.listeners[name] {
			out = append(out, sub.ch)
		}
	}
	return out
}
// Topics returns the names of all topics that currently have listeners.
// The order follows map iteration and is therefore unspecified.
func (e *Emitter) Topics() []string {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.init()

	names := make([]string, 0, len(e.listeners))
	for name := range e.listeners {
		names = append(names, name)
	}
	return names
}
// Emit emits an event with the rest arguments to all
// listeners which were covered by topic(it can be pattern).
//
// For every matched topic an Event is built and the middlewares returned by
// getMiddlewares are applied to it. Each listener then gets its own copy of
// that event with the listener's middlewares applied on top. Listeners whose
// resulting flags include FlagVoid are skipped. With FlagSync the event is
// pushed before Emit returns; otherwise one goroutine per listener is
// started to push it.
//
// The returned channel is closed once every asynchronous push has finished,
// or before Emit returns when none was started. The same channel is handed
// to pushEvent, which passes it to send as the cancellation signal.
// NOTE(review): the recover around close(done) suggests callers may close
// the returned channel themselves to cancel pending sends — confirm.
func (e *Emitter) Emit(topic string, args ...interface{}) chan struct{} {
e.mu.Lock()
e.init()
done := make(chan struct{}, 1)
match, _ := e.matched(topic)
var wg sync.WaitGroup
var haveToWait bool
for _, _topic := range match {
listeners := e.listeners[_topic]
event := Event{
Topic: _topic,
OriginalTopic: topic,
Args: args,
}
// apply the middlewares registered for patterns covering this topic
applyMiddlewares(&event, e.getMiddlewares(_topic))
// whole topic is skipping
// if (event.Flags | FlagVoid) == event.Flags {
// continue
// }
Loop:
for i := len(listeners) - 1; i >= 0; i-- {
lstnr := listeners[i]
evn := *(&event) // copy the event
applyMiddlewares(&evn, lstnr.middlewares)
// FlagVoid set: do not send to this listener
if (evn.Flags | FlagVoid) == evn.Flags {
continue Loop
}
if (evn.Flags | FlagSync) == evn.Flags {
// synchronous push, performed while Emit still holds e.mu
_, remove, _ := pushEvent(done, lstnr.ch, &evn)
if remove {
// deferred so that Off, which locks e.mu itself, runs only after
// Emit has unlocked e.mu and is returning
defer e.Off(event.Topic, lstnr.ch)
}
} else {
wg.Add(1)
haveToWait = true
go func(lstnr listener, event *Event) {
// Emit holds e.mu until just before it returns, so this blocks until
// then; the lock is held for the whole push
e.mu.Lock()
_, remove, _ := pushEvent(done, lstnr.ch, event)
if remove {
// runs when this goroutine's function returns, i.e. after the
// e.mu.Unlock() below
defer e.Off(event.Topic, lstnr.ch)
}
wg.Done()
e.mu.Unlock()
}(lstnr, &evn)
}
}
}
if haveToWait {
go func(done chan struct{}) {
// swallow the panic close raises if done has already been closed
defer func() { recover() }()
wg.Wait()
close(done)
}(done)
} else {
// nothing asynchronous was started: signal completion right away
close(done)
}
e.mu.Unlock()
return done
}
// pushEvent sends event to the listener channel according to the event's
// flags and reports the outcome.
//
// The send blocks unless FlagSkip or FlagClose is set. success reports
// whether the event was delivered. remove reports whether the listener
// should be unsubscribed: after a delivery when FlagOnce is set, or after
// a failed non-canceled attempt when FlagClose is set. err is always nil.
func pushEvent(
	done chan struct{},
	lstnr chan Event,
	event *Event,
) (success, remove bool, err error) {
	flags := event.Flags
	once := flags&FlagOnce == FlagOnce
	skip := flags&FlagSkip == FlagSkip
	closeOnBlock := flags&FlagClose == FlagClose

	// Wait for the receiver only when neither Skip nor Close was requested.
	blocking := !skip && !closeOnBlock
	sent, canceled := send(done, lstnr, *event, blocking)

	success = sent
	switch {
	case canceled:
		// A canceled push never removes the listener.
	case sent:
		remove = once
	default:
		remove = closeOnBlock
	}
	return success, remove, err
}
// getMiddlewares collects the middlewares of every registered pattern that
// covers topic. A pattern covers the topic when path.Match succeeds with
// the pattern matched against the topic or, failing that, with the topic
// matched against the pattern. Match errors are treated as no match.
// Patterns are visited in map order, so the relative order of middlewares
// coming from different patterns is unspecified.
func (e *Emitter) getMiddlewares(topic string) []func(*Event) {
	var chain []func(*Event)
	for pattern, fns := range e.middlewares {
		hit, _ := path.Match(pattern, topic)
		if !hit {
			hit, _ = path.Match(topic, pattern)
		}
		if hit {
			chain = append(chain, fns...)
		}
	}
	return chain
}
// applyMiddlewares runs each function in fns on e, in slice order.
func applyMiddlewares(e *Event, fns []func(*Event)) {
	for _, fn := range fns {
		fn(e)
	}
}
// matched returns the registered topics covered by topic. A registered
// name is included when path.Match(topic, name) succeeds or, failing that,
// when path.Match(name, topic) does, so either side may act as the pattern.
// An error from the first match aborts the scan and is returned together
// with an empty slice; errors from the reverse match are ignored.
func (e *Emitter) matched(topic string) ([]string, error) {
	hits := make([]string, 0)
	for name := range e.listeners {
		ok, err := path.Match(topic, name)
		if err != nil {
			return make([]string, 0), err
		}
		if !ok {
			ok, _ = path.Match(name, topic)
		}
		if ok {
			hits = append(hits, name)
		}
	}
	return hits, nil
}
// drop removes the element at index i from l in place and returns the
// shortened slice, which shares l's backing array. It panics when i is not
// a valid index of l.
func drop(l []listener, i int) []listener {
	last := len(l) - 1
	copy(l[i:], l[i+1:])
	// Zero the vacated tail slot: otherwise the backing array keeps a stale
	// copy of a listener and with it a reference to its channel and
	// middlewares.
	l[last] = listener{}
	return l[:last]
}
// send tries to deliver e on ch.
//
// With wait set it blocks until the event is delivered or done becomes
// readable. Without wait it returns immediately when neither is possible.
// sent reports a delivery; canceled reports that done was observed instead.
// A panic raised during the attempt is recovered and reported as neither
// sent nor canceled.
// NOTE(review): presumably this covers sending on a channel already closed
// by Off — confirm.
func send(
	done chan struct{},
	ch chan Event,
	e Event, wait bool,
) (sent, canceled bool) {
	defer func() {
		if recover() != nil {
			sent, canceled = false, false
		}
	}()

	if wait {
		select {
		case <-done:
			canceled = true
		case ch <- e:
			sent = true
		}
		return sent, canceled
	}

	select {
	case <-done:
		canceled = true
	case ch <- e:
		sent = true
	default:
		// Channel is blocked and nothing was canceled: give up.
	}
	return sent, canceled
}