Depend on the gRPC layer for connection state changes #2977
Pull request overview
This PR updates the Go SDK’s workflow run listener reconnection behavior to rely on gRPC’s native connectivity state transitions instead of using fixed sleep-based delays during retry/reconnect loops.
Changes:
- Add a `grpc.ClientConn` reference to the workflow runs listener so it can observe connection state.
- Introduce `waitForReadyLocked` using `grpc/connectivity` (`GetState`, `Connect`, `WaitForStateChange`) as the retry delay mechanism.
- Replace several `time.Sleep` calls in the resubscribe/send/listen retry paths with connection-state waiting.
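For orientation, here is a minimal, self-contained sketch of this pattern built only on the public gRPC connectivity API (`GetState`, `Connect`, `WaitForStateChange`). Names and structure are illustrative, not the SDK's exact implementation:

```go
package grpcwait

import (
	"context"
	"errors"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// waitForReady blocks until conn reports Ready, returns an error if the
// connection has been shut down, and returns ctx.Err() if ctx expires first.
func waitForReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		state := conn.GetState()
		if state == connectivity.Ready {
			return nil
		}
		if state == connectivity.Shutdown {
			return errors.New("connection shutdown")
		}
		if state == connectivity.Idle {
			// Kick an idle channel so it actually starts connecting.
			conn.Connect()
		}
		// Block until the state changes or ctx is done; false means ctx expired.
		if !conn.WaitForStateChange(ctx, state) {
			return ctx.Err()
		}
	}
}
```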
```go
func (w *WorkflowRunsListener) waitForReadyLocked(ctx context.Context) error {
	for {
		state := w.conn.GetState()
		if state == connectivity.Ready {
			return nil
		}
		if state == connectivity.Shutdown {
			return errors.New("connection shutdown")
		}
```
waitForReadyLocked dereferences w.conn without checking for nil. WorkflowRunsListener can be instantiated without conn (e.g., in unit tests or by external callers), which will cause a panic when this method is hit. Consider validating conn is non-nil when constructing the listener (or guarding here and returning a meaningful error / falling back to time-based wait).
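A minimal sketch of such a guard, assuming the PR's `WorkflowRunsListener` and `DefaultActionListenerRetryInterval`; the timed fallback is only one possible behavior, not the SDK's current code:

```go
func (w *WorkflowRunsListener) waitForReadyLocked(ctx context.Context) error {
	if w.conn == nil {
		// No ClientConn available (e.g., a listener built in unit tests):
		// fall back to a bounded timed wait instead of panicking.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(DefaultActionListenerRetryInterval):
			return nil
		}
	}
	// ... existing connectivity-state wait ...
}
```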
```go
			return errors.New("connection shutdown")
		}
		// Trigger a connection attempt if the channel is idle
		w.conn.Connect()
```
The code calls w.conn.Connect() unconditionally, but the comment says it's only to trigger a connection attempt when the channel is idle. Consider checking state == connectivity.Idle before calling Connect (or update the comment) to avoid unnecessary calls and keep behavior/documentation consistent.
```diff
-		w.conn.Connect()
+		if state == connectivity.Idle {
+			w.conn.Connect()
+		}
```
```go
		if err != nil && ctx.Err() != nil {
			return ctx.Err()
```
doRetrySubscribe ignores all waitForReadyLocked errors except when the parent ctx is canceled. If waitForReadyLocked returns a non-timeout error (e.g., "connection shutdown"), the loop will continue immediately and can spin/log aggressively. Consider handling non-deadline errors explicitly (e.g., return on Shutdown, or treat as a retryable error with backoff).
```diff
-		if err != nil && ctx.Err() != nil {
-			return ctx.Err()
+		if err != nil {
+			// If the parent context has been canceled, stop retrying.
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+			// For non-timeout errors (e.g., connection shutdown), apply a backoff
+			// to avoid a tight retry loop and excessive logging.
+			if !errors.Is(err, context.DeadlineExceeded) {
+				w.l.Warn().Err(err).Msg("waitForReady failed; backing off before retrying subscription")
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(DefaultActionListenerRetryInterval):
+				}
+			}
```
```go
		waitCtx, cancel := context.WithTimeout(context.Background(), DefaultActionListenerRetryInterval)
		_ = l.waitForReadyLocked(waitCtx)
		cancel()
```
retrySend ignores the error from waitForReadyLocked. If the connection is in Shutdown, waitForReadyLocked returns immediately and this retry loop can become a tight loop (no backoff) while repeatedly attempting resubscribe/send. Consider handling the error (return it on Shutdown, or fall back to a timed wait) to preserve a bounded retry cadence.
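One possible adjustment in retrySend (a sketch only; treating `context.DeadlineExceeded` as the normal "bounded wait elapsed, keep retrying" case is an assumption about the intended semantics, and how the error is propagated depends on retrySend's actual signature):

```go
waitCtx, cancel := context.WithTimeout(context.Background(), DefaultActionListenerRetryInterval)
waitErr := l.waitForReadyLocked(waitCtx)
cancel()
if waitErr != nil && !errors.Is(waitErr, context.DeadlineExceeded) {
	// Terminal condition such as connection shutdown: stop the retry loop
	// instead of immediately attempting resubscribe/send again.
	return waitErr
}
```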
```diff
 		if status.Code(err) == codes.Unavailable {
-			l.l.Warn().Err(err).Msg("dispatcher is unavailable, retrying subscribe after 1 second")
-			time.Sleep(1 * time.Second)
+			l.l.Warn().Err(err).Msg("dispatcher is unavailable, waiting for connection to be ready")
+			waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
+			_ = l.waitForReadyLocked(waitCtx)
+			cancel()
 		}
```
In the Unavailable path, this now waits up to DefaultActionListenerRetryInterval (currently 5s) instead of the previous fixed 1s, and it also discards any waitForReadyLocked error. If the intent is to keep quick retry behavior on Unavailable, consider using a shorter timeout here (or a dedicated constant) and handling Shutdown/non-timeout errors to avoid spinning.
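One option, if quick retries on Unavailable are desired (a sketch; `unavailableRetryInterval` is a hypothetical constant, not part of the SDK, and the `return` is a placeholder whose shape depends on the surrounding function):

```go
// Hypothetical constant preserving the previous ~1s cadence on Unavailable.
const unavailableRetryInterval = 1 * time.Second

if status.Code(err) == codes.Unavailable {
	l.l.Warn().Err(err).Msg("dispatcher is unavailable, waiting for connection to be ready")
	waitCtx, cancel := context.WithTimeout(ctx, unavailableRetryInterval)
	waitErr := l.waitForReadyLocked(waitCtx)
	cancel()
	if waitErr != nil && !errors.Is(waitErr, context.DeadlineExceeded) {
		// e.g., connection shutdown: surface the error instead of spinning.
		return waitErr
	}
}
```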
```diff
-			time.Sleep(DefaultActionListenerRetryInterval)
+			waitCtx, cancel := context.WithTimeout(ctx, DefaultActionListenerRetryInterval)
+			_ = l.waitForReadyLocked(waitCtx)
+			cancel()
```
After a failed retrySubscribe, the code ignores waitForReadyLocked errors. Similar to other call sites, this can lead to very fast looping if the underlying connection is Shutdown (no backoff). Consider checking the returned error and aborting (or enforcing a timed delay) on non-timeout errors.
Description
Instead of sleeps during reconnection attempts in the Go SDK listener, we want to depend on the native gRPC connection state.
Type of change