Skip to content

Commit bf294d4

Browse files
fixup! Fix deadlock with ccevents
Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>
1 parent f304a3e commit bf294d4

File tree

1 file changed

+26
-19
lines changed

1 file changed

+26
-19
lines changed

platform/fabric/events.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,41 @@ import (
1515

1616
// EventListener models the parameters to use for chaincode listening.
1717
type EventListener struct {
18-
chaincodeListener chan *committer.ChaincodeEvent
19-
subscriber events.Subscriber
20-
chaincodeName string
18+
eventCh chan *committer.ChaincodeEvent // this is our main event channel
19+
subscriber events.Subscriber
20+
chaincodeName string
2121

2222
subscribeOnce sync.Once
23-
24-
middleCh chan *committer.ChaincodeEvent
25-
closing chan struct{}
26-
closed chan struct{}
23+
middleCh chan *committer.ChaincodeEvent
24+
closing chan struct{}
25+
closed chan struct{}
2726
}
2827

2928
func newEventListener(subscriber events.Subscriber, chaincodeName string) *EventListener {
3029
return &EventListener{
31-
chaincodeName: chaincodeName,
32-
subscriber: subscriber,
33-
chaincodeListener: make(chan *committer.ChaincodeEvent),
34-
middleCh: make(chan *committer.ChaincodeEvent),
35-
closing: make(chan struct{}),
36-
closed: make(chan struct{}),
30+
chaincodeName: chaincodeName,
31+
subscriber: subscriber,
32+
eventCh: make(chan *committer.ChaincodeEvent),
33+
middleCh: make(chan *committer.ChaincodeEvent),
34+
closing: make(chan struct{}),
35+
closed: make(chan struct{}),
3736
}
3837
}
3938

4039
// ChaincodeEvents returns a channel from which chaincode events emitted by transaction functions in the specified chaincode can be read.
4140
func (e *EventListener) ChaincodeEvents() <-chan *committer.ChaincodeEvent {
4241
e.subscribeOnce.Do(func() {
43-
42+
// when a consumer first time calls ChaincodeEvents, we set up the event subscription for the chaincode name
43+
// and start this goroutine for graceful closing
4444
go func() {
45+
// our shutdown helper function
4546
exit := func(v *committer.ChaincodeEvent, needSend bool) {
4647
close(e.closed)
48+
e.subscriber.Unsubscribe(e.chaincodeName, e)
4749
if needSend {
48-
e.chaincodeListener <- v
50+
e.eventCh <- v
4951
}
50-
close(e.chaincodeListener)
52+
close(e.eventCh)
5153
}
5254

5355
for {
@@ -56,27 +58,30 @@ func (e *EventListener) ChaincodeEvents() <-chan *committer.ChaincodeEvent {
5658
exit(nil, false)
5759
return
5860
case v := <-e.middleCh:
61+
// we have a new event v received via OnReceive
5962
select {
6063
case <-e.closing:
6164
exit(v, true)
6265
return
63-
case e.chaincodeListener <- v:
66+
case e.eventCh <- v:
67+
// forward event v to event channel
6468
}
6569
}
6670
}
6771
}()
6872

73+
// finally we create our subscription
6974
e.subscriber.Subscribe(e.chaincodeName, e)
7075
})
7176

72-
return e.chaincodeListener
77+
// we always return the event channel; if it is closed
78+
return e.eventCh
7379
}
7480

7581
// CloseChaincodeEvents closes the channel from which chaincode events are read.
7682
func (e *EventListener) CloseChaincodeEvents() {
7783
select {
7884
case e.closing <- struct{}{}:
79-
e.subscriber.Unsubscribe(e.chaincodeName, e)
8085
<-e.closed
8186
case <-e.closed:
8287
}
@@ -88,6 +93,8 @@ func (e *EventListener) OnReceive(event events.Event) {
8893
return
8994
}
9095

96+
// we check if our event listener is already closed
97+
// we do this extra select to prioritize the close channel (https://go.dev/ref/spec#Select_statements).
9198
select {
9299
case <-e.closed:
93100
return

0 commit comments

Comments
 (0)