Skip to content

Commit 87a1a8b

Browse files
authored
logpuller: fix stale sent request after fast region error (#4557)
close #4472
1 parent 4c62245 commit 87a1a8b

File tree

4 files changed

+141
-2
lines changed

4 files changed

+141
-2
lines changed

logservice/logpuller/region_req_cache.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
191191
}
192192

193193
delete(regionReqs, regionID)
194+
if len(regionReqs) == 0 {
195+
delete(c.sentRequests.regionReqs, subID)
196+
}
194197
c.markDone()
195198
}
196199

logservice/logpuller/region_req_cache_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,3 +311,26 @@ func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) {
311311
require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID()))
312312
require.Equal(t, 0, cache.getPendingCount())
313313
}
314+
315+
// TestRequestCacheMarkStopped_ReleasesSlot checks that calling markStopped on a
// request that has already been marked sent brings the pending count back to
// zero and removes the subscription's entry from sentRequests.regionReqs.
func TestRequestCacheMarkStopped_ReleasesSlot(t *testing.T) {
316+
cache := newRequestCache(10)
317+
ctx := context.Background()
318+
319+
region := createTestRegionInfo(1, 1)
320+
321+
// Adding one region is expected to raise the pending count to 1.
ok, err := cache.add(ctx, region, false)
322+
require.True(t, ok)
323+
require.NoError(t, err)
324+
require.Equal(t, 1, cache.getPendingCount())
325+
326+
req, err := cache.pop(ctx)
327+
require.NoError(t, err)
328+
329+
// After markSent the pending count is still expected to be 1, and the
// request must be tracked under its subscription ID.
cache.markSent(req)
330+
require.Equal(t, 1, cache.getPendingCount())
331+
require.Contains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID)
332+
333+
// markStopped must both drop the pending count to 0 and delete the
// now-empty per-subscription entry.
cache.markStopped(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
334+
require.Equal(t, 0, cache.getPendingCount())
335+
require.NotContains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID)
336+
}

logservice/logpuller/region_request_worker.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,11 +406,25 @@ func (s *regionRequestWorker) processRegionSendTask(
406406
state := newRegionFeedState(region, uint64(subID), s)
407407
state.start()
408408
s.addRegionState(subID, region.verID.GetID(), state)
409+
// Mark the request as sent before sending it.
410+
// Otherwise there is a race with the receiver goroutine:
411+
// 1. addRegionState makes the region visible to error handling.
412+
// 2. doSend sends the request.
413+
// 3. the receiver goroutine may receive a region error immediately.
414+
// 4. markStopped runs before markSent, so requestCache.markStopped cannot
415+
// find the request in sentRequests.
416+
// 5. the sender goroutine then calls markSent and leaves a stale sent
417+
// request behind, even though the region has already been
418+
// unlocked/rescheduled.
419+
//
420+
// Tracking the request before Send keeps requestedRegions and
421+
// sentRequests visible in the same order and avoids leaving stale
422+
// requests in cleanup.
423+
s.requestCache.markSent(regionReq)
409424
if err := doSend(s.createRegionRequest(region)); err != nil {
410-
s.requestCache.markDone()
425+
state.markStopped(err)
411426
return err
412427
}
413-
s.requestCache.markSent(regionReq)
414428
}
415429
regionReq, err = fetchMoreReq()
416430
if err != nil {

logservice/logpuller/region_request_worker_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,40 @@ import (
1717
"context"
1818
"testing"
1919

20+
"github.com/pingcap/errors"
21+
"github.com/pingcap/kvproto/pkg/cdcpb"
22+
"github.com/pingcap/kvproto/pkg/metapb"
23+
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
2024
"github.com/stretchr/testify/require"
25+
"github.com/tikv/client-go/v2/tikv"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/metadata"
2128
)
2229

30+
// mockEventFeedV2Client is a test double for the event feed stream client.
// Send returns the configured sendErr; every other method returns nil or an
// empty value and does no work.
// NOTE(review): presumably this satisfies the interface expected by
// ConnAndClient.Client (it is assigned to that field in the tests) — confirm
// against the ConnAndClient declaration.
type mockEventFeedV2Client struct {
31+
// sendErr is the error returned by every call to Send (nil means success).
sendErr error
32+
}
33+
34+
// Send ignores the request and returns the preset sendErr.
func (m *mockEventFeedV2Client) Send(*cdcpb.ChangeDataRequest) error { return m.sendErr }
35+
// Recv always returns a nil event and a nil error.
func (m *mockEventFeedV2Client) Recv() (*cdcpb.ChangeDataEvent, error) { return nil, nil }
36+
// Header returns empty metadata and a nil error.
func (m *mockEventFeedV2Client) Header() (metadata.MD, error) { return metadata.MD{}, nil }
37+
// Trailer returns empty metadata.
func (m *mockEventFeedV2Client) Trailer() metadata.MD { return metadata.MD{} }
38+
// CloseSend is a no-op that reports success.
func (m *mockEventFeedV2Client) CloseSend() error { return nil }
39+
// Context returns context.Background().
func (m *mockEventFeedV2Client) Context() context.Context { return context.Background() }
40+
// SendMsg is a no-op that reports success.
func (m *mockEventFeedV2Client) SendMsg(any) error { return nil }
41+
// RecvMsg is a no-op that reports success.
func (m *mockEventFeedV2Client) RecvMsg(any) error { return nil }
42+
43+
// prepareRegionForSendTest returns the given regionInfo with the fields the
// send-path test relies on filled in: an rpcCtx whose region meta carries
// epoch Version 1 / ConfVer 1, and a fresh lockedRangeState whose ResolvedTs
// is set to 100. The argument is received by value, so the caller's copy is
// not modified.
// NOTE(review): presumably these are the fields createRegionRequest reads —
// confirm against its implementation.
func prepareRegionForSendTest(region regionInfo) regionInfo {
44+
region.rpcCtx = &tikv.RPCContext{
45+
Meta: &metapb.Region{
46+
RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1},
47+
},
48+
}
49+
region.lockedRangeState = &regionlock.LockedRangeState{}
50+
region.lockedRangeState.ResolvedTs.Store(100)
51+
return region
52+
}
53+
2354
func TestRegionStatesOperation(t *testing.T) {
2455
worker := &regionRequestWorker{}
2556
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
@@ -64,3 +95,71 @@ func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) {
6495
require.Nil(t, worker.preFetchForConnecting)
6596
require.Equal(t, 0, worker.requestCache.getPendingCount())
6697
}
98+
99+
// TestClearPendingRegionsDoesNotReturnStoppedSentRegion checks that a region
// whose request was marked sent and whose state was then marked stopped is not
// handed back by clearPendingRegions, and that the pending count is zero.
func TestClearPendingRegionsDoesNotReturnStoppedSentRegion(t *testing.T) {
100+
worker := &regionRequestWorker{
101+
requestCache: newRequestCache(10),
102+
}
103+
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
104+
105+
ctx := context.Background()
106+
region := createTestRegionInfo(1, 1)
107+
108+
ok, err := worker.requestCache.add(ctx, region, false)
109+
require.NoError(t, err)
110+
require.True(t, ok)
111+
112+
req, err := worker.requestCache.pop(ctx)
113+
require.NoError(t, err)
114+
115+
// Register a started feed state for the region on the worker before the
// request is marked sent.
state := newRegionFeedState(req.regionInfo, uint64(req.regionInfo.subscribedSpan.subID), worker)
116+
state.start()
117+
worker.addRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID(), state)
118+
119+
// Simulate the race we are fixing in processRegionSendTask:
120+
// once a request is visible in sentRequests, a fast region error may mark the
121+
// region stopped before worker cleanup runs. In that case, markStopped should
122+
// remove the sent request immediately, so clearPendingRegions must not return
123+
// the stale region again during worker shutdown.
124+
worker.requestCache.markSent(req)
125+
state.markStopped(errors.New("send request to store error"))
126+
worker.takeRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
127+
128+
// Nothing may remain pending, and cleanup must return no regions.
require.Equal(t, 0, worker.requestCache.getPendingCount())
129+
require.Empty(t, worker.clearPendingRegions())
130+
}
131+
132+
// TestProcessRegionSendTaskSendFailureCleansSentRequest runs
// processRegionSendTask against a client whose Send always fails and checks
// that the send error is returned, the pending count is zero, no sent request
// is left in sentRequests.regionReqs, and the region state is either gone or
// stale.
func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) {
133+
worker := &regionRequestWorker{
134+
requestCache: newRequestCache(10),
135+
store: &requestedStore{storeAddr: "store-1"},
136+
client: &subscriptionClient{},
137+
}
138+
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
139+
140+
ctx := context.Background()
141+
region := prepareRegionForSendTest(createTestRegionInfo(1, 1))
142+
143+
ok, err := worker.requestCache.add(ctx, region, false)
144+
require.NoError(t, err)
145+
require.True(t, ok)
146+
require.Equal(t, 1, worker.requestCache.getPendingCount())
147+
148+
// Pop the request and hand its region to the worker as the pre-fetched
// region.
// NOTE(review): presumably processRegionSendTask sends preFetchForConnecting
// first — confirm against its implementation.
req, err := worker.requestCache.pop(ctx)
149+
require.NoError(t, err)
150+
worker.preFetchForConnecting = new(regionInfo)
151+
*worker.preFetchForConnecting = req.regionInfo
152+
153+
// The mock client's Send returns sendErr, forcing the send-failure path.
sendErr := errors.New("send failed")
154+
conn := &ConnAndClient{
155+
Client: &mockEventFeedV2Client{sendErr: sendErr},
156+
Conn: &grpc.ClientConn{},
157+
}
158+
159+
err = worker.processRegionSendTask(ctx, conn)
160+
require.ErrorIs(t, err, sendErr)
161+
require.Equal(t, 0, worker.requestCache.getPendingCount())
162+
require.Empty(t, worker.requestCache.sentRequests.regionReqs)
163+
// Either outcome is accepted: the state was removed, or it is still
// registered but reports stale.
state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
164+
require.True(t, state == nil || state.isStale(), "region state should be removed or marked stale after send failure")
165+
}

0 commit comments

Comments
 (0)