[Serve] Add HAProxy support for Ray Serve #60586
eicherseiji wants to merge 24 commits into ray-project:master
Conversation
Code Review
This pull request introduces HAProxy support to Ray Serve, which is a significant and valuable feature. The implementation is comprehensive, covering HAProxy process management, dynamic configuration generation, and extensive testing. The code is well-structured, with a clear separation of concerns. I have a few suggestions to improve robustness and efficiency, such as replacing an external socat dependency with native Python asyncio for socket communication and refining how environment variables are read. Overall, this is a solid contribution.
async def _send_socket_command(self, command: str) -> str:
    """Send a command to the HAProxy stats socket via subprocess."""
    try:
        # Check if a socket file exists
        if not os.path.exists(self.cfg.socket_path):
            raise RuntimeError(
                f"HAProxy socket file does not exist: {self.cfg.socket_path}."
            )

        proc = await asyncio.create_subprocess_exec(
            "socat",
            "-",
            f"UNIX-CONNECT:{self.cfg.socket_path}",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(f"{command}\n".encode("utf-8")), timeout=5.0
            )
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            raise RuntimeError(
                f"Timeout while sending command '{command}' to HAProxy socket"
            )

        if proc.returncode != 0:
            err = stderr.decode("utf-8", errors="ignore").strip()
            raise RuntimeError(
                f"Command '{command}' failed with code {proc.returncode}: {err}"
            )

        result = stdout.decode("utf-8", errors="ignore")
        logger.debug(f"Socket command '{command}' returned {len(result)} chars.")
        return result
    except Exception as e:
        raise RuntimeError(f"Failed to send socket command '{command}': {e}")
The _send_socket_command method currently uses socat as a subprocess to communicate with the HAProxy admin socket. This introduces a dependency on an external command-line tool (socat) which might not be available in all environments, and is less efficient than using Python's native socket library.
As noted in the TODO on line 748, this can be improved by using Python's asyncio library to interact with the UNIX domain socket directly. This removes the external dependency and is more robust and performant.
Here's a suggested implementation:
async def _send_socket_command(self, command: str) -> str:
    """Send a command to the HAProxy stats socket."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_unix_connection(self.cfg.socket_path),
            timeout=5.0
        )
        try:
            writer.write(f"{command}\n".encode("utf-8"))
            await writer.drain()
            response_bytes = await asyncio.wait_for(reader.read(), timeout=5.0)
            result = response_bytes.decode("utf-8", errors="ignore")
            logger.debug(f"Socket command '{command}' returned {len(result)} chars.")
            return result
        finally:
            writer.close()
            await writer.wait_closed()
    except FileNotFoundError:
        raise RuntimeError(
            f"HAProxy socket file does not exist: {self.cfg.socket_path}."
        )
    except asyncio.TimeoutError:
        raise RuntimeError(
            f"Timeout while sending command '{command}' to HAProxy socket"
        )
    except Exception as e:
        raise RuntimeError(f"Failed to send socket command '{command}': {e}")

RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = (
    int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S"))
    if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")
    else None
)
This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.
Suggested change:

RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = (
    int(val)
    if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S"))
    else None
)
RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = (
    int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S"))
    if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")
    else None
)
This code calls os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") twice, which is inefficient. You can use the walrus operator := to store the result of the first call and reuse it, making the code more efficient and readable.
Suggested change:

RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = (
    int(val)
    if (val := os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S"))
    else None
)
) -> None:
    start_time = time.time()

    # TODO: update this to use health checks
As this TODO suggests, using an HTTP health check would be a more robust way to verify HAProxy's availability. The current implementation checks if the admin socket is responsive, but an HTTP check would verify that the frontend is up and correctly configured to serve traffic. You could use an HTTP client like httpx to make a request to http://127.0.0.1:{self.cfg.frontend_port}/-/healthz and check for a successful response.
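A rough sketch of that approach, assuming an httpx dependency and a waiter method shaped like the one described above (the method name, polling interval, and timeout handling are illustrative, not from this PR):

import asyncio
import time

import httpx

async def _wait_for_hap_availability(self, timeout_s: float = 5.0) -> None:
    """Poll the HAProxy frontend health endpoint until it responds or we time out."""
    url = f"http://127.0.0.1:{self.cfg.frontend_port}/-/healthz"
    deadline = time.time() + timeout_s
    async with httpx.AsyncClient() as client:
        while time.time() < deadline:
            try:
                resp = await client.get(url, timeout=1.0)
                if resp.status_code == 200:
                    # Frontend is up and routing the health-check route.
                    return
            except httpx.HTTPError:
                # HAProxy is not accepting connections yet; keep polling.
                pass
            await asyncio.sleep(0.2)
    raise RuntimeError(f"HAProxy frontend did not become available within {timeout_s}s")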
This PR adds HAProxy support to OSS Ray Serve. HAProxy can be used as an alternative to the default Serve HTTP proxy, providing load balancing with features like graceful reloads and health checks.

Key changes:
- Add haproxy.py and haproxy_templates.py for HAProxy management
- Add HAProxy-related constants to constants.py (RAY_SERVE_ENABLE_HAPROXY, etc.)
- Add TARGET_GROUPS to LongPollNamespace for broadcasting target updates
- Add get_proxy_actor_class() to default_impl.py for proxy selection
- Modify controller.py to support HAProxy mode with target group broadcasting
- Add comprehensive test coverage

The feature is disabled by default and can be enabled by setting the RAY_SERVE_ENABLE_HAPROXY=1 environment variable.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…ests
- Add DRAINING_MESSAGE constant to constants.py (used by haproxy.py)
- Add BUILD.bazel targets for HAProxy integration tests
- Add BUILD.bazel target for HAProxy controller unit test

Signed-off-by: Seiji Eicher <seiji@anyscale.com>

- Change test size to 'large'
- Add RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S env variable
- Add 'haproxy' tag to test targets

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
These constants are needed by haproxy.py but were not defined in constants.py. The same constants are also defined in proxy_router.py for the default proxy. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The controller.py was missing:
1. Import of get_proxy_actor_class from default_impl
2. Passing proxy_actor_class=get_proxy_actor_class() to ProxyStateManager

Without this, the default ProxyActor was always used instead of HAProxyManager even when HAProxy was enabled.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
- Add HAProxy build stage to docker/ray/Dockerfile with multi-stage build
- Install HAProxy 2.8.12 from source with Prometheus exporter support
- Add _dump_ingress_cache_for_testing method to HAProxyManager for test parity

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The apt-get installed HAProxy 2.0.x doesn't support directives needed by the tests (option idle-close-on-response and http-request return require HAProxy 2.2+). Build from source to get version 2.8.12. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Match rayturbo's pattern where test_haproxy.py and test_haproxy_api.py have pytestmark skipif, but test_metrics_haproxy.py relies on Bazel setting RAY_SERVE_ENABLE_HAPROXY=1 in the test target env. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Identical modulo import paths and env var prefix

Identical mod. import ordering and env var prefixes

Identical mod. import ordering and import paths

Identical mod. import ordering

Identical mod. import path change

New get_proxy_actor_class is identical

Identical, mod. unnecessary fixtures + Windows exclusion
- Use $BASE_IMAGE instead of ubuntu:22.04 for haproxy-builder
- Add sudoers entry for ray user to manage HAProxy files
- Remove || true from groupadd/useradd
- Standardize comments and variable names (BUILD_DIR vs HAPROXY_BUILD_DIR)

Signed-off-by: Seiji Eicher <seiji@anyscale.com>

- Rename env var and constant to use HA_PROXY (with underscore)
- Move HAProxy constants to top of constants.py for cleaner diffs
- Add detailed comments for each HAProxy configuration option

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Identical HAProxy build steps

Identical HAProxy build steps

why are we duplicating this here?
If we are moving it into all (non-slim) ray images, you need to:
- move this code into base-deps
- remove the duplicated code in base-extra

This dockerfile is strictly reserved for installing the ray wheel as the last step of image building; nothing else is allowed.
Thanks, moved to base-deps

Identical mod. env var prefix
self._refresh_autoscaling_deployments_cache()

self._last_broadcasted_target_groups: List[TargetGroup] = []
# Initialize to None (not []) to ensure the first broadcast always happens,
if self._ha_proxy_enabled:
    self.broadcast_target_groups_if_changed()

def broadcast_target_groups_if_changed(self) -> None:
that have running replicas, we return target groups for direct ingress.
If there are multiple applications with no running replicas, we return
one target group per application with unique route prefix.
5. HAProxy is enabled and the caller is not an internal proxy manager. In
]

if not apps:
    # When HAProxy is enabled and there are no apps, return empty target groups

protocol=RequestProtocol.HTTP,
route_prefix=route_prefix,
targets=http_targets,
targets=[] if self._ha_proxy_enabled else http_targets,

protocol=RequestProtocol.GRPC,
route_prefix=route_prefix,
targets=grpc_targets,
targets=[] if self._ha_proxy_enabled else grpc_targets,
Use FULL_BASE_IMAGE for the HAProxy builder stage instead of the undefined BASE_IMAGE variable. The build script only passes FULL_BASE_IMAGE, so using it for both stages is simpler and consistent. Signed-off-by: Seiji Eicher <seiji@anyscale.com> # Conflicts: # docker/ray/Dockerfile
Keep the original ARG BASE_IMAGE and FULL_BASE_IMAGE default to maintain backwards compatibility for builds that rely on constructing the full image name from a suffix (e.g., BASE_IMAGE=-cpu). Signed-off-by: Seiji Eicher <seiji@anyscale.com>
The haproxy-builder stage inherits USER ray from the base image, causing apt-get to fail with permission denied. Add USER root to run package installation as root. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
HAPROXY_VERSION="2.8.12"
HAPROXY_BUILD_DIR="$(mktemp -d)"
wget -O "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" "https://www.haproxy.org/download/2.8/src/haproxy-${HAPROXY_VERSION}.tar.gz"
tar -xzf "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" -C "${HAPROXY_BUILD_DIR}" --strip-components=1
make -C "${HAPROXY_BUILD_DIR}" TARGET=linux-glibc USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1
sudo make -C "${HAPROXY_BUILD_DIR}" install
rm -rf "${HAPROXY_BUILD_DIR}"
maybe have a script for this rather than duplicating this everywhere?
also, can we get the binaries prebuilt rather than building from source?
Makes sense. Okay with you if we dedupe/incorporate prebuilt binary in a follow-up PR?
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
…e-haproxy-port Signed-off-by: Seiji Eicher <seiji@anyscale.com>
)

try:
    await self._wait_for_hap_availability(proc)
Unused timeout_s parameter not forwarded to callee
Low Severity
_start_and_wait_for_haproxy accepts a timeout_s parameter but never passes it to _wait_for_hap_availability(proc) on line 554. The availability check always uses _wait_for_hap_availability's own default of 5 seconds, silently ignoring any custom timeout value a caller provides.
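A one-line sketch of the likely fix, using the names from the finding above (the keyword form assumes _wait_for_hap_availability exposes a timeout_s parameter, which is an assumption):

# Forward the caller-provided timeout instead of always using the callee's default.
await self._wait_for_hap_availability(proc, timeout_s=timeout_s)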
# Disabled by default to prevent test suite interference
RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG = (
    os.environ.get("RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG", "1") == "1"
)
Comment says "disabled by default" but default is enabled
Medium Severity
RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG defaults to "1" (enabled), but the comment on the preceding line states "Disabled by default to prevent test suite interference." The same contradictory comment appears in HAProxyConfig. If the comment reflects the intended behavior, the default value is wrong and may cause test flakiness from server state persistence across test runs.
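If the comment reflects the intended behavior, a minimal sketch of the corrected default (assuming the comment, not the current value, is authoritative):

# Disabled by default to prevent test suite interference
RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG = (
    os.environ.get("RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG", "0") == "1"
)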
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
… into serve-haproxy-port Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
ef.write("\r\n")
ef.write("Internal Server Error")

self.cfg.error_file_path = error_file_path
error_file_path not declared in HAProxyConfig dataclass
Low Severity
_initialize_directories_and_error_files dynamically sets self.cfg.error_file_path on the HAProxyConfig dataclass instance, but error_file_path is never declared as a field in the HAProxyConfig dataclass. The Jinja2 template at haproxy_templates.py references config.error_file_path. This works at runtime because Python allows setting arbitrary attributes on dataclass instances, but it bypasses the dataclass's type system and would not survive serialization round-trips.
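A sketch of declaring the field on the dataclass instead (error_file_path comes from the finding above; the other fields are illustrative placeholders based on attributes referenced elsewhere in this review):

from dataclasses import dataclass
from typing import Optional

@dataclass
class HAProxyConfig:
    socket_path: str
    frontend_port: int
    # Set by _initialize_directories_and_error_files; declared here so the
    # Jinja2 template's config.error_file_path lookup is part of the schema.
    error_file_path: Optional[str] = None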
except Exception:
    pass
if not ready_to_serve:
    await asyncio.sleep(0.2)
serving() returns immediately when HAProxy stats are empty
Medium Severity
When wait_for_applications_running=True, serving() immediately returns if get_all_stats() returns an empty dict (no backends configured yet in HAProxy). Both all_backends and ready_backends are empty sets, so set() == set() evaluates to True. This can happen during startup before the first long-poll target-group update arrives and HAProxy reloads with backends, causing the proxy to be reported as serving before any application traffic can actually be routed.
Move ProxyActor import to lazy inside get_proxy_actor_class() to break the circular dependency: default_impl -> proxy -> default_impl. Signed-off-by: Seiji Eicher <seiji@anyscale.com>
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
for server in servers.values():
    if server.is_up:
        ready_backends.add(backend)
ready_to_serve = all_backends == ready_backends
serving() returns immediately when stats are empty
Medium Severity
The serving() method exits immediately when get_all_stats() returns an empty dict (before target groups arrive), because set() == set() evaluates to True. This causes the method to signal readiness before any backends are configured, creating a race condition during startup. The method intends to wait for backends to have UP servers, but the empty-set equality is a vacuous truth that bypasses this wait entirely.
if proc.returncode is None:
    proc.kill()
    await proc.wait()
    self._proc = None
stop() leaves stale _proc reference when process already exited
Low Severity
In stop(), the self._proc = None assignment is only inside the if proc.returncode is None branch. If the HAProxy process has already exited (crashed), self._proc is never cleared. This leaves a stale process reference, meaning _is_running() and check_health() continue to see a dead process object instead of None, and a second call to stop() would re-enter the try block unnecessarily rather than taking the early-return path.
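A minimal sketch of the suggested ordering, using the names from the snippet above: clear the handle whether or not the process was still alive.

if proc.returncode is None:
    proc.kill()
    await proc.wait()
# Drop the reference even if HAProxy had already exited on its own.
self._proc = None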


Why
Ray Serve's built-in Python proxy becomes a bottleneck at high request rates. HAProxy is a mature, C-based load balancer that can handle significantly more concurrent connections with lower latency.
What
Adds an opt-in HAProxy mode (RAY_SERVE_ENABLE_HA_PROXY=1, disabled by default) that replaces the default Serve proxy with an HAProxy subprocess managed by a Ray actor.

Architecture:
- HAProxyManager (Ray actor, num_cpus=0) sits on the control path only: it receives routing updates via long poll (TARGET_GROUPS namespace) and translates them into HAProxy config reloads
- HAProxyApi manages the HAProxy subprocess lifecycle, including graceful reloads via SO_REUSEPORT to minimize packet loss during config changes

Key files: haproxy.py (manager actor + subprocess wrapper), haproxy_templates.py (config generation), modifications to controller.py (target group broadcasting), long_poll.py (new TARGET_GROUPS namespace), default_impl.py (proxy class selection)

Docker: HAProxy 2.8.12 is built from source in a multi-stage Docker build (added to serve.build.Dockerfile and ray/Dockerfile)

Test plan
All tests require RAY_SERVE_ENABLE_HA_PROXY=1 and the HAProxy binary:
- test_haproxy.py: core lifecycle, config reload, draining, multi-node
- test_haproxy_api.py: subprocess management, stats socket, graceful reload
- test_metrics_haproxy.py: HAProxy metrics export via Prometheus endpoint
- test_controller_haproxy.py: unit tests for controller integration

CI results (build #59273): 3/4 targets passed; test_metrics_haproxy was flaky on first attempt (metrics timing issue in test_proxy_metrics_http_status_code_is_error) and passed on retry.
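For context, a hypothetical opt-in sketch (not code from this PR; it assumes the environment variable is visible to the Ray Serve controller process before it starts, and the port value is illustrative):

import os

# Must be set before Ray and the Serve controller start.
os.environ["RAY_SERVE_ENABLE_HA_PROXY"] = "1"

import ray
from ray import serve

ray.init()
serve.start(http_options={"port": 8000})
# Routing updates are now broadcast as target groups to the HAProxy-backed
# proxy actor instead of the default Python proxy.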