-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathgroup.go
171 lines (145 loc) · 2.95 KB
/
group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package emitter
import (
"reflect"
"sync"
)
// Group marges given subscribed channels into
// on subscribed channel
type Group struct {
// Cap is capacity to create new channel
Cap uint
mu sync.Mutex
listeners []listener
isInit bool
stop chan struct{}
done chan struct{}
cmu sync.Mutex
cases []reflect.SelectCase
lmu sync.Mutex
isListen bool
}
// Flush reset the group to the initial state.
// All references will dropped.
func (g *Group) Flush() {
g.mu.Lock()
defer g.mu.Unlock()
g.stopIfListen()
close(g.stop)
close(g.done)
g.isInit = false
g.init()
}
// Add adds channels which were already subscribed to
// some events.
func (g *Group) Add(channels ...<-chan Event) {
g.mu.Lock()
defer g.listen()
defer g.mu.Unlock()
g.init()
g.stopIfListen()
g.cmu.Lock()
cases := make([]reflect.SelectCase, len(channels))
for i, ch := range channels {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
g.cases = append(g.cases, cases...)
g.cmu.Unlock()
}
// On returns subscribed channel.
func (g *Group) On() <-chan Event {
g.mu.Lock()
defer g.listen()
defer g.mu.Unlock()
g.init()
g.stopIfListen()
l := newListener(g.Cap)
g.listeners = append(g.listeners, l)
return l.ch
}
// Off unsubscribed given channels if any or unsubscribed all
// channels in other case
func (g *Group) Off(channels ...<-chan Event) {
g.mu.Lock()
defer g.listen()
defer g.mu.Unlock()
g.init()
g.stopIfListen()
if len(channels) != 0 {
for _, ch := range channels {
i := -1
Listeners:
for in := range g.listeners {
if g.listeners[in].ch == ch {
i = in
break Listeners
}
}
if i != -1 {
l := g.listeners[i]
g.listeners = append(g.listeners[:i], g.listeners[i+1:]...)
close(l.ch)
}
}
} else {
g.listeners = make([]listener, 0)
}
}
func (g *Group) stopIfListen() bool {
g.lmu.Lock()
defer g.lmu.Unlock()
if !g.isListen {
return false
}
g.stop <- struct{}{}
g.isListen = false
return true
}
func (g *Group) listen() {
g.lmu.Lock()
defer g.lmu.Unlock()
g.cmu.Lock()
g.isListen = true
go func() {
// unlock cases and isListen flag when func is exit
defer g.cmu.Unlock()
for {
i, val, isOpened := reflect.Select(g.cases)
// exit if listening is stopped
if i == 0 {
return
}
if !isOpened && len(g.cases) > i {
// remove this case
g.cases = append(g.cases[:i], g.cases[i+1:]...)
}
e := val.Interface().(Event)
// use unblocked mode
e.Flags = e.Flags | FlagSkip
// send events to all listeners
g.mu.Lock()
for index := range g.listeners {
l := g.listeners[index]
pushEvent(g.done, l.ch, &e)
}
g.mu.Unlock()
}
}()
}
func (g *Group) init() {
if g.isInit {
return
}
g.stop = make(chan struct{})
g.done = make(chan struct{})
g.cases = []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(g.stop),
},
}
g.listeners = make([]listener, 0)
g.isInit = true
}