Skip to content

Commit cc32d3f

Browse files
feat: add event watch task logics
--story=121104198
1 parent d39b8ad commit cc32d3f

File tree

10 files changed

+1017
-88
lines changed

10 files changed

+1017
-88
lines changed

src/storage/dal/mongo/local/mongo.go

+6
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,12 @@ func (c *Mongo) GetDBClient() *mongo.Client {
417417
func (c *Mongo) GetDBName() string {
418418
return c.cli.DBName()
419419
}
420+
421+
// GetMongoClient get mongo client
422+
func (c *Mongo) GetMongoClient() *MongoClient {
423+
return c.cli
424+
}
425+
420426
func (c *Mongo) redirectTable(tableName string) string {
421427
if common.IsObjectInstShardingTable(tableName) {
422428
tableName = common.BKTableNameBaseInst

src/storage/stream/event/list.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (e *Event) List(ctx context.Context, opts *types.ListOptions) (ch chan *typ
5858
eventChan := make(chan *types.Event, types.DefaultEventChanSize)
5959

6060
go func() {
61-
e.lister(ctx, false, listOpts, eventChan)
61+
e.lister(ctx, opts.WithRetry, listOpts, eventChan)
6262
}()
6363

6464
return eventChan, nil

src/storage/stream/event/utils.go

+2
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStream
7272
if *opts.MajorityCommitted {
7373
major := options.UpdateLookup
7474
streamOptions.FullDocument = &major
75+
preImage := options.WhenAvailable
76+
streamOptions.FullDocumentBeforeChange = &preImage
7577
} else {
7678
def := options.Default
7779
streamOptions.FullDocument = &def

src/storage/stream/loop/loop_watch.go

+82-74
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,17 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error {
4646
return err
4747
}
4848

49-
startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background())
49+
watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions)
5050
if err != nil {
51-
blog.Errorf("%s job, run loop watch %s, but get start token failed, err: %v", opts.Name, lw.streamWatch.DBName,
52-
err)
5351
return err
5452
}
55-
56-
// update the start token.
57-
if len(startToken) != 0 {
58-
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken}
59-
}
53+
watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken
6054

6155
var cancel func()
6256
var cancelCtx context.Context
6357
cancelCtx, cancel = context.WithCancel(context.Background())
6458

65-
watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt)
59+
watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt)
6660
if err != nil {
6761
blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err)
6862
cancel()
@@ -88,30 +82,43 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error {
8882
return nil
8983
}
9084

85+
func (lw *LoopsWatch) updateStartTokenInfo(opts *types.LoopOptions) (*types.WatchOptions, error) {
86+
startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background())
87+
if err != nil {
88+
blog.Errorf("%s job, loop watch db %s, but get start watch token failed, err: %v", opts.Name,
89+
lw.streamWatch.DBName, err)
90+
return nil, err
91+
}
92+
93+
// update the start token.
94+
if len(startToken.Token) != 0 {
95+
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken.Token}
96+
}
97+
if startToken.StartAtTime != nil {
98+
opts.WatchOpt.StartAtTime = startToken.StartAtTime
99+
}
100+
101+
return opts.WatchOpt, nil
102+
}
103+
91104
// WithBatch allows users to watch events with batch.
92105
func (lw *LoopsWatch) WithBatch(opts *types.LoopBatchOptions) error {
93106
if err := opts.Validate(); err != nil {
94107
blog.Errorf("run loop watch batch, but option is invalid, err: %v", err)
95108
return err
96109
}
97110

98-
startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background())
111+
watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions)
99112
if err != nil {
100-
blog.Errorf("%s job, run loop watch batch %s, but get start token failed, err: %v", opts.Name,
101-
lw.streamWatch.DBName, err)
102113
return err
103114
}
104-
105-
// update the start token.
106-
if len(startToken) != 0 {
107-
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken}
108-
}
115+
watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken
109116

110117
var cancel func()
111118
var cancelCtx context.Context
112119
cancelCtx, cancel = context.WithCancel(context.Background())
113120

114-
watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt)
121+
watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt)
115122
if err != nil {
116123
blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err)
117124
cancel()
@@ -167,23 +174,16 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc,
167174
cancel()
168175

169176
// use the last token to resume so that we can start again from where we stopped.
170-
lastToken, err := opts.TokenHandler.GetStartWatchToken(ctx)
177+
watchOpt, err := lw.updateStartTokenInfo(opts)
171178
if err != nil {
172-
blog.Errorf("%s job, run loop watch, but get last event token failed, err: %v", opts.Name, err)
173179
// notify retry signal, exit loop
174180
close(retrySignal)
175181
continue
176182
}
183+
opts.WatchOpt = watchOpt
177184

178-
blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %s.", opts.Name,
179-
lw.streamWatch.DBName, lastToken)
180-
181-
// set start after token if needed.
182-
if len(lastToken) != 0 {
183-
// we have already received the new event and handle it success,
184-
// so we need to use this token. otherwise, we should still use the w.watchOpt.StartAfterToken
185-
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: lastToken}
186-
}
185+
blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %+v.", opts.Name,
186+
lw.streamWatch.DBName, watchOpt.StartAfterToken)
187187

188188
var cancelCtx context.Context
189189
cancelCtx, cancel = context.WithCancel(ctx)
@@ -200,7 +200,8 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc,
200200
// start handle loop jobs
201201
go doHandler(cancelCtx, watcher, retrySignal)
202202

203-
blog.Warnf("%s job, retry loop %s from token: %s success.", opts.Name, lw.streamWatch.DBName, lastToken)
203+
blog.Warnf("%s job, retry loop %s from token: %+v success.", opts.Name, lw.streamWatch.DBName,
204+
watchOpt.StartAfterToken)
204205
}
205206
}
206207
}
@@ -220,17 +221,14 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
220221
}
221222

222223
for {
223-
224224
reWatch, loop := observer.canLoop()
225225
if reWatch {
226226
// stop the tick to release resource.
227227
ticker.Stop()
228-
blog.Warnf("%s job, master status has changed, try to re-watch again, collection:%s", opts.Name,
228+
blog.Warnf("%s job, master status has changed, try to re-watch again, db:%s", opts.Name,
229229
lw.streamWatch.DBName)
230-
231230
// trigger re-watch action now.
232231
close(retrySignal)
233-
234232
// exit the for loop
235233
return
236234
}
@@ -248,14 +246,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
248246
case <-ctxWithCancel.Done():
249247
// stop the tick to release resource.
250248
ticker.Stop()
251-
252249
blog.Warnf("%s job, received cancel loop watch %s signal, exit loop.", opts.Name, lw.streamWatch.DBName)
253250
// exist the goroutine
254251
return
255252

256253
case one := <-watcher.EventChan:
257254
batchEvents = append(batchEvents, one)
258-
259255
if blog.V(4) {
260256
blog.Infof("%s job, received %s event, detail: %s, op-time: %s, rid: %s", opts.Name,
261257
lw.streamWatch.DBName, one.String(), one.ClusterTime.String(), one.ID())
@@ -266,14 +262,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
266262
// continue to get more events
267263
continue
268264
}
269-
270265
case <-ticker.C:
271266
// handle with batch event.
272267
if len(batchEvents) == 0 {
273268
// ticks, but no events received, loop next round to get events.
274269
continue
275270
}
276-
277271
case <-opts.StopNotifier:
278272
ticker.Stop()
279273
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
@@ -284,50 +278,62 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
284278
break
285279
}
286280

287-
// for safety guarantee
288-
if len(batchEvents) == 0 {
289-
continue
281+
if lw.handleBatchEvents(ctxWithCancel, batchEvents, opts, retryObserver, retrySignal) {
282+
return
290283
}
284+
}
285+
}
291286

292-
first := batchEvents[0]
287+
// handleBatchEvents handle batch events, returns if the loop watch needs retry
288+
func (lw *LoopsWatch) handleBatchEvents(ctx context.Context, batchEvents []*types.Event, opts *types.LoopBatchOptions,
289+
retryObserver *retryHandler, retrySignal chan struct{}) bool {
293290

294-
blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName,
295-
len(batchEvents), first.ClusterTime.String(), first.ID())
291+
// for safety guarantee
292+
if len(batchEvents) == 0 {
293+
return false
294+
}
296295

297-
retry := opts.EventHandler.DoBatch(batchEvents)
298-
if retry {
296+
first := batchEvents[0]
299297

300-
if retryObserver.canStillRetry() {
301-
blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name,
302-
lw.streamWatch.DBName, len(batchEvents), first.ID())
303-
// an error occurred, we need to retry it later.
304-
// tell the schedule to re-watch again.
305-
close(retrySignal)
306-
// exist this goroutine.
307-
return
308-
}
298+
blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName,
299+
len(batchEvents), first.ClusterTime.String(), first.ID())
309300

310-
blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name,
311-
lw.streamWatch.DBName, first.ID())
312-
// save the event token now.
301+
retry := opts.EventHandler.DoBatch(batchEvents)
302+
if retry {
303+
if retryObserver.canStillRetry() {
304+
blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name,
305+
lw.streamWatch.DBName, len(batchEvents), first.ID())
306+
// an error occurred, we need to retry it later.
307+
// tell the schedule to re-watch again.
308+
close(retrySignal)
309+
// exit this goroutine.
310+
return true
313311
}
314312

315-
// reset retry counter so that the previous retry count will not affect the next event
316-
retryObserver.resetRetryCounter()
313+
blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name,
314+
lw.streamWatch.DBName, first.ID())
315+
// save the event token now.
316+
}
317317

318-
last := batchEvents[len(batchEvents)-1]
319-
// update the last watched token for resume usage.
320-
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, last.Token.Data); err != nil {
321-
blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.",
322-
opts.Name, lw.streamWatch.DBName, err, first.ID())
318+
// reset retry counter so that the previous retry count will not affect the next event
319+
retryObserver.resetRetryCounter()
323320

324-
// retry later.
325-
close(retrySignal)
326-
// exist this goroutine
327-
return
328-
}
321+
last := batchEvents[len(batchEvents)-1]
322+
// update the last watched token for resume usage.
323+
lastToken := &types.TokenInfo{
324+
Token: last.Token.Data,
325+
StartAtTime: &last.ClusterTime,
329326
}
330-
327+
if err := opts.TokenHandler.SetLastWatchToken(ctx, lastToken); err != nil {
328+
blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.",
329+
opts.Name, lw.streamWatch.DBName, err, first.ID())
330+
331+
// retry later.
332+
close(retrySignal)
333+
// exit this goroutine
334+
return true
335+
}
336+
return false
331337
}
332338

333339
// tryLoopWithOne try handle event one by one
@@ -348,11 +354,9 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context,
348354
blog.Warnf("%s job, received cancel loop watch %s signal, exit loop, exit loop", opts.Name,
349355
lw.streamWatch.DBName)
350356
return
351-
352357
case <-opts.StopNotifier:
353358
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
354359
return
355-
356360
default:
357361
}
358362

@@ -398,7 +402,11 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context,
398402
retryObserver.resetRetryCounter()
399403

400404
// update the last watched token for resume usage.
401-
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, one.Token.Data); err != nil {
405+
lastToken := &types.TokenInfo{
406+
Token: one.Token.Data,
407+
StartAtTime: &one.ClusterTime,
408+
}
409+
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, lastToken); err != nil {
402410
blog.Errorf("%s job, loop watch %s event, but set last watched token failed, err: %v, rid: %s, "+
403411
"retry later.", lw.streamWatch.DBName, err, one.ID())
404412

0 commit comments

Comments
 (0)