@@ -25,7 +25,10 @@ import (
2525 "github.com/cockroachdb/cockroach/pkg/util/hlc"
2626 "github.com/cockroachdb/cockroach/pkg/util/log"
2727 "github.com/cockroachdb/cockroach/pkg/util/metric"
28+ "github.com/cockroachdb/cockroach/pkg/util/queue"
29+ "github.com/cockroachdb/cockroach/pkg/util/stop"
2830 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
31+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
2932 "github.com/cockroachdb/redact"
3033 "github.com/dustin/go-humanize"
3134)
@@ -320,35 +323,272 @@ func pprintTokens(t kvflowcontrol.Tokens) string {
320323 return humanize .IBytes (uint64 (t ))
321324}
322325
323- // SendTokenWatcherHandleID is a unique identifier for a handle that is
324- // watching for available elastic send tokens on a stream.
325- type SendTokenWatcherHandleID int64
326+ // SendTokenWatcher can be used for watching and waiting on available elastic
327+ // send tokens. The caller registers a notification, which will be called when
328+ // elastic tokens are available for the given stream being watched. Note that
329+ // only elastic tokens are watched, as the SendTokenWatcher is intended to be
330+ // used when a send queue exists for a replication stream, requiring only one
331+ // goroutine per stream.
332+ type SendTokenWatcher struct {
333+ stopper * stop.Stopper
334+ clock timeutil.TimeSource
335+ watchers syncutil.Map [kvflowcontrol.Stream , sendStreamTokenWatcher ]
336+ }
326337
327- // SendTokenWatcher is the interface for watching and waiting on available
328- // elastic send tokens. The watcher registers a notification, which will be
329- // called when elastic tokens are available for the stream this watcher is
330- // monitoring. Note only elastic tokens are watched as this is intended to be
331- // used when a send queue exists.
332- //
333- // TODO(kvoli): Consider de-interfacing if not necessary for testing.
334- type SendTokenWatcher interface {
335- // NotifyWhenAvailable queues up for elastic tokens for the given send token
336- // counter. When elastic tokens are available, the provided
337- // TokenGrantNotification is called. It is the caller's responsibility to
338- // call CancelHandle when tokens are no longer needed, or when the caller is
339- // done.
340- NotifyWhenAvailable (
341- * tokenCounter ,
342- TokenGrantNotification ,
343- ) SendTokenWatcherHandleID
344- // CancelHandle cancels the given handle, stopping it from being notified
345- // when tokens are available. CancelHandle should be called at most once.
346- CancelHandle (SendTokenWatcherHandleID )
338+ // NewSendTokenWatcher creates a new SendTokenWatcher.
339+ func NewSendTokenWatcher (stopper * stop.Stopper , clock timeutil.TimeSource ) * SendTokenWatcher {
340+ return & SendTokenWatcher {stopper : stopper , clock : clock }
347341}
348342
343+ const (
344+ // sendTokenWatcherWC is the class of tokens the send token watcher is
345+ // watching.
346+ sendTokenWatcherWC = admissionpb .ElasticWorkClass
347+ // watcherIdleCloseDuration is the duration after which the watcher will stop
348+ // if there are no registered notifications.
349+ watcherIdleCloseDuration = 1 * time .Minute
350+ )
351+
349352// TokenGrantNotification is an interface that is called when tokens are
350353// available.
351354type TokenGrantNotification interface {
352355 // Notify is called when tokens are available to be granted.
353356 Notify (context.Context )
354357}
358+
359+ // SendTokenWatcherHandle is a unique handle that is watching for available
360+ // elastic send tokens on a stream.
361+ type SendTokenWatcherHandle struct {
362+ // id is the unique identifier for this handle.
363+ id uint64
364+ // stream is the stream that this handle is watching.
365+ stream kvflowcontrol.Stream
366+ }
367+
368+ // NotifyWhenAvailable queues up for elastic tokens for the given send token
369+ // counter. When elastic tokens are available, the provided
370+ // TokenGrantNotification is called and the notification is deregistered. It is
371+ // the caller's responsibility to call CancelHandle if tokens are no longer
372+ // needed.
373+ //
374+ // Note the given context is used only for logging/tracing purposes and
375+ // cancellation is not respected.
376+ func (s * SendTokenWatcher ) NotifyWhenAvailable (
377+ ctx context.Context , tc * tokenCounter , notify TokenGrantNotification ,
378+ ) SendTokenWatcherHandle {
379+ return s .watcher (tc ).add (ctx , notify )
380+ }
381+
382+ // CancelHandle cancels the given handle, stopping it from being notified when
383+ // tokens are available.
384+ func (s * SendTokenWatcher ) CancelHandle (ctx context.Context , handle SendTokenWatcherHandle ) {
385+ if w , ok := s .watchers .Load (handle .stream ); ok {
386+ w .remove (ctx , handle )
387+ }
388+ }
389+
390+ // watcher returns the sendStreamTokenWatcher for the given stream. If the
391+ // watcher does not exist, it is created.
392+ func (s * SendTokenWatcher ) watcher (tc * tokenCounter ) * sendStreamTokenWatcher {
393+ if w , ok := s .watchers .Load (tc .Stream ()); ok {
394+ return w
395+ }
396+ w , _ := s .watchers .LoadOrStore (
397+ tc .Stream (), newSendStreamTokenWatcher (s .stopper , tc , s .clock .NewTimer ()))
398+ return w
399+ }
400+
401+ // sendStreamTokenWatcher is a watcher for available elastic send tokens on a
402+ // stream. It watches for available tokens and notifies the caller when tokens
403+ // are available.
404+ type sendStreamTokenWatcher struct {
405+ stopper * stop.Stopper
406+ tc * tokenCounter
407+ // nonEmptyCh is used to signal the watcher that there are events to process.
408+ // When the queue is empty, the watcher will wait for the next event to be
409+ // added before processing, by waiting on this channel.
410+ nonEmptyCh chan struct {}
411+ // timer is used to stop the watcher if there are no more handles for the
412+ // stream after some duration.
413+ timer timeutil.TimerI
414+ mu struct {
415+ syncutil.Mutex
416+ // idSeq is the unique identifier for the next handle.
417+ idSeq uint64
418+ // started is true if the watcher is running, false otherwise. It is used
419+ // to prevent running more than one goroutine per stream and to stop the
420+ // watcher if there are no more handles for the stream after some duration.
421+ started bool
422+ // queueItems maps handle ids to their grant notification.
423+ queueItems map [uint64 ]TokenGrantNotification
424+ // queue is the FIFO ordered queue of handle ids to be notified when tokens
425+ // are available.
426+ queue * queue.Queue [uint64 ]
427+ }
428+ }
429+
430+ func newSendStreamTokenWatcher (
431+ stopper * stop.Stopper , tc * tokenCounter , timer timeutil.TimerI ,
432+ ) * sendStreamTokenWatcher {
433+ w := & sendStreamTokenWatcher {
434+ stopper : stopper ,
435+ tc : tc ,
436+ timer : timer ,
437+ nonEmptyCh : make (chan struct {}, 1 ),
438+ }
439+ w .mu .started = false
440+ w .mu .queueItems = make (map [uint64 ]TokenGrantNotification )
441+ w .mu .queue , _ = queue .NewQueue [uint64 ]()
442+ return w
443+ }
444+
445+ // add adds a handle to be watched for available tokens. The handle is added to
446+ // the queue and the watcher is started if it is not already running.
447+ func (w * sendStreamTokenWatcher ) add (
448+ ctx context.Context , notify TokenGrantNotification ,
449+ ) SendTokenWatcherHandle {
450+ w .mu .Lock ()
451+ defer w .mu .Unlock ()
452+
453+ w .mu .idSeq ++
454+ wasEmpty := w .emptyLocked ()
455+ wasStopped := ! w .mu .started
456+ handle := SendTokenWatcherHandle {id : w .mu .idSeq , stream : w .tc .stream }
457+ w .mu .queueItems [handle .id ] = notify
458+ w .mu .queue .Enqueue (handle .id )
459+
460+ log .VEventf (ctx , 3 , "%v (id=%d) watching stream %v" , notify , handle .id , w .tc .stream )
461+
462+ if wasEmpty {
463+ // The queue was empty, so signal the watcher that there are events to
464+ // process.
465+ log .VEventf (ctx , 3 , "signaling %v non-empty" , w .tc .stream )
466+ select {
467+ case w .nonEmptyCh <- struct {}{}:
468+ default :
469+ }
470+ }
471+
472+ if wasStopped {
473+ // The watcher isn't running, so start it.
474+ log .VEventf (ctx , 2 , "starting %v send stream token watcher" , w .tc .stream )
475+ if err := w .stopper .RunAsyncTask (ctx ,
476+ "flow-control-send-stream-token-watcher" , w .run ); err == nil {
477+ w .mu .started = true
478+ } else {
479+ log .Warningf (ctx , "failed to start send stream token watcher: %v" , err )
480+ }
481+ }
482+
483+ return handle
484+ }
485+
486+ // remove removes the handle from being watched for available tokens.
487+ func (w * sendStreamTokenWatcher ) remove (ctx context.Context , handle SendTokenWatcherHandle ) {
488+ w .mu .Lock ()
489+ defer w .mu .Unlock ()
490+ // Don't bother removing it from the queue. When the handle is dequeued, the
491+ // handle won't be in the queueItems map and will be ignored.
492+ if notification , ok := w .mu .queueItems [handle .id ]; ok {
493+ log .VEventf (ctx , 3 , "%v (id=%d) stopped watching stream %v" ,
494+ notification , handle .id , w .tc .stream )
495+ delete (w .mu .queueItems , handle .id )
496+ }
497+ }
498+
499+ func (w * sendStreamTokenWatcher ) run (_ context.Context ) {
500+ ctx := context .Background ()
501+ for {
502+ select {
503+ // Drain the nonEmptyCh, we will check if the queue is empty under the
504+ // lock, which is also held to signal the nonEmptyCh. If the queue later
505+ // becomes empty, then non-empty, nonEmptyCh be signaled again.
506+ case <- w .nonEmptyCh :
507+ default :
508+ }
509+ if w .empty () {
510+ w .timer .Reset (watcherIdleCloseDuration )
511+ // The watcher will wait here until a item is added to the queue, or
512+ // until the stopper is quiescing.
513+ select {
514+ case <- w .stopper .ShouldQuiesce ():
515+ return
516+ case <- w .timer .Ch ():
517+ w .timer .MarkRead ()
518+ w .timer .Stop ()
519+ w .mu .Lock ()
520+ // The queue has been empty for watcherIdleCloseDuration, check if
521+ // this is still the case.
522+ //
523+ // Since the timer firing could have raced with an item being added to
524+ // the queue, so we need to check again if the queue is empty. Since
525+ // add() retains a lock for its lifetime, we can be sure that the queue
526+ // is empty if we hold the lock until the end of this method. If an
527+ // item is added a short time after, it will wait to acquire the lock,
528+ // notice the watcher is now stopped and start it again.
529+ if w .emptyLocked () {
530+ defer w .mu .Unlock ()
531+ w .mu .started = false
532+ return
533+ }
534+ // Otherwise, the queue is non-empty, so continue to token checking.
535+ w .mu .Unlock () // nolint:deferunlockcheck
536+ case <- w .nonEmptyCh :
537+ }
538+ }
539+
540+ available , handle := w .tc .TokensAvailable (sendTokenWatcherWC )
541+ // When there are no tokens available, wait for token counter channel to be
542+ // signaled, or until the stopper is quiescing.
543+ if ! available {
544+ waiting:
545+ for {
546+ select {
547+ case <- w .stopper .ShouldQuiesce ():
548+ return
549+ case <- handle .WaitChannel ():
550+ if handle .ConfirmHaveTokensAndUnblockNextWaiter () {
551+ break waiting
552+ }
553+ }
554+ }
555+ }
556+ // There were either tokens available (without waiting), or we waited, were
557+ // unblocked and confirmed that there are tokens available. Notify the next
558+ // handle in line.
559+ if grant , found := w .nextGrant (); found {
560+ log .VInfof (ctx , 4 ,
561+ "notifying %v of available tokens for stream %v" , grant , w .tc .stream )
562+ grant .Notify (ctx )
563+ }
564+ }
565+ }
566+
567+ // nextGrant returns the next grant in the queue and true if a grant is
568+ // available. If no grant is available, it returns false. If a grant is found
569+ // and dequeued, it will be removed from the queue.
570+ func (w * sendStreamTokenWatcher ) nextGrant () (grant TokenGrantNotification , found bool ) {
571+ w .mu .Lock ()
572+ defer w .mu .Unlock ()
573+ for id , ok := w .mu .queue .Dequeue (); ok ; id , ok = w .mu .queue .Dequeue () {
574+ // The front of the queue could be a handle that was removed, so we need to
575+ // check if it's still in the queueItems map.
576+ if grant , found = w .mu .queueItems [id ]; found {
577+ delete (w .mu .queueItems , id )
578+ return grant , found
579+ }
580+ }
581+ // Either the queue was empty or non-empty but every handle was removed.
582+ return nil , false
583+ }
584+
585+ // empty returns true iff there are no handles in the queue.
586+ func (w * sendStreamTokenWatcher ) empty () bool {
587+ w .mu .Lock ()
588+ defer w .mu .Unlock ()
589+ return w .emptyLocked ()
590+ }
591+
592+ func (w * sendStreamTokenWatcher ) emptyLocked () bool {
593+ return len (w .mu .queueItems ) == 0
594+ }
0 commit comments