Skip to content

Commit c6d6aac

Browse files
authored
fix(fxconfig): clean up stale subscriber on canceled/timed-out Subscribe paths (hyperledger#169)
## Problem

`NotificationClient.Subscribe()` appended a subscriber before validating context state. If the context was canceled before the upstream request was queued, the method returned an error but left a stale subscriber in memory for that txID.

## Root cause

In `Subscribe()`, the map mutation happened too early:

1. Create the receiver channel
2. Append the subscriber to `subscribers[txID]`
3. Later fail on a canceled/timed-out context before the request is successfully queued
4. The stale subscriber remained

## Implemented fix

Updated `Subscribe()` to keep the single-upstream-subscription behavior and clean up stale state on a failed enqueue:

- Keep subscriber registration under lock
- If this is not the first subscriber for a txID, return immediately (reuse the existing upstream subscription)
- On the first-subscriber path, if the context is canceled before enqueue, remove this call's channel from `subscribers[txID]` (the key itself is deleted once no subscribers remain)
- If the context is canceled while waiting to enqueue, apply the same rollback

This guarantees the registration is rolled back when the request enqueue does not happen.

---------

Signed-off-by: rootp1 <arnav.iitr@gmail.com>
1 parent 321cab8 commit c6d6aac

2 files changed

Lines changed: 119 additions & 5 deletions

File tree

tools/fxconfig/internal/client/notifications.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,50 @@ func (n *NotificationClient) Subscribe(ctx context.Context, txID string) (chan i
9191
}
9292

9393
receiverCh := make(chan int, 1)
94+
isFirst := func() bool {
95+
n.subscribersMu.Lock()
96+
defer n.subscribersMu.Unlock()
9497

95-
n.subscribersMu.Lock()
96-
defer n.subscribersMu.Unlock()
98+
subscribers := n.subscribers[txID]
99+
n.subscribers[txID] = append(subscribers, receiverCh)
97100

98-
subscribers := n.subscribers[txID]
99-
n.subscribers[txID] = append(subscribers, receiverCh)
101+
return len(subscribers) == 0
102+
}()
100103

101-
if len(subscribers) > 0 {
104+
if !isFirst {
102105
// we already have an active subscription for this txID
103106
return receiverCh, nil
104107
}
105108

109+
rollback := func() {
110+
// rollback can race logically with dispatcher cleanup in listen(),
111+
// where completed txIDs are also deleted from n.subscribers. The shared
112+
// subscribersMu lock plus the missing-key guard below makes this idempotent
113+
// and safe regardless of which path removes the entry first.
114+
n.subscribersMu.Lock()
115+
defer n.subscribersMu.Unlock()
116+
117+
subscribers, ok := n.subscribers[txID]
118+
if !ok {
119+
return
120+
}
121+
122+
for i, ch := range subscribers {
123+
if ch != receiverCh {
124+
continue
125+
}
126+
127+
subscribers = append(subscribers[:i], subscribers[i+1:]...)
128+
if len(subscribers) == 0 {
129+
delete(n.subscribers, txID)
130+
return
131+
}
132+
133+
n.subscribers[txID] = subscribers
134+
return
135+
}
136+
}
137+
106138
// setup request
107139
req := &committerpb.NotificationRequest{
108140
TxStatusRequest: &committerpb.TxIDsBatch{
@@ -114,13 +146,15 @@ func (n *NotificationClient) Subscribe(ctx context.Context, txID string) (chan i
114146
// check if our ctx is still open
115147
select {
116148
case <-ctx.Done():
149+
rollback()
117150
return nil, ctx.Err()
118151
default:
119152
}
120153

121154
// try to push to request queue
122155
select {
123156
case <-ctx.Done():
157+
rollback()
124158
return nil, ctx.Err()
125159
case n.requestQueue <- req:
126160
}

tools/fxconfig/internal/client/notifications_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,86 @@ func TestNotificationClient_Subscribe_ContextCanceled(t *testing.T) {
155155
require.ErrorIs(t, err, context.Canceled)
156156
}
157157

158+
func TestNotificationClient_Subscribe_CancelKeepsExistingSubscribers(t *testing.T) {
159+
t.Parallel()
160+
161+
nc := newTestNotificationClient(time.Second)
162+
txID := "tx1"
163+
164+
firstCtx, cancelFirst := context.WithCancel(context.Background())
165+
firstErrCh := make(chan error, 1)
166+
go func() {
167+
_, err := nc.Subscribe(firstCtx, txID)
168+
firstErrCh <- err
169+
}()
170+
171+
require.Eventually(t, func() bool {
172+
nc.subscribersMu.RLock()
173+
defer nc.subscribersMu.RUnlock()
174+
return len(nc.subscribers[txID]) == 1
175+
}, time.Second, time.Millisecond)
176+
177+
secondCh, err := nc.Subscribe(t.Context(), txID)
178+
require.NoError(t, err)
179+
require.NotNil(t, secondCh)
180+
181+
cancelFirst()
182+
require.ErrorIs(t, <-firstErrCh, context.Canceled)
183+
184+
nc.subscribersMu.RLock()
185+
subscribers := append([]chan int(nil), nc.subscribers[txID]...)
186+
nc.subscribersMu.RUnlock()
187+
require.Len(t, subscribers, 1)
188+
require.Equal(t, secondCh, subscribers[0])
189+
}
190+
191+
func TestNotificationClient_Subscribe_NoStaleSubscriberOnContextCancel(t *testing.T) {
192+
t.Parallel()
193+
194+
nc := newTestNotificationClient(time.Second)
195+
txID := "tx1"
196+
197+
// First call with canceled context
198+
ctx1, cancel1 := context.WithCancel(context.Background())
199+
cancel1()
200+
_, err1 := nc.Subscribe(ctx1, txID)
201+
require.ErrorIs(t, err1, context.Canceled)
202+
203+
// Verify no subscriber was added
204+
nc.subscribersMu.RLock()
205+
subscribersAfterFirstCall := len(nc.subscribers[txID])
206+
nc.subscribersMu.RUnlock()
207+
require.Equal(t, 0, subscribersAfterFirstCall, "No subscriber should be added when context is canceled")
208+
209+
// Second call with active context - should send a request
210+
go func() { <-nc.requestQueue }()
211+
ch2, err2 := nc.Subscribe(t.Context(), txID)
212+
require.NoError(t, err2)
213+
require.NotNil(t, ch2)
214+
215+
// Verify exactly one subscriber is added
216+
nc.subscribersMu.RLock()
217+
subscribersAfterSecondCall := len(nc.subscribers[txID])
218+
nc.subscribersMu.RUnlock()
219+
require.Equal(t, 1, subscribersAfterSecondCall, "Exactly one subscriber should be added on successful subscribe")
220+
}
221+
222+
func TestNotificationClient_Subscribe_NoStaleSubscriberOnTimeout(t *testing.T) {
223+
t.Parallel()
224+
225+
nc := newTestNotificationClient(time.Millisecond)
226+
txID := "tx-timeout"
227+
228+
// No consumer on requestQueue, so send should timeout.
229+
_, err := nc.Subscribe(t.Context(), txID)
230+
require.Error(t, err)
231+
232+
nc.subscribersMu.RLock()
233+
subscribersAfterTimeout := len(nc.subscribers[txID])
234+
nc.subscribersMu.RUnlock()
235+
require.Equal(t, 0, subscribersAfterTimeout, "No subscriber should remain after subscribe timeout")
236+
}
237+
158238
func TestNotificationClient_Subscribe_SendsRequest(t *testing.T) {
159239
t.Parallel()
160240

0 commit comments

Comments
 (0)