Skip to content

Commit 07cbfe4

Browse files
committed
add subscription idle timeout
Signed-off-by: Jianjun Liao <jianjun.liao@outlook.com>
1 parent 5faa744 commit 07cbfe4

3 files changed

Lines changed: 145 additions & 12 deletions

File tree

br/pkg/streamhelper/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ go_test(
7272
embed = [":streamhelper"],
7373
flaky = True,
7474
race = "on",
75-
shard_count = 36,
75+
shard_count = 38,
7676
deps = [
7777
"//br/pkg/errors",
7878
"//br/pkg/logutil",

br/pkg/streamhelper/flush_subscriber.go

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"strconv"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/google/uuid"
@@ -27,6 +28,11 @@ import (
2728
const (
2829
// clearSubscriberTimeOut is the timeout for clearing the subscriber.
2930
clearSubscriberTimeOut = 1 * time.Minute
31+
// subscriptionIdleTimeout is the max duration a flush subscription can stay
32+
// open without receiving any event. The gRPC keepalive only proves the
33+
// transport is alive; this timeout makes sure the application-level
34+
// subscription is still making progress.
35+
subscriptionIdleTimeout = 10 * time.Minute
3036
)
3137

3238
// FlushSubscriber maintains the state of subscribing to the cluster.
@@ -40,6 +46,8 @@ type FlushSubscriber struct {
4046
eventsTunnel chan spans.Valued
4147
// The background context for subscribes.
4248
masterCtx context.Context
49+
// The max duration a subscription can stay open without receiving any event.
50+
subscriptionIdleTimeout time.Duration
4351
}
4452

4553
// SubscriberConfig is a config which cloud be applied into the subscriber.
@@ -51,15 +59,22 @@ func WithMasterContext(ctx context.Context) SubscriberConfig {
5159
return func(fs *FlushSubscriber) { fs.masterCtx = ctx }
5260
}
5361

62+
// WithSubscriptionIdleTimeout sets the max duration a subscription can stay open
63+
// without receiving any event. A non-positive timeout disables the idle check.
64+
func WithSubscriptionIdleTimeout(timeout time.Duration) SubscriberConfig {
65+
return func(fs *FlushSubscriber) { fs.subscriptionIdleTimeout = timeout }
66+
}
67+
5468
// NewSubscriber creates a new subscriber via the environment and optional configs.
5569
func NewSubscriber(dialer LogBackupService, cluster TiKVClusterMeta, config ...SubscriberConfig) *FlushSubscriber {
5670
subs := &FlushSubscriber{
5771
dialer: dialer,
5872
cluster: cluster,
5973

60-
subscriptions: map[uint64]*subscription{},
61-
eventsTunnel: make(chan spans.Valued, 1024),
62-
masterCtx: context.Background(),
74+
subscriptions: map[uint64]*subscription{},
75+
eventsTunnel: make(chan spans.Valued, 1024),
76+
masterCtx: context.Background(),
77+
subscriptionIdleTimeout: subscriptionIdleTimeout,
6378
}
6479

6580
for _, c := range config {
@@ -133,6 +148,11 @@ func (f *FlushSubscriber) HandleErrors() {
133148
log.Warn("Meet error.", zap.String("category", "log backup flush subscriber"),
134149
logutil.ShortError(err), zap.Uint64("store", id))
135150
if retry {
151+
if err := f.dialer.ClearCache(f.masterCtx, id); err != nil {
152+
log.Warn("failed to clear cached store connection before retrying subscription",
153+
zap.String("category", "log backup flush subscriber"),
154+
zap.Uint64("store", id), logutil.ShortError(err))
155+
}
136156
log.Info("retry connecting to store to add subscription",
137157
zap.String("category", "log backup flush subscriber"),
138158
zap.Uint64("store", id))
@@ -185,6 +205,7 @@ type subscription struct {
185205
// We record start bootstrap time and once a store restarts
186206
// we need to try reconnect even there is a error cannot be retry.
187207
storeBootAt uint64
208+
idleTimeout time.Duration
188209
output chan<- spans.Valued
189210

190211
onDaemonExit func()
@@ -211,10 +232,11 @@ func (s *subscription) clearError() {
211232
s.err = nil
212233
}
213234

214-
func newSubscription(toStore Store, output chan<- spans.Valued) *subscription {
235+
func newSubscription(toStore Store, output chan<- spans.Valued, idleTimeout time.Duration) *subscription {
215236
return &subscription{
216237
storeID: toStore.ID,
217238
storeBootAt: toStore.BootAt,
239+
idleTimeout: idleTimeout,
218240
output: output,
219241
}
220242
}
@@ -227,8 +249,9 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) {
227249
}
228250

229251
func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
252+
clientID := uuid.NewString()
230253
log.Info("Adding subscription.", zap.String("category", "log backup subscription manager"),
231-
zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
254+
zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt), zap.String("client-id", clientID))
232255
// We should shutdown the background task firstly.
233256
// Once it yields some error during shuting down, the error won't be brought to next run.
234257
s.close(ctx)
@@ -240,17 +263,18 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
240263
}
241264
cx, cancel := context.WithCancel(ctx)
242265
cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{
243-
ClientId: uuid.NewString(),
266+
ClientId: clientID,
244267
})
245268
if err != nil {
246269
cancel()
247270
_ = dialer.ClearCache(ctx, s.storeID)
248271
return errors.Annotate(err, "failed to subscribe events")
249272
}
250273
lcx := logutil.ContextWithField(cx, zap.Uint64("store-id", s.storeID),
251-
zap.String("category", "log backup flush subscriber"))
274+
zap.String("category", "log backup flush subscriber"),
275+
zap.String("client-id", clientID))
252276
s.cancel = cancel
253-
s.background = spawnJoinable(func() { s.listenOver(lcx, cli) })
277+
s.background = spawnJoinable(func() { s.listenOver(lcx, cli, cancel) })
254278
return nil
255279
}
256280

@@ -263,10 +287,55 @@ func (s *subscription) close(ctx context.Context) {
263287
// because it is a ever-sharing channel.
264288
}
265289

266-
func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
290+
func (s *subscription) startTimeoutWatcher(
291+
ctx context.Context,
292+
watcherDone, activityCh chan struct{},
293+
cancel context.CancelFunc,
294+
idleTimedOut *atomic.Bool,
295+
) {
296+
timer := time.NewTimer(s.idleTimeout)
297+
defer timer.Stop()
298+
for {
299+
select {
300+
case <-ctx.Done():
301+
return
302+
case <-watcherDone:
303+
return
304+
case <-activityCh:
305+
if !timer.Stop() {
306+
select {
307+
case <-timer.C:
308+
default:
309+
}
310+
}
311+
timer.Reset(s.idleTimeout)
312+
case <-timer.C:
313+
idleTimedOut.Store(true)
314+
logutil.CL(ctx).Warn("Listen idle timeout.",
315+
zap.Uint64("store", s.storeID), zap.Duration("idle-timeout", s.idleTimeout))
316+
cancel()
317+
return
318+
}
319+
}
320+
}
321+
322+
func (s *subscription) listenOver(ctx context.Context, cli eventStream, cancel context.CancelFunc) {
267323
storeID := s.storeID
268-
logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID))
324+
logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID), zap.Duration("idle-timeout", s.idleTimeout))
325+
activityCh := make(chan struct{}, 1)
326+
watcherDone := make(chan struct{})
327+
var wg sync.WaitGroup
328+
var idleTimedOut atomic.Bool
329+
if s.idleTimeout > 0 {
330+
wg.Add(1)
331+
go func() {
332+
defer wg.Done()
333+
s.startTimeoutWatcher(ctx, watcherDone, activityCh, cancel, &idleTimedOut)
334+
}()
335+
}
269336
defer func() {
337+
close(watcherDone)
338+
wg.Wait()
270339
if s.onDaemonExit != nil {
271340
s.onDaemonExit()
272341
}
@@ -287,6 +356,10 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
287356
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
288357
return
289358
}
359+
select {
360+
case activityCh <- struct{}{}:
361+
default:
362+
}
290363

291364
log.Debug("Sending events.", zap.Int("size", len(msg.Events)))
292365
for _, m := range msg.Events {
@@ -316,6 +389,10 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
316389
case <-ctx.Done():
317390
logutil.CL(ctx).Warn("Context canceled while sending events.",
318391
zap.Uint64("store", storeID))
392+
if idleTimedOut.Load() {
393+
s.emitError(errors.Annotatef(context.DeadlineExceeded,
394+
"flush subscription from store id %d has no activity for %s", s.storeID, s.idleTimeout))
395+
}
319396
return
320397
}
321398
}
@@ -325,7 +402,7 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
325402
}
326403

327404
func (f *FlushSubscriber) addSubscription(ctx context.Context, toStore Store) {
328-
f.subscriptions[toStore.ID] = newSubscription(toStore, f.eventsTunnel)
405+
f.subscriptions[toStore.ID] = newSubscription(toStore, f.eventsTunnel, f.subscriptionIdleTimeout)
329406
}
330407

331408
func (f *FlushSubscriber) removeSubscription(ctx context.Context, toStore uint64) {

br/pkg/streamhelper/subscription_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package streamhelper_test
55
import (
66
"context"
77
"fmt"
8+
"strings"
89
"sync"
910
"testing"
1011
"time"
@@ -278,3 +279,58 @@ func TestEncounterError(t *testing.T) {
278279
sub.HandleErrors()
279280
require.NoError(t, sub.PendingErrors())
280281
}
282+
283+
func TestSubscriptionIdleTimeoutClearsCacheBeforeRetry(t *testing.T) {
284+
req := require.New(t)
285+
ctx := context.Background()
286+
c := createFakeCluster(t, 1, true)
287+
installSubscribeSupport(c)
288+
289+
clearedCache := make(chan uint64, 1)
290+
c.SetOnClearCache(func(storeID uint64) error {
291+
select {
292+
case clearedCache <- storeID:
293+
default:
294+
}
295+
return nil
296+
})
297+
298+
sub := streamhelper.NewSubscriber(c, c, streamhelper.WithSubscriptionIdleTimeout(200*time.Millisecond))
299+
defer sub.Drop()
300+
req.NoError(sub.UpdateStoreTopology(ctx))
301+
req.Eventually(func() bool {
302+
err := sub.PendingErrors()
303+
return err != nil && strings.Contains(err.Error(), "while receiving from")
304+
}, 3*time.Second, 10*time.Millisecond)
305+
306+
sub.HandleErrors()
307+
req.NoError(sub.PendingErrors())
308+
req.Eventually(func() bool {
309+
return len(clearedCache) > 0
310+
}, 3*time.Second, 10*time.Millisecond)
311+
312+
cp := c.advanceCheckpoints()
313+
c.flushAll()
314+
s := collectCheckpointSpans(t, sub, cp)
315+
req.Equal(cp, s.MinValue())
316+
}
317+
318+
func TestSubscriptionIdleTimeoutWhileSendingEvents(t *testing.T) {
319+
req := require.New(t)
320+
ctx := context.Background()
321+
c := createFakeCluster(t, 4, true)
322+
c.splitAndScatter(manyRegions(0, 1500)...)
323+
installSubscribeSupport(c)
324+
325+
sub := streamhelper.NewSubscriber(c, c, streamhelper.WithSubscriptionIdleTimeout(200*time.Millisecond))
326+
defer sub.Drop()
327+
req.NoError(sub.UpdateStoreTopology(ctx))
328+
329+
c.advanceCheckpoints()
330+
c.flushAll()
331+
332+
req.Eventually(func() bool {
333+
err := sub.PendingErrors()
334+
return err != nil && strings.Contains(err.Error(), "has no activity")
335+
}, 3*time.Second, 10*time.Millisecond)
336+
}

0 commit comments

Comments
 (0)