diff --git a/pkg/client/listener.go b/pkg/client/listener.go index fab85c83fb..395e2ab80a 100644 --- a/pkg/client/listener.go +++ b/pkg/client/listener.go @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" @@ -96,6 +97,8 @@ type WorkflowRunEventHandler func(event WorkflowRunEvent) error type WorkflowRunsListener struct { constructor func(context.Context) (dispatchercontracts.Dispatcher_SubscribeToWorkflowRunsClient, error) + conn *grpc.ClientConn + client dispatchercontracts.Dispatcher_SubscribeToWorkflowRunsClient clientMu sync.Mutex generation uint64 @@ -124,6 +127,7 @@ func (r *subscribeClientImpl) getWorkflowRunsListener( w := &WorkflowRunsListener{ constructor: constructor, + conn: r.conn, l: r.l, } @@ -165,6 +169,28 @@ func (w *WorkflowRunsListener) getClientSnapshot() (dispatchercontracts.Dispatch return w.client, w.generation } +// waitForReadyLocked loops until w.conn reports connectivity.Ready (returning nil at once if it already does), returns an error if the state is connectivity.Shutdown, and returns ctx.Err() if ctx ends while waiting. +// It waits on gRPC connectivity state changes (GetState / WaitForStateChange) rather than sleeping for a fixed interval. +// NOTE(review): because an already-Ready connection returns immediately, call sites that use this in place of time.Sleep get no delay between retries in that case - confirm this is intended. +// It reads only the conn field and never acquires clientMu; conn is presumably fixed after construction (TODO confirm), which is what makes it callable while clientMu is held. +func (w *WorkflowRunsListener) waitForReadyLocked(ctx context.Context) error { + for { + state := w.conn.GetState() + if state == connectivity.Ready { + return nil + } + if state == connectivity.Shutdown { + return errors.New("connection shutdown") + } + // Ask the channel to start connecting; this is called on every iteration, though it is intended for the idle case + w.conn.Connect() + // Wait for the state to move away from the one just observed; a false result is treated as ctx having ended, otherwise loop and re-read the state + if !w.conn.WaitForStateChange(ctx, state) { + return ctx.Err() + } + } +} + // retrySubscribe coalesces concurrent reconnection attempts via singleflight. 
// Multiple goroutines calling this concurrently will share a single reconnection attempt. func (w *WorkflowRunsListener) retrySubscribe(ctx context.Context) error { @@ -182,7 +208,12 @@ func (w *WorkflowRunsListener) doRetrySubscribe(ctx context.Context) error { for retries < DefaultActionListenerRetryCount { if retries > 0 { - time.Sleep(DefaultActionListenerRetryInterval) + waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval) + err := w.waitForReadyLocked(waitCtx) + cancel() + if err != nil && ctx.Err() != nil { + return ctx.Err() + } } client, err := w.constructor(ctx) @@ -305,7 +336,9 @@ func (l *WorkflowRunsListener) retrySend(workflowRunId string) error { l.l.Error().Err(retryErr).Msg("failed to resubscribe after send failure") } - time.Sleep(DefaultActionListenerRetryInterval) + waitCtx, cancel := context.WithTimeout(context.Background(), DefaultActionListenerRetryInterval) + _ = l.waitForReadyLocked(waitCtx) + cancel() } return fmt.Errorf("could not send to the worker after %d retries", DefaultActionListenerRetryCount) @@ -329,8 +362,10 @@ func (l *WorkflowRunsListener) Listen(ctx context.Context) error { consecutiveErrors++ if status.Code(err) == codes.Unavailable { - l.l.Warn().Err(err).Msg("dispatcher is unavailable, retrying subscribe after 1 second") - time.Sleep(1 * time.Second) + l.l.Warn().Err(err).Msg("dispatcher is unavailable, waiting for connection to be ready") + waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval) + _ = l.waitForReadyLocked(waitCtx) + cancel() } retryErr := l.retrySubscribe(ctx) @@ -342,7 +377,9 @@ func (l *WorkflowRunsListener) Listen(ctx context.Context) error { return fmt.Errorf("failed to resubscribe after %d consecutive errors: %w", consecutiveErrors, retryErr) } - time.Sleep(DefaultActionListenerRetryInterval) + waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval) + _ = l.waitForReadyLocked(waitCtx) + cancel() continue } @@ -426,6 +463,8 @@ 
type subscribeClientImpl struct { clientv1 sharedcontracts.V1DispatcherClient + conn *grpc.ClientConn + l *zerolog.Logger v validator.Validator @@ -443,6 +482,7 @@ func newSubscribe(conn *grpc.ClientConn, opts *sharedClientOpts) SubscribeClient return &subscribeClientImpl{ client: dispatchercontracts.NewDispatcherClient(conn), clientv1: sharedcontracts.NewV1DispatcherClient(conn), + conn: conn, l: opts.l, v: opts.v, ctx: opts.ctxLoader,