Skip to content

Commit c982c2b

Browse files
authored
Merge pull request #102 from hyperledger/log-enhance
Enhance logging on event dispatch, and resolve edge case panic
2 parents dcc8484 + 792cccf commit c982c2b

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

internal/events/eventstream.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func (es *eventStream) processNewEvent(ctx context.Context, fev *ffcapi.Listener
636636
if notification.Confirmed {
637637
// Push it to the batch when confirmed
638638
// - Note this will block the confirmation manager when the event stream is blocked
639+
log.L(ctx).Debugf("Queuing confirmed event for batch assembly: '%s'", event)
639640
es.batchChannel <- fev
640641
}
641642
},
@@ -741,6 +742,8 @@ func (es *eventStream) batchLoop(startedState *startedStreamState) {
741742
},
742743
Event: *fev.Event,
743744
})
745+
} else {
746+
log.L(es.bgCtx).Warnf("Confirmed event not associated with any active listener: %s", fev.Event)
744747
}
745748
}
746749
case <-timeoutChannel:
@@ -757,7 +760,7 @@ func (es *eventStream) batchLoop(startedState *startedStreamState) {
757760
return
758761
}
759762

760-
if timedOut || len(batch.events) >= maxSize {
763+
if timedOut || (batch != nil && len(batch.events) >= maxSize) {
761764
var err error
762765
if batch != nil {
763766
batch.timeout.Stop()

internal/events/eventstream_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -1635,7 +1635,7 @@ func TestEventLoopIgnoreBadEvent(t *testing.T) {
16351635
es.processNewEvent(context.Background(), &ffcapi.ListenerEvent{})
16361636
}
16371637

1638-
func TestSkipEventsBehindCheckpoint(t *testing.T) {
1638+
func TestSkipEventsBehindCheckpointAndUnknownListener(t *testing.T) {
16391639

16401640
es := newTestEventStream(t, `{
16411641
"name": "ut_stream"
@@ -1680,6 +1680,10 @@ func TestSkipEventsBehindCheckpoint(t *testing.T) {
16801680
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2000}, // on checkpoint - redelivery
16811681
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: listenerID, BlockNumber: 2000}},
16821682
}
1683+
es.batchChannel <- &ffcapi.ListenerEvent{
1684+
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2001}, // this is for a listener that no longer exists on the ES
1685+
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: fftypes.NewUUID(), BlockNumber: 2001}},
1686+
}
16831687
es.batchChannel <- &ffcapi.ListenerEvent{
16841688
Checkpoint: &utCheckpointType{SomeSequenceNumber: 2001}, // this is a new event
16851689
Event: &ffcapi.Event{ID: ffcapi.EventID{ListenerID: listenerID, BlockNumber: 2001}},

0 commit comments

Comments
 (0)