@@ -8,49 +8,73 @@ package fabric
88
99import (
1010 "sync"
11+ "sync/atomic"
1112
1213 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
1314 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
1415)
1516
1617// EventListener models the parameters to use for chaincode listening.
1718type EventListener struct {
18- sync.RWMutex
1919 chaincodeListener chan * committer.ChaincodeEvent
2020 subscriber events.Subscriber
2121 chaincodeName string
22- closing bool
22+
23+ isClosed atomic.Bool
24+ wg sync.WaitGroup
25+ shutdownOnce sync.Once
26+ shutdownSignal chan struct {}
27+ subscribeOnce sync.Once
2328}
2429
2530func newEventListener (subscriber events.Subscriber , chaincodeName string ) * EventListener {
2631 return & EventListener {
27- chaincodeName : chaincodeName ,
28- subscriber : subscriber ,
32+ chaincodeName : chaincodeName ,
33+ subscriber : subscriber ,
34+ shutdownSignal : make (chan struct {}),
2935 }
3036}
3137
3238// ChaincodeEvents returns a channel from which chaincode events emitted by transaction functions in the specified chaincode can be read.
3339func (e * EventListener ) ChaincodeEvents () chan * committer.ChaincodeEvent {
34- e .chaincodeListener = make (chan * committer.ChaincodeEvent , 1 )
35- e .subscriber .Subscribe (e .chaincodeName , e )
40+ e .subscribeOnce .Do (func () {
41+ e .chaincodeListener = make (chan * committer.ChaincodeEvent , 1 )
42+ e .subscriber .Subscribe (e .chaincodeName , e )
43+ })
44+
3645 return e .chaincodeListener
3746}
3847
3948// CloseChaincodeEvents closes the channel from which chaincode events are read.
4049func (e * EventListener ) CloseChaincodeEvents () {
41- e .Lock ()
42- e .closing = true
43- e .Unlock ()
50+ e .shutdownOnce .Do (func () {
51+ e .isClosed .Store (true )
52+
53+ // first unsubscribe
54+ e .subscriber .Unsubscribe (e .chaincodeName , e )
55+
56+ // tell every ongoing OnReceive to stop writing into chaincodeListener channel
57+ close (e .shutdownSignal )
4458
45- e .subscriber .Unsubscribe (e .chaincodeName , e )
46- close (e .chaincodeListener )
59+ // wait until all OnReceive calls have finished
60+ e .wg .Wait ()
61+
62+ // signal consumer that there is no more coming
63+ close (e .chaincodeListener )
64+ })
4765}
4866
4967// OnReceive pushes events to the listener
5068func (e * EventListener ) OnReceive (event events.Event ) {
51- e .RLock ()
52- defer e .RUnlock ()
53- if ! e .closing {
54- e .chaincodeListener <- event .Message ().(* committer.ChaincodeEvent )
69+ if event == nil || e .isClosed .Load () {
70+ return
71+ }
72+
73+ e .wg .Add (1 )
74+ defer e .wg .Done ()
75+
76+ select {
77+ case <- e .shutdownSignal :
78+ case e .chaincodeListener <- event .Message ().(* committer.ChaincodeEvent ):
5579 }
5680}
0 commit comments