Skip to content

Commit 9ada795

Browse files
Merge pull request #122 from hyperledger/fix_delete_listeners
fix: deleting listeners from event stream pagination
2 parents a1e7ab3 + c38e7b5 commit 9ada795

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

pkg/fftm/stream_management.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,16 @@ func (m *manager) restoreStreams() error {
7373
}
7474

7575
func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error {
76-
var lastInPage *fftypes.UUID
7776
for {
78-
listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID)
77+
// Do not specify after as we just delete everything
78+
listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, nil, startupPaginationLimit, txhandler.SortDirectionAscending, streamID)
7979
if err != nil {
8080
return err
8181
}
8282
if len(listenerDefs) == 0 {
8383
break
8484
}
8585
for _, def := range listenerDefs {
86-
lastInPage = def.ID
8786
if err := m.persistence.DeleteListener(ctx, def.ID); err != nil {
8887
return err
8988
}

pkg/fftm/stream_management_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,37 @@ func TestDeleteStartedListenerFail(t *testing.T) {
194194
mp.AssertExpectations(t)
195195
}
196196

197+
func TestDeleteStartedListenerWithPagination(t *testing.T) {
198+
199+
_, m, close := newTestManagerMockPersistence(t)
200+
defer close()
201+
202+
esID := apitypes.NewULID()
203+
lID := apitypes.NewULID()
204+
secondID := apitypes.NewULID()
205+
mp := m.persistence.(*persistencemocks.Persistence)
206+
mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return(
207+
[]*apitypes.Listener{
208+
{ID: lID, StreamID: esID},
209+
{ID: secondID, StreamID: esID},
210+
}, nil).Once()
211+
thirdID := apitypes.NewULID()
212+
mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return(
213+
[]*apitypes.Listener{
214+
{ID: thirdID, StreamID: esID},
215+
}, nil).Once()
216+
mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return(
217+
[]*apitypes.Listener{}, nil)
218+
mp.On("DeleteListener", m.ctx, lID).Return(nil)
219+
mp.On("DeleteListener", m.ctx, secondID).Return(nil)
220+
mp.On("DeleteListener", m.ctx, thirdID).Return(nil)
221+
222+
err := m.deleteAllStreamListeners(m.ctx, esID)
223+
assert.NoError(t, err)
224+
225+
mp.AssertExpectations(t)
226+
}
227+
197228
func TestDeleteStreamBadID(t *testing.T) {
198229

199230
_, m, close := newTestManagerMockPersistence(t)

0 commit comments

Comments
 (0)