Skip to content

Commit a9e57bf

Browse files
Fix deadlock with ccevents
- Revise fabric.EventListener - Add additional tests Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>
1 parent 827b334 commit a9e57bf

File tree

2 files changed

+249
-36
lines changed

2 files changed

+249
-36
lines changed

platform/fabric/events.go

Lines changed: 102 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,50 +7,131 @@ SPDX-License-Identifier: Apache-2.0
77
package fabric
88

99
import (
10+
"context"
1011
"sync"
12+
"time"
1113

1214
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
1315
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
1416
)
1517

16-
// EventListener models the parameters to use for chaincode listening.
18+
// Defaults applied by newEventListener.
const (
	// defaultRecvTimeout bounds how long OnReceive waits to hand over an event
	// before the event is dropped.
	defaultRecvTimeout = 10 * time.Millisecond

	// defaultBufferLen is the capacity of the buffered channel read by consumers.
	defaultBufferLen = 10
)
22+
23+
// EventListener implements the event.Listener interface and provides
24+
// a best-effort mechanism for receiving chaincode events.
25+
//
26+
// Internally, it maintains a buffered channel of size `bufferLen` for pending events.
27+
// When the buffer is full, new events are retained for up to `recvTimeout` to allow
28+
// consumers a chance to catch up. If the buffer remains full after this timeout,
29+
// the pending event is dropped to avoid blocking the producer.
30+
//
31+
// This implementation prioritizes system responsiveness over guaranteed delivery;
32+
// consumers should tolerate occasional event loss.
1733
type EventListener struct {
18-
sync.RWMutex
19-
chaincodeListener chan *committer.ChaincodeEvent
20-
subscriber events.Subscriber
21-
chaincodeName string
22-
closing bool
34+
eventCh chan *committer.ChaincodeEvent // this is our main event channel
35+
subscriber events.Subscriber
36+
chaincodeName string
37+
38+
subscribeOnce sync.Once
39+
middleCh chan *committer.ChaincodeEvent
40+
closing chan struct{}
41+
closed chan struct{}
42+
43+
recvTimeout time.Duration
2344
}
2445

46+
// newEventListener create a `EventListener` with `defaultBufferLen` and `defaultRecvTimeout`.
2547
func newEventListener(subscriber events.Subscriber, chaincodeName string) *EventListener {
2648
return &EventListener{
2749
chaincodeName: chaincodeName,
2850
subscriber: subscriber,
51+
eventCh: make(chan *committer.ChaincodeEvent, defaultBufferLen),
52+
middleCh: make(chan *committer.ChaincodeEvent),
53+
closing: make(chan struct{}),
54+
closed: make(chan struct{}),
55+
recvTimeout: defaultRecvTimeout,
2956
}
3057
}
3158

3259
// ChaincodeEvents returns a channel from which chaincode events emitted by transaction functions in the specified chaincode can be read.
33-
func (e *EventListener) ChaincodeEvents() chan *committer.ChaincodeEvent {
34-
e.chaincodeListener = make(chan *committer.ChaincodeEvent, 1)
35-
e.subscriber.Subscribe(e.chaincodeName, e)
36-
return e.chaincodeListener
60+
func (e *EventListener) ChaincodeEvents() <-chan *committer.ChaincodeEvent {
61+
e.subscribeOnce.Do(func() {
62+
// when a consumer first time calls ChaincodeEvents, we set up the event subscription for the chaincode name
63+
// and start this goroutine for graceful closing
64+
go func() {
65+
// our shutdown helper function
66+
exit := func(v *committer.ChaincodeEvent, needSend bool) {
67+
close(e.closed)
68+
e.subscriber.Unsubscribe(e.chaincodeName, e)
69+
if needSend {
70+
e.eventCh <- v
71+
}
72+
close(e.eventCh)
73+
}
74+
75+
for {
76+
select {
77+
case <-e.closing:
78+
exit(nil, false)
79+
return
80+
case v := <-e.middleCh:
81+
// we have a new event v received via OnReceive
82+
select {
83+
case <-e.closing:
84+
exit(v, true)
85+
return
86+
case e.eventCh <- v:
87+
// forward event v to event channel
88+
}
89+
}
90+
}
91+
}()
92+
93+
// finally we create our subscription
94+
e.subscriber.Subscribe(e.chaincodeName, e)
95+
})
96+
97+
// we always return the event channel; if it is closed
98+
return e.eventCh
3799
}
38100

39101
// CloseChaincodeEvents closes the channel from which chaincode events are read.
40102
func (e *EventListener) CloseChaincodeEvents() {
41-
e.Lock()
42-
e.closing = true
43-
e.Unlock()
44-
45-
e.subscriber.Unsubscribe(e.chaincodeName, e)
46-
close(e.chaincodeListener)
103+
select {
104+
case e.closing <- struct{}{}:
105+
<-e.closed
106+
case <-e.closed:
107+
}
47108
}
48109

49-
// OnReceive pushes events to the listener
110+
// OnReceive pushes events to the listener.
111+
// The event is dropped, if it cannot be delivered to the event channel before the recvTimeout is fired.
50112
func (e *EventListener) OnReceive(event events.Event) {
51-
e.RLock()
52-
defer e.RUnlock()
53-
if !e.closing {
54-
e.chaincodeListener <- event.Message().(*committer.ChaincodeEvent)
113+
if event == nil {
114+
return
115+
}
116+
117+
ctx, cancel := context.WithTimeout(context.Background(), e.recvTimeout)
118+
defer cancel()
119+
120+
// we check if our event listener is already closed
121+
// we do this extra select to prioritize the close channel (https://go.dev/ref/spec#Select_statements).
122+
select {
123+
case <-e.closed:
124+
return
125+
default:
126+
}
127+
128+
select {
129+
case <-e.closed:
130+
return
131+
case <-ctx.Done():
132+
// if the event cannot send to the middleCh before the recvTimeout is fired,
133+
// we return to not further block the event notifier
134+
return
135+
case e.middleCh <- event.Message().(*committer.ChaincodeEvent):
55136
}
56137
}

platform/fabric/events_test.go

Lines changed: 147 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,22 @@ package fabric
88

99
import (
1010
"context"
11+
"runtime"
1112
"sync"
1213
"testing"
1314
"time"
1415

1516
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer"
1617
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events"
1718
"github.com/stretchr/testify/assert"
19+
"github.com/stretchr/testify/require"
1820
)
1921

2022
// Timing values shared by the event listener tests.
const (
	// publicationSnooze is the pause between two publications of the background publisher.
	publicationSnooze = time.Millisecond
	// timeout is the base deadline for polling assertions.
	timeout = 75 * time.Millisecond
	// longTimeout is a deliberately long recvTimeout used by the deadlock test.
	longTimeout = time.Minute
	// waitFor is how long the tests let producers and consumers run.
	waitFor = timeout
	// tick is the polling interval of the Eventually/Never assertions.
	tick = timeout / 10
)
2629

@@ -43,9 +46,11 @@ func (m *mockSubscriber) Unsubscribe(chaincodeName string, listener events.Liste
4346

4447
func (m *mockSubscriber) Publish(chaincodeName string, event *committer.ChaincodeEvent) {
4548
m.m.RLock()
46-
defer m.m.RUnlock()
47-
if m.listener != nil {
48-
m.listener.OnReceive(event)
49+
l := m.listener
50+
m.m.RUnlock()
51+
52+
if l != nil {
53+
l.OnReceive(event)
4954
}
5055
}
5156

@@ -54,30 +59,32 @@ func TestEventListener(t *testing.T) {
5459
listener := newEventListener(subscriber, "testChaincode")
5560
ch := listener.ChaincodeEvents()
5661

57-
done := make(chan bool)
62+
msg := &committer.ChaincodeEvent{Payload: []byte("some msg")}
63+
stopPublisher := make(chan bool)
5864

5965
var wg sync.WaitGroup
60-
wg.Add(2)
6166

6267
// Publish events
68+
wg.Add(1)
6369
go func() {
6470
defer wg.Done()
65-
msg := []byte("some msg")
6671
for {
6772
select {
68-
case <-done:
73+
case <-stopPublisher:
6974
return
7075
default:
71-
subscriber.Publish("testChaincode", &committer.ChaincodeEvent{Payload: msg})
76+
subscriber.Publish("testChaincode", msg)
7277
time.Sleep(publicationSnooze)
7378
}
7479
}
7580
}()
7681

77-
// Stop while the publisher is still publishing
78-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
82+
// Stop the consumer and close the event listener while the producer is still publishing
83+
ctx, cancel := context.WithTimeout(context.Background(), waitFor)
7984
defer cancel()
8085

86+
// Consumer
87+
wg.Add(1)
8188
go func() {
8289
defer wg.Done()
8390
for {
@@ -94,13 +101,138 @@ func TestEventListener(t *testing.T) {
94101
}
95102
}()
96103

97-
assert.Eventually(t, func() bool {
98-
// eventually our event should be closed
104+
// let's wait until our timeout is fired
105+
<-ctx.Done()
106+
107+
// consume everything that is remaining in ch and eventually the channel should be closed
108+
require.Eventually(t, func() bool {
99109
_, ok := <-ch
100110
return !ok
101111
}, waitFor, tick)
102112

103-
// now we let our consumer know that they can stop working
104-
close(done)
113+
// now we let our publisher know that they can stop working
114+
close(stopPublisher)
105115
wg.Wait()
116+
117+
// check that our channel is closed
118+
require.Eventually(t, func() bool {
119+
return isClosed(ch)
120+
}, timeout, tick)
121+
}
122+
123+
func TestEventServiceMultipleClose(t *testing.T) {
124+
subscriber := &mockSubscriber{}
125+
listener := newEventListener(subscriber, "testChaincode")
126+
ch := listener.ChaincodeEvents()
127+
msg1 := &committer.ChaincodeEvent{Payload: []byte("msg1")}
128+
129+
var wg sync.WaitGroup
130+
wg.Add(1)
131+
go func() {
132+
wg.Done()
133+
subscriber.Publish("testChaincode", msg1)
134+
listener.CloseChaincodeEvents()
135+
}()
136+
137+
// Call Close multiple times safely
138+
listener.CloseChaincodeEvents()
139+
listener.CloseChaincodeEvents()
140+
listener.CloseChaincodeEvents()
141+
142+
wg.Wait()
143+
144+
// check that our channel is closed
145+
require.Eventually(t, func() bool {
146+
return isClosed(ch)
147+
}, timeout, tick)
148+
}
149+
150+
// TestEventListenerDeadlock checks that a producer blocked in Publish while the event
// buffer is full is released by CloseChaincodeEvents, and that the event channel is
// closed afterwards.
func TestEventListenerDeadlock(t *testing.T) {
151+
subscriber := &mockSubscriber{}
152+
153+
const customBufferLen = 1
154+
155+
// in this test we configure our event listener with a small buffer and a long recvTimeout
156+
listener := &EventListener{
157+
chaincodeName: "testChaincode",
158+
subscriber: subscriber,
159+
eventCh: make(chan *committer.ChaincodeEvent, customBufferLen),
160+
middleCh: make(chan *committer.ChaincodeEvent),
161+
closing: make(chan struct{}),
162+
closed: make(chan struct{}),
163+
recvTimeout: longTimeout,
164+
}
165+
166+
ch := listener.ChaincodeEvents()
167+
168+
msg1 := &committer.ChaincodeEvent{Payload: []byte("msg1")}
169+
msg2 := &committer.ChaincodeEvent{Payload: []byte("msg2")}
170+
171+
// we publish one event and then consume it, leaving the buffer empty again
172+
subscriber.Publish("testChaincode", msg1)
173+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
174+
require.Len(ct, ch, 1)
175+
require.Equal(ct, msg1, <-ch)
176+
require.Len(ct, ch, 0)
177+
}, timeout, tick)
178+
179+
// next up, we fill our event buffer by publishing msg1
180+
for range customBufferLen {
181+
subscriber.Publish("testChaincode", msg1)
182+
}
183+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
184+
// our channel should be full now
185+
require.Len(ct, ch, customBufferLen)
186+
}, timeout, tick)
187+
188+
require.Never(t, func() bool {
189+
// with the buffer full and nobody consuming, publishing is expected to block
190+
// (until longTimeout is fired)
subscriber.Publish("testChaincode", msg1)
191+
return false
192+
}, timeout, tick)
193+
194+
// we kick off our producer to publish msg2
195+
var wg sync.WaitGroup
196+
wg.Add(1)
197+
go func() {
198+
defer wg.Done()
199+
// as msg1 is not yet consumed, our producer is blocked
200+
subscriber.Publish("testChaincode", msg2)
201+
}()
202+
203+
// let's give the producer a bit of time
204+
runtime.Gosched()
205+
time.Sleep(waitFor)
206+
207+
// let's make sure that our producer is still waiting to complete publishing msg2
208+
require.Never(t, func() bool {
209+
// we expect to be blocked
210+
wg.Wait()
211+
return false
212+
}, timeout, tick)
213+
214+
// now, we close the listener, which should unblock the producer
215+
listener.CloseChaincodeEvents()
216+
217+
// wait for the producer to finish
218+
wg.Wait()
219+
220+
// the msg1 that filled the buffer before closing must still be readable
221+
require.EventuallyWithT(t, func(c *assert.CollectT) {
222+
require.Equal(c, msg1, <-ch)
223+
}, timeout, tick)
224+
225+
// check that our channel is closed
226+
require.Eventually(t, func() bool {
227+
return isClosed(ch)
228+
}, timeout, tick)
229+
}
230+
231+
func isClosed(ch <-chan *committer.ChaincodeEvent) bool {
232+
select {
233+
case <-ch:
234+
return true
235+
default:
236+
}
237+
return false
106238
}

0 commit comments

Comments
 (0)