From 827b334a19d05e4837a35faf49d0cc64348fa06d Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Mon, 6 Oct 2025 17:54:02 +0200 Subject: [PATCH 1/2] Use a single network for cc event tests Signed-off-by: Marcus Brandenburger --- integration/fabric/events/events_test.go | 9 ++++++--- integration/fabric/events/views/events.go | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/integration/fabric/events/events_test.go b/integration/fabric/events/events_test.go index 7d547d5b4..fc49a85b9 100644 --- a/integration/fabric/events/events_test.go +++ b/integration/fabric/events/events_test.go @@ -20,10 +20,13 @@ import ( ) var _ = Describe("EndToEnd", func() { - Describe("Events (With Chaincode) With Websockets", func() { + Describe("Events (With Chaincode) With Websockets", Ordered, func() { s := NewTestSuite(fsc.WebSocket, integration.NoReplication) - BeforeEach(s.Setup) - AfterEach(s.TearDown) + + // we run all tests sequentially on the same network instance + BeforeAll(s.Setup) + AfterAll(s.TearDown) + It("clients listening to single chaincode events", s.TestSingleChaincodeEvents) It("client listening to multiple chaincode events", s.TestMultipleChaincodeEvents) It("multiple clients unsubscribing", s.TestMultipleListenersAndUnsubscribe) diff --git a/integration/fabric/events/views/events.go b/integration/fabric/events/views/events.go index febd4b987..a200292dd 100644 --- a/integration/fabric/events/views/events.go +++ b/integration/fabric/events/views/events.go @@ -73,7 +73,7 @@ func (c *EventsView) Call(context view.Context) (interface{}, error) { wg.Add(1) eventReceived = nil eventError = nil - ctx1, cancelFunc1 := context2.WithTimeout(context.Context(), 10*time.Minute) + ctx1, cancelFunc1 := context2.WithTimeout(context.Context(), 1*time.Minute) defer cancelFunc1() _, err = context.RunView(chaincode.NewListenToEventsViewWithContext(ctx1, "events", callBack)) assert.NoError(err, "failed to listen to events") From a9e57bf304a9c2a21aab882b4dbdba004fb78228 Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Mon, 6 Oct 2025 17:54:42 +0200 Subject: [PATCH 2/2] Fix deadlock with ccevents - Revise fabric.EventListener - Add additional tests Signed-off-by: Marcus Brandenburger --- platform/fabric/events.go | 123 ++++++++++++++++++++----- platform/fabric/events_test.go | 162 ++++++++++++++++++++++++++++++--- 2 files changed, 249 insertions(+), 36 deletions(-) diff --git a/platform/fabric/events.go b/platform/fabric/events.go index cc8ac9127..53cc94457 100644 --- a/platform/fabric/events.go +++ b/platform/fabric/events.go @@ -7,50 +7,131 @@ SPDX-License-Identifier: Apache-2.0 package fabric import ( + "context" "sync" + "time" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events" ) -// EventListener models the parameters to use for chaincode listening. +const ( + defaultBufferLen = 10 + defaultRecvTimeout = 10 * time.Millisecond +) + +// EventListener implements the event.Listener interface and provides +// a best-effort mechanism for receiving chaincode events. +// +// Internally, it maintains a buffered channel of size `bufferLen` for pending events. +// When the buffer is full, new events are retained for up to `recvTimeout` to allow +// consumers a chance to catch up. If the buffer remains full after this timeout, +// the pending event is dropped to avoid blocking the producer. +// +// This implementation prioritizes system responsiveness over guaranteed delivery; +// consumers should tolerate occasional event loss. type EventListener struct { - sync.RWMutex - chaincodeListener chan *committer.ChaincodeEvent - subscriber events.Subscriber - chaincodeName string - closing bool + eventCh chan *committer.ChaincodeEvent // this is our main event channel + subscriber events.Subscriber + chaincodeName string + + subscribeOnce sync.Once + middleCh chan *committer.ChaincodeEvent + closing chan struct{} + closed chan struct{} + + recvTimeout time.Duration } +// newEventListener create a `EventListener` with `defaultBufferLen` and `defaultRecvTimeout`. func newEventListener(subscriber events.Subscriber, chaincodeName string) *EventListener { return &EventListener{ chaincodeName: chaincodeName, subscriber: subscriber, + eventCh: make(chan *committer.ChaincodeEvent, defaultBufferLen), + middleCh: make(chan *committer.ChaincodeEvent), + closing: make(chan struct{}), + closed: make(chan struct{}), + recvTimeout: defaultRecvTimeout, } } // ChaincodeEvents returns a channel from which chaincode events emitted by transaction functions in the specified chaincode can be read. -func (e *EventListener) ChaincodeEvents() chan *committer.ChaincodeEvent { - e.chaincodeListener = make(chan *committer.ChaincodeEvent, 1) - e.subscriber.Subscribe(e.chaincodeName, e) - return e.chaincodeListener +func (e *EventListener) ChaincodeEvents() <-chan *committer.ChaincodeEvent { + e.subscribeOnce.Do(func() { + // when a consumer first time calls ChaincodeEvents, we set up the event subscription for the chaincode name + // and start this goroutine for graceful closing + go func() { + // our shutdown helper function + exit := func(v *committer.ChaincodeEvent, needSend bool) { + close(e.closed) + e.subscriber.Unsubscribe(e.chaincodeName, e) + if needSend { + e.eventCh <- v + } + close(e.eventCh) + } + + for { + select { + case <-e.closing: + exit(nil, false) + return + case v := <-e.middleCh: + // we have a new event v received via OnReceive + select { + case <-e.closing: + exit(v, true) + return + case e.eventCh <- v: + // forward event v to event channel + } + } + } + }() + + // finally we create our subscription + e.subscriber.Subscribe(e.chaincodeName, e) + }) + + // we always return the event channel; if it is closed + return e.eventCh } // CloseChaincodeEvents closes the channel from which chaincode events are read. func (e *EventListener) CloseChaincodeEvents() { - e.Lock() - e.closing = true - e.Unlock() - - e.subscriber.Unsubscribe(e.chaincodeName, e) - close(e.chaincodeListener) + select { + case e.closing <- struct{}{}: + <-e.closed + case <-e.closed: + } } -// OnReceive pushes events to the listener +// OnReceive pushes events to the listener. +// The event is dropped, if it cannot be delivered to the event channel before the recvTimeout is fired. func (e *EventListener) OnReceive(event events.Event) { - e.RLock() - defer e.RUnlock() - if !e.closing { - e.chaincodeListener <- event.Message().(*committer.ChaincodeEvent) + if event == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), e.recvTimeout) + defer cancel() + + // we check if our event listener is already closed + // we do this extra select to prioritize the close channel (https://go.dev/ref/spec#Select_statements). + select { + case <-e.closed: + return + default: + } + + select { + case <-e.closed: + return + case <-ctx.Done(): + // if the event cannot send to the middleCh before the recvTimeout is fired, + // we return to not further block the event notifier + return + case e.middleCh <- event.Message().(*committer.ChaincodeEvent): } } diff --git a/platform/fabric/events_test.go b/platform/fabric/events_test.go index c4b801515..8fdaa56ea 100644 --- a/platform/fabric/events_test.go +++ b/platform/fabric/events_test.go @@ -8,6 +8,7 @@ package fabric import ( "context" + "runtime" "sync" "testing" "time" @@ -15,12 +16,14 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/committer" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/events" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( publicationSnooze = time.Millisecond timeout = 75 * time.Millisecond - waitFor = 4 * timeout + longTimeout = 1 * time.Minute + waitFor = 1 * timeout tick = timeout / 10 ) @@ -43,9 +46,11 @@ func (m *mockSubscriber) Unsubscribe(chaincodeName string, listener events.Liste func (m *mockSubscriber) Publish(chaincodeName string, event *committer.ChaincodeEvent) { m.m.RLock() - defer m.m.RUnlock() - if m.listener != nil { - m.listener.OnReceive(event) + l := m.listener + m.m.RUnlock() + + if l != nil { + l.OnReceive(event) } } @@ -54,30 +59,32 @@ func TestEventListener(t *testing.T) { listener := newEventListener(subscriber, "testChaincode") ch := listener.ChaincodeEvents() - done := make(chan bool) + msg := &committer.ChaincodeEvent{Payload: []byte("some msg")} + stopPublisher := make(chan bool) var wg sync.WaitGroup - wg.Add(2) // Publish events + wg.Add(1) go func() { defer wg.Done() - msg := []byte("some msg") for { select { - case <-done: + case <-stopPublisher: return default: - subscriber.Publish("testChaincode", &committer.ChaincodeEvent{Payload: msg}) + subscriber.Publish("testChaincode", msg) time.Sleep(publicationSnooze) } } }() - // Stop while the publisher is still publishing - ctx, cancel := context.WithTimeout(context.Background(), timeout) + // Stop the consumer and close the event listener while the producer is still publishing + ctx, cancel := context.WithTimeout(context.Background(), waitFor) defer cancel() + // Consumer + wg.Add(1) go func() { defer wg.Done() for { @@ -94,13 +101,138 @@ func TestEventListener(t *testing.T) { } }() - assert.Eventually(t, func() bool { - // eventually our event should be closed + // let's wait until our timeout is fired + <-ctx.Done() + + // consume everything that is remaining in ch and eventually the channel should be closed + require.Eventually(t, func() bool { _, ok := <-ch return !ok }, waitFor, tick) - // now we let our consumer know that they can stop working - close(done) + // now we let our publisher know that they can stop working + close(stopPublisher) wg.Wait() + + // check that our channel is closed + require.Eventually(t, func() bool { + return isClosed(ch) + }, timeout, tick) +} + +func TestEventServiceMultipleClose(t *testing.T) { + subscriber := &mockSubscriber{} + listener := newEventListener(subscriber, "testChaincode") + ch := listener.ChaincodeEvents() + msg1 := &committer.ChaincodeEvent{Payload: []byte("msg1")} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Done() + subscriber.Publish("testChaincode", msg1) + listener.CloseChaincodeEvents() + }() + + // Call Close multiple times safely + listener.CloseChaincodeEvents() + listener.CloseChaincodeEvents() + listener.CloseChaincodeEvents() + + wg.Wait() + + // check that our channel is closed + require.Eventually(t, func() bool { + return isClosed(ch) + }, timeout, tick) +} + +func TestEventListenerDeadlock(t *testing.T) { + subscriber := &mockSubscriber{} + + const customBufferLen = 1 + + // in this test we configure our event listener with a smaller buffer and long recvTimeout + listener := &EventListener{ + chaincodeName: "testChaincode", + subscriber: subscriber, + eventCh: make(chan *committer.ChaincodeEvent, customBufferLen), + middleCh: make(chan *committer.ChaincodeEvent), + closing: make(chan struct{}), + closed: make(chan struct{}), + recvTimeout: longTimeout, + } + + ch := listener.ChaincodeEvents() + + msg1 := &committer.ChaincodeEvent{Payload: []byte("msg1")} + msg2 := &committer.ChaincodeEvent{Payload: []byte("msg2")} + + // we publish and then consume + subscriber.Publish("testChaincode", msg1) + require.EventuallyWithT(t, func(ct *assert.CollectT) { + require.Len(ct, ch, 1) + require.Equal(ct, msg1, <-ch) + require.Len(ct, ch, 0) + }, timeout, tick) + + // next up, we fill our event buffer by publishing msg1 + for range customBufferLen { + subscriber.Publish("testChaincode", msg1) + } + require.EventuallyWithT(t, func(ct *assert.CollectT) { + // out channel should be full now + require.Len(ct, ch, customBufferLen) + }, timeout, tick) + + require.Never(t, func() bool { + // this should be blocking (until longTimeout is fired) + subscriber.Publish("testChaincode", msg1) + return false + }, timeout, tick) + + // we kick off our producer to publish msg2 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // as msg1 is not yet consumed, our producer is blocked + subscriber.Publish("testChaincode", msg2) + }() + + // let's give the producer a bit time + runtime.Gosched() + time.Sleep(waitFor) + + // let's make sure that our producer is still waiting to complete publish msg2 + require.Never(t, func() bool { + // we expect to be blocked + wg.Wait() + return false + }, timeout, tick) + + // now, we close the listener, which should unblock the producer + listener.CloseChaincodeEvents() + + // wait for the producer to finish + wg.Wait() + + // we expect msg1 to be successfully published + require.EventuallyWithT(t, func(c *assert.CollectT) { + require.Equal(c, msg1, <-ch) + }, timeout, tick) + + // check that our channel is closed + require.Eventually(t, func() bool { + return isClosed(ch) + }, timeout, tick) +} + +func isClosed(ch <-chan *committer.ChaincodeEvent) bool { + select { + case <-ch: + return true + default: + } + return false }