Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions sdks/python/hatchet_sdk/connection.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import os
from typing import Literal, cast, overload
from typing import Literal, cast, overload, Callable, TypeVar

import grpc

from hatchet_sdk.config import ClientConfig
from hatchet_sdk.exceptions import HatchetError
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class HatchetError does not exist in hatchet_sdk.exceptions. This import will fail at runtime. Based on the existing exception conventions in the codebase (see exceptions.py lines 6-173), all custom exceptions inherit directly from Exception or ValueError. The import should be removed and GRPCTransportError should inherit from Exception instead of HatchetError.

Copilot uses AI. Check for mistakes.


T = TypeVar("T")


@overload
Expand All @@ -26,6 +30,7 @@ def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel
root = f.read()

credentials = grpc.ssl_channel_credentials(root_certificates=root)

elif config.tls_config.strategy == "mtls":
assert config.tls_config.root_ca_file
assert config.tls_config.key_file
Expand Down Expand Up @@ -59,13 +64,9 @@ def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel
("grpc.default_compression_algorithm", grpc.Compression.Gzip),
]

# Set environment variable to disable fork support. Reference: https://github.com/grpc/grpc/issues/28557
# When steps execute via os.fork, we see `TSI_DATA_CORRUPTED` errors.
# needs to be the string "True" or "False"
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = str(config.grpc_enable_fork_support)

if config.grpc_enable_fork_support:
# See discussion: https://github.com/hatchet-dev/hatchet/pull/2057#discussion_r2243233357
os.environ["GRPC_POLL_STRATEGY"] = "poll"
Comment on lines 67 to 70
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valuable explanatory comments about fork support and TSI_DATA_CORRUPTED errors were removed. These comments provide important context for why GRPC_ENABLE_FORK_SUPPORT is set and reference GitHub issues that explain known problems. Consider restoring these comments:

Copilot uses AI. Check for mistakes.

if config.tls_config.strategy == "none":
Expand All @@ -88,3 +89,61 @@ def new_conn(config: ClientConfig, aio: bool) -> grpc.Channel | grpc.aio.Channel
grpc.Channel | grpc.aio.Channel,
conn,
)


# -------------------------------------------------------------------
# gRPC execution + transport error normalization (no behavior change)
# -------------------------------------------------------------------


def _execute_grpc_call(
fn: Callable[..., T],
*args,
**kwargs,
) -> T:
"""
Executes a gRPC stub call and normalizes transport-level errors.

Does NOT change retry behavior.
Only translates grpc.RpcError into clearer SDK-level exceptions.
"""
try:
return fn(*args, **kwargs)
except grpc.RpcError as e:
raise _translate_grpc_error(e) from e


def _translate_grpc_error(e: grpc.RpcError) -> Exception:
Comment on lines +106 to +110
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _execute_grpc_call function only catches grpc.RpcError, but throughout the codebase (e.g., admin.py lines 334, 402, 435), both grpc.RpcError and grpc.aio.AioRpcError are caught together. Since this helper is meant to be used for both sync and async gRPC calls, it should handle both exception types: except (grpc.RpcError, grpc.aio.AioRpcError) as e:

Suggested change
except grpc.RpcError as e:
raise _translate_grpc_error(e) from e
def _translate_grpc_error(e: grpc.RpcError) -> Exception:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
raise _translate_grpc_error(e) from e
def _translate_grpc_error(
e: grpc.RpcError | grpc.aio.AioRpcError,
) -> Exception:

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type annotation should accept both grpc.RpcError and grpc.aio.AioRpcError to be consistent with the codebase pattern (see admin.py lines 334, 402, 435). Suggest: def _translate_grpc_error(e: grpc.RpcError | grpc.aio.AioRpcError) -> Exception:

Suggested change
def _translate_grpc_error(e: grpc.RpcError) -> Exception:
def _translate_grpc_error(e: grpc.RpcError | grpc.aio.AioRpcError) -> Exception:

Copilot uses AI. Check for mistakes.
"""
Translate grpc.RpcError into more specific SDK exceptions.

No retry behavior changes.
"""
code = e.code()

# Transport-level / connectivity issues
if code == grpc.StatusCode.DEADLINE_EXCEEDED:
return GRPCTimeoutError(str(e))

if code == grpc.StatusCode.UNAVAILABLE:
return GRPCUnavailableError(str(e))

# Fallback: preserve original behavior
return e


# -------------------------------------------------------------------
# Transport exception types (minimal, incremental)
# -------------------------------------------------------------------


class GRPCTransportError(HatchetError):
"""Base class for gRPC transport-level failures."""
Comment on lines +134 to +135
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base class HatchetError does not exist. This class should inherit from Exception instead, following the pattern used by all other exceptions in the codebase (e.g., InvalidDependencyError, NonRetryableException, DedupeViolationError in exceptions.py).

Copilot uses AI. Check for mistakes.


class GRPCTimeoutError(GRPCTransportError):
"""Raised when a gRPC call exceeds its deadline."""


class GRPCUnavailableError(GRPCTransportError):
"""Raised when the gRPC server is unavailable."""
Comment on lines +134 to +143
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new exception classes (GRPCTransportError, GRPCTimeoutError, GRPCUnavailableError) are defined in connection.py, but all other SDK exception classes are defined in exceptions.py (e.g., InvalidDependencyError, DedupeViolationError, TaskRunError, etc.). For consistency with the existing codebase structure, these exception classes should be moved to exceptions.py where all other SDK exceptions are centralized.

Copilot uses AI. Check for mistakes.
Loading