50 changes: 45 additions & 5 deletions pkg/client/listener.go
@@ -14,6 +14,7 @@ import (
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"

dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
@@ -96,6 +97,8 @@ type WorkflowRunEventHandler func(event WorkflowRunEvent) error
type WorkflowRunsListener struct {
constructor func(context.Context) (dispatchercontracts.Dispatcher_SubscribeToWorkflowRunsClient, error)

conn *grpc.ClientConn

client dispatchercontracts.Dispatcher_SubscribeToWorkflowRunsClient
clientMu sync.Mutex
generation uint64
@@ -124,6 +127,7 @@ func (r *subscribeClientImpl) getWorkflowRunsListener(

w := &WorkflowRunsListener{
constructor: constructor,
conn: r.conn,
l: r.l,
}

@@ -165,6 +169,28 @@ func (w *WorkflowRunsListener) getClientSnapshot() (dispatchercontracts.Dispatch
return w.client, w.generation
}

// waitForReadyLocked blocks until the gRPC connection is ready or the context is canceled.
// This uses gRPC's native connectivity state API instead of sleep-based polling,
// which provides faster reconnection when the server becomes available.
// This method only accesses the immutable conn field, so it's safe to call while holding clientMu.
func (w *WorkflowRunsListener) waitForReadyLocked(ctx context.Context) error {
for {
state := w.conn.GetState()
if state == connectivity.Ready {
return nil
}
if state == connectivity.Shutdown {
return errors.New("connection shutdown")
}
Comment on lines +176 to +184

Copilot AI Feb 9, 2026

waitForReadyLocked dereferences w.conn without checking for nil. WorkflowRunsListener can be instantiated without conn (e.g., in unit tests or by external callers), which will cause a panic when this method is hit. Consider validating that conn is non-nil when constructing the listener (or guarding here and returning a meaningful error / falling back to a time-based wait).

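One way to address this, sketched against the fields shown in this diff rather than taken from the PR, is to guard at the top of waitForReadyLocked and fall back to a bounded timed wait when conn is nil:

// Hypothetical guard; conn can be nil when the listener is constructed directly (e.g., in tests).
func (w *WorkflowRunsListener) waitForReadyLocked(ctx context.Context) error {
	if w.conn == nil {
		// No connection to observe: fall back to a bounded, time-based wait so callers keep their retry cadence.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(DefaultActionListenerRetryInterval):
			return nil
		}
	}
	// ... existing connectivity-state loop ...
}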
// Trigger a connection attempt if the channel is idle
w.conn.Connect()

Copilot AI Feb 9, 2026

The code calls w.conn.Connect() unconditionally, but the comment says it's only to trigger a connection attempt when the channel is idle. Consider checking state == connectivity.Idle before calling Connect (or updating the comment) to avoid unnecessary calls and keep the behavior and documentation consistent.

Suggested change
w.conn.Connect()
if state == connectivity.Idle {
w.conn.Connect()
}

// Block until state changes or context is canceled
if !w.conn.WaitForStateChange(ctx, state) {
return ctx.Err()
}
}
}

// retrySubscribe coalesces concurrent reconnection attempts via singleflight.
// Multiple goroutines calling this concurrently will share a single reconnection attempt.
func (w *WorkflowRunsListener) retrySubscribe(ctx context.Context) error {
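The body of retrySubscribe is collapsed in this view; a minimal sketch of the coalescing pattern the comment describes, assuming a singleflight.Group field on the listener (the field name retryGroup is hypothetical, not from this diff):

func (w *WorkflowRunsListener) retrySubscribe(ctx context.Context) error {
	// Concurrent callers share the result of a single doRetrySubscribe invocation.
	_, err, _ := w.retryGroup.Do("retry-subscribe", func() (interface{}, error) {
		return nil, w.doRetrySubscribe(ctx)
	})
	return err
}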
@@ -182,7 +208,12 @@ func (w *WorkflowRunsListener) doRetrySubscribe(ctx context.Context) error {

for retries < DefaultActionListenerRetryCount {
if retries > 0 {
time.Sleep(DefaultActionListenerRetryInterval)
waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
err := w.waitForReadyLocked(waitCtx)
cancel()
if err != nil && ctx.Err() != nil {
return ctx.Err()
Comment on lines +214 to +215

Copilot AI Feb 9, 2026

doRetrySubscribe ignores all waitForReadyLocked errors except when the parent ctx is canceled. If waitForReadyLocked returns a non-timeout error (e.g., "connection shutdown"), the loop will continue immediately and can spin/log aggressively. Consider handling non-deadline errors explicitly (e.g., return on Shutdown, or treat as a retryable error with backoff).

Suggested change
if err != nil && ctx.Err() != nil {
return ctx.Err()
if err != nil {
// If the parent context has been canceled, stop retrying.
if ctx.Err() != nil {
return ctx.Err()
}
// For non-timeout errors (e.g., connection shutdown), apply a backoff
// to avoid a tight retry loop and excessive logging.
if !errors.Is(err, context.DeadlineExceeded) {
w.l.Warn().Err(err).Msg("waitForReady failed; backing off before retrying subscription")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(DefaultActionListenerRetryInterval):
}
}

}
}

client, err := w.constructor(ctx)
@@ -305,7 +336,9 @@ func (l *WorkflowRunsListener) retrySend(workflowRunId string) error {
l.l.Error().Err(retryErr).Msg("failed to resubscribe after send failure")
}

time.Sleep(DefaultActionListenerRetryInterval)
waitCtx, cancel := context.WithTimeout(context.Background(), DefaultActionListenerRetryInterval)
_ = l.waitForReadyLocked(waitCtx)
cancel()
Comment on lines +339 to +341

Copilot AI Feb 9, 2026

retrySend ignores the error from waitForReadyLocked. If the connection is in Shutdown, waitForReadyLocked returns immediately and this retry loop can become a tight loop (no backoff) while repeatedly attempting resubscribe/send. Consider handling the error (return it on Shutdown, or fall back to a timed wait) to preserve a bounded retry cadence.

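One possible shape for that, sketched here rather than taken from this PR: treat a non-timeout error (such as connection shutdown) as fatal for the send loop so the retry cadence stays bounded:

waitCtx, cancel := context.WithTimeout(context.Background(), DefaultActionListenerRetryInterval)
waitErr := l.waitForReadyLocked(waitCtx)
cancel()
if waitErr != nil && !errors.Is(waitErr, context.DeadlineExceeded) {
	// e.g., the connection is in Shutdown: give up instead of spinning through the remaining retries.
	return fmt.Errorf("connection not ready while retrying send: %w", waitErr)
}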
}

return fmt.Errorf("could not send to the worker after %d retries", DefaultActionListenerRetryCount)
@@ -329,8 +362,10 @@ func (l *WorkflowRunsListener) Listen(ctx context.Context) error {
consecutiveErrors++

if status.Code(err) == codes.Unavailable {
l.l.Warn().Err(err).Msg("dispatcher is unavailable, retrying subscribe after 1 second")
time.Sleep(1 * time.Second)
l.l.Warn().Err(err).Msg("dispatcher is unavailable, waiting for connection to be ready")
waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
_ = l.waitForReadyLocked(waitCtx)
cancel()
}
Comment on lines 364 to 369

Copilot AI Feb 9, 2026

In the Unavailable path, this now waits up to DefaultActionListenerRetryInterval (currently 5s) instead of the previous fixed 1s, and it also discards any waitForReadyLocked error. If the intent is to keep quick retry behavior on Unavailable, consider using a shorter timeout here (or a dedicated constant) and handling Shutdown/non-timeout errors to avoid spinning.

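If the goal is to keep the previous roughly one-second cadence on Unavailable, one option (the constant name is hypothetical, not part of this PR) is a dedicated, shorter timeout for this path:

// unavailableRetryWait mirrors the old fixed 1s sleep; hypothetical constant.
const unavailableRetryWait = 1 * time.Second

if status.Code(err) == codes.Unavailable {
	l.l.Warn().Err(err).Msg("dispatcher is unavailable, waiting for connection to be ready")
	waitCtx, cancel := context.WithTimeout(ctx, unavailableRetryWait)
	waitErr := l.waitForReadyLocked(waitCtx)
	cancel()
	if waitErr != nil && ctx.Err() != nil {
		// The parent context was canceled: stop listening instead of looping.
		return ctx.Err()
	}
}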

retryErr := l.retrySubscribe(ctx)
@@ -342,7 +377,9 @@
return fmt.Errorf("failed to resubscribe after %d consecutive errors: %w", consecutiveErrors, retryErr)
}

time.Sleep(DefaultActionListenerRetryInterval)
waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
_ = l.waitForReadyLocked(waitCtx)
cancel()
Comment on lines 379 to +382

Copilot AI Feb 9, 2026

After a failed retrySubscribe, the code ignores waitForReadyLocked errors. Similar to other call sites, this can lead to very fast looping if the underlying connection is Shutdown (no backoff). Consider checking the returned error and aborting (or enforcing a timed delay) on non-timeout errors.

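A sketch of one way to bound this loop, not taken from the PR: treat a non-timeout wait error as fatal, since a Shutdown connection will not recover:

waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
waitErr := l.waitForReadyLocked(waitCtx)
cancel()
if waitErr != nil && !errors.Is(waitErr, context.DeadlineExceeded) {
	// e.g., connectivity.Shutdown: surface the error rather than retrying in a tight loop.
	return fmt.Errorf("connection not ready after %d consecutive errors: %w", consecutiveErrors, waitErr)
}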
continue
}

@@ -426,6 +463,8 @@ type subscribeClientImpl struct {

clientv1 sharedcontracts.V1DispatcherClient

conn *grpc.ClientConn

l *zerolog.Logger

v validator.Validator
@@ -443,6 +482,7 @@ func newSubscribe(conn *grpc.ClientConn, opts *sharedClientOpts) SubscribeClient
return &subscribeClientImpl{
client: dispatchercontracts.NewDispatcherClient(conn),
clientv1: sharedcontracts.NewV1DispatcherClient(conn),
conn: conn,
l: opts.l,
v: opts.v,
ctx: opts.ctxLoader,