-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Serve] Add HAProxy support for Ray Serve #60586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 12 commits
e533fa9
60f7ab1
764dd04
856d1eb
9d7a87e
20b6005
338fa19
35de30c
b9c72fa
76f508f
2ec2991
4918537
a8c5c15
4bbf637
a3b8e8c
a4982b1
99a101c
efb886a
abbb2ec
3e61b05
31f90f2
8cc4bce
58a9d3c
d344c87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,46 @@ SHELL ["/bin/bash", "-ice"] | |
|
|
||
| COPY . . | ||
|
|
||
| # Install HAProxy from source | ||
| RUN <<EOF | ||
| #!/bin/bash | ||
| set -euo pipefail | ||
|
|
||
| # Install HAProxy dependencies | ||
| sudo apt-get update && sudo apt-get install -y \ | ||
| build-essential \ | ||
| curl \ | ||
| libc6-dev \ | ||
| liblua5.3-dev \ | ||
| libpcre3-dev \ | ||
| libssl-dev \ | ||
| socat \ | ||
| wget \ | ||
| zlib1g-dev \ | ||
| && sudo rm -rf /var/lib/apt/lists/* | ||
|
|
||
| # Create haproxy user and group | ||
| sudo groupadd -r haproxy | ||
| sudo useradd -r -g haproxy haproxy | ||
|
|
||
| # Download and compile HAProxy from official source | ||
| HAPROXY_VERSION="2.8.12" | ||
| HAPROXY_BUILD_DIR="$(mktemp -d)" | ||
| wget -O "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" "https://www.haproxy.org/download/2.8/src/haproxy-${HAPROXY_VERSION}.tar.gz" | ||
| tar -xzf "${HAPROXY_BUILD_DIR}/haproxy.tar.gz" -C "${HAPROXY_BUILD_DIR}" --strip-components=1 | ||
| make -C "${HAPROXY_BUILD_DIR}" TARGET=linux-glibc USE_OPENSSL=1 USE_ZLIB=1 USE_PCRE=1 USE_LUA=1 USE_PROMEX=1 | ||
| sudo make -C "${HAPROXY_BUILD_DIR}" install | ||
| rm -rf "${HAPROXY_BUILD_DIR}" | ||
|
Comment on lines
+37
to
+43
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe have a script for this rather than duplicating this everywhere? also, can we get the binaries prebuilt rather than building from source?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Okay with you if we dedupe/incorporate prebuilt binary in a follow-up PR? |
||
|
|
||
| # Create HAProxy directories | ||
| sudo mkdir -p /etc/haproxy /run/haproxy /var/log/haproxy | ||
| sudo chown -R haproxy:haproxy /run/haproxy | ||
|
|
||
| # Allow the ray user to manage HAProxy files without password | ||
| echo "ray ALL=(ALL) NOPASSWD: /bin/cp * /etc/haproxy/*, /bin/touch /etc/haproxy/*, /usr/local/sbin/haproxy*" | sudo tee /etc/sudoers.d/haproxy-ray | ||
|
|
||
| EOF | ||
|
|
||
| RUN <<EOF | ||
| #!/bin/bash | ||
|
|
||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical HAProxy build steps
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we duplicating this here?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we are moving it into all (non-slim) ray images, you need to:
this dockerfile is strictly preserved for installing ray wheel as the last step of image building, nothing else is allowed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, moved to |
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical mod. env var prefix |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -569,12 +569,117 @@ | |
|
|
||
| # The message to return when the replica is healthy. | ||
| HEALTHY_MESSAGE = "success" | ||
| NO_ROUTES_MESSAGE = "Route table is not populated yet." | ||
| NO_REPLICAS_MESSAGE = "No replicas are available yet." | ||
| DRAINING_MESSAGE = "This node is being drained." | ||
|
|
||
| # Feature flag to enable a limited form of direct ingress where ingress applications | ||
| # listen on port 8000 (HTTP) and 9000 (gRPC). No proxies will be started. | ||
| RAY_SERVE_ENABLE_DIRECT_INGRESS = ( | ||
| os.environ.get("RAY_SERVE_ENABLE_DIRECT_INGRESS", "0") == "1" | ||
| ) | ||
|
|
||
| # Feature flag to use HAProxy. | ||
| RAY_SERVE_ENABLE_HA_PROXY = os.environ.get("RAY_SERVE_ENABLE_HA_PROXY", "0") == "1" | ||
|
|
||
| # HAProxy configuration defaults | ||
| # Maximum number of concurrent connections | ||
| RAY_SERVE_HAPROXY_MAXCONN = int(os.environ.get("RAY_SERVE_HAPROXY_MAXCONN", "20000")) | ||
|
|
||
| # Number of threads for HAProxy | ||
| RAY_SERVE_HAPROXY_NBTHREAD = int(os.environ.get("RAY_SERVE_HAPROXY_NBTHREAD", "4")) | ||
|
|
||
| # HAProxy configuration file location | ||
| RAY_SERVE_HAPROXY_CONFIG_FILE_LOC = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_CONFIG_FILE_LOC", "/tmp/haproxy-serve/haproxy.cfg" | ||
| ) | ||
|
|
||
| # HAProxy admin socket path | ||
| RAY_SERVE_HAPROXY_SOCKET_PATH = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_SOCKET_PATH", "/tmp/haproxy-serve/admin.sock" | ||
| ) | ||
|
|
||
| # Enable HAProxy optimized configuration (server state persistence, etc.) | ||
| # Disabled by default to prevent test suite interference | ||
| RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG = ( | ||
| os.environ.get("RAY_SERVE_ENABLE_HAPROXY_OPTIMIZED_CONFIG", "1") == "1" | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment says "disabled by default" but default is enabledMedium Severity
Additional Locations (1) |
||
|
|
||
| # HAProxy server state path | ||
| RAY_SERVE_HAPROXY_SERVER_STATE_BASE = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_SERVER_STATE_BASE", "/tmp/haproxy-serve" | ||
| ) | ||
|
|
||
| # HAProxy server state path | ||
| RAY_SERVE_HAPROXY_SERVER_STATE_FILE = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_SERVER_STATE_FILE", "/tmp/haproxy-serve/server-state" | ||
| ) | ||
|
|
||
| # HAProxy hard stop after timeout | ||
| RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S", "120") | ||
| ) | ||
|
|
||
| # HAProxy metrics export port | ||
| RAY_SERVE_HAPROXY_METRICS_PORT = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_METRICS_PORT", "9101") | ||
| ) | ||
|
|
||
| # HAProxy log port | ||
| RAY_SERVE_HAPROXY_SYSLOG_PORT = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_SYSLOG_PORT", "514") | ||
| ) | ||
|
|
||
| # HAProxy timeout configurations (in seconds, None = no timeout) | ||
| RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S = ( | ||
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S")) | ||
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_SERVER_S") | ||
| else None | ||
| ) | ||
|
|
||
| RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S = ( | ||
| int(os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S")) | ||
| if os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CONNECT_S") | ||
| else None | ||
| ) | ||
|
|
||
| # HAProxy timeout client | ||
| RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_TIMEOUT_CLIENT_S", "3600") | ||
| ) | ||
|
|
||
| # Number of consecutive failed server health checks that must occur | ||
| # before haproxy marks the server as down. | ||
| RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_HEALTH_CHECK_FALL", "2") | ||
| ) | ||
|
|
||
| # Number of consecutive successful server health checks that must occur | ||
| # before haproxy marks the server as up. | ||
| RAY_SERVE_HAPROXY_HEALTH_CHECK_RISE = int( | ||
| os.environ.get("RAY_SERVE_HAPROXY_HEALTH_CHECK_RISE", "2") | ||
| ) | ||
|
|
||
| # Time interval between each haproxy health check attempt. Also the | ||
| # timeout of each health check before being considered as failed. | ||
| RAY_SERVE_HAPROXY_HEALTH_CHECK_INTER = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_HEALTH_CHECK_INTER", "5s" | ||
| ) | ||
|
|
||
| # Time interval between each haproxy health check attempt when the server is in any of the transition states: UP - transitionally DOWN or DOWN - transitionally UP | ||
| RAY_SERVE_HAPROXY_HEALTH_CHECK_FASTINTER = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_HEALTH_CHECK_FASTINTER", "250ms" | ||
| ) | ||
|
|
||
| # Time interval between each haproxy health check attempt when the server is in the DOWN state | ||
| RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER = os.environ.get( | ||
| "RAY_SERVE_HAPROXY_HEALTH_CHECK_DOWNINTER", "250ms" | ||
| ) | ||
|
|
||
| # Direct ingress must be enabled if HAProxy is enabled | ||
| if RAY_SERVE_ENABLE_HA_PROXY: | ||
| RAY_SERVE_ENABLE_DIRECT_INGRESS = True | ||
|
|
||
| RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT = int( | ||
| os.environ.get("RAY_SERVE_DIRECT_INGRESS_MIN_HTTP_PORT", "30000") | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| CONTROL_LOOP_INTERVAL_S, | ||
| RAY_SERVE_CONTROLLER_CALLBACK_IMPORT_PATH, | ||
| RAY_SERVE_ENABLE_DIRECT_INGRESS, | ||
| RAY_SERVE_ENABLE_HA_PROXY, | ||
| RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS, | ||
| RECOVERING_LONG_POLL_BROADCAST_TIMEOUT_S, | ||
| SERVE_CONTROLLER_NAME, | ||
|
|
@@ -48,7 +49,10 @@ | |
| from ray.serve._private.controller_health_metrics_tracker import ( | ||
| ControllerHealthMetricsTracker, | ||
| ) | ||
| from ray.serve._private.default_impl import create_cluster_node_info_cache | ||
| from ray.serve._private.default_impl import ( | ||
| create_cluster_node_info_cache, | ||
| get_proxy_actor_class, | ||
| ) | ||
| from ray.serve._private.deployment_info import DeploymentInfo | ||
| from ray.serve._private.deployment_state import ( | ||
| DeploymentReplica, | ||
|
|
@@ -187,8 +191,14 @@ async def __init__( | |
| self.cluster_node_info_cache = create_cluster_node_info_cache(self.gcs_client) | ||
| self.cluster_node_info_cache.update() | ||
|
|
||
| self._ha_proxy_enabled = RAY_SERVE_ENABLE_HA_PROXY | ||
| self._direct_ingress_enabled = RAY_SERVE_ENABLE_DIRECT_INGRESS | ||
| if self._direct_ingress_enabled: | ||
| if self._ha_proxy_enabled: | ||
| logger.info( | ||
| "HAProxy is enabled in ServeController, replacing Serve proxy " | ||
| "with HAProxy." | ||
| ) | ||
| elif self._direct_ingress_enabled: | ||
| logger.info( | ||
| "Direct ingress is enabled in ServeController, enabling proxy " | ||
| "on head node only." | ||
|
|
@@ -203,6 +213,7 @@ async def __init__( | |
| cluster_node_info_cache=self.cluster_node_info_cache, | ||
| logging_config=self.global_logging_config, | ||
| grpc_options=set_proxy_default_grpc_options(grpc_options), | ||
| proxy_actor_class=get_proxy_actor_class(), | ||
| ) | ||
| # We modify the HTTP and gRPC options above, so delete them to avoid | ||
| del http_options, grpc_options | ||
|
|
@@ -275,7 +286,9 @@ async def __init__( | |
| ] = [] | ||
| self._refresh_autoscaling_deployments_cache() | ||
|
|
||
| self._last_broadcasted_target_groups: List[TargetGroup] = [] | ||
| # Initialize to None (not []) to ensure the first broadcast always happens, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical |
||
| # even if target_groups is empty (e.g., route_prefix=None deployments). | ||
| self._last_broadcasted_target_groups: Optional[List[TargetGroup]] = None | ||
|
|
||
| def reconfigure_global_logging_config(self, global_logging_config: LoggingConfig): | ||
| if ( | ||
|
|
@@ -659,6 +672,29 @@ async def run_control_loop_step( | |
| # get all alive replica ids and their node ids. | ||
| NodePortManager.prune(self._get_node_id_to_alive_replica_ids()) | ||
|
|
||
| # HAProxy target group broadcasting | ||
| if self._ha_proxy_enabled: | ||
| self.broadcast_target_groups_if_changed() | ||
|
|
||
| def broadcast_target_groups_if_changed(self) -> None: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical |
||
| """Broadcast target groups over long poll if they have changed. | ||
|
|
||
| Keeps an in-memory record of the last target groups that were broadcast | ||
| to determine if they have changed. | ||
| """ | ||
| target_groups: List[TargetGroup] = self.get_target_groups( | ||
| from_proxy_manager=True, | ||
| ) | ||
|
|
||
| # Check if target groups have changed by comparing the objects directly | ||
| if self._last_broadcasted_target_groups == target_groups: | ||
| return | ||
|
|
||
| self.long_poll_host.notify_changed( | ||
| {LongPollNamespace.TARGET_GROUPS: target_groups} | ||
| ) | ||
| self._last_broadcasted_target_groups = target_groups | ||
|
|
||
| def _create_control_loop_metrics(self): | ||
| self.node_update_duration_gauge_s = metrics.Gauge( | ||
| "serve_controller_node_update_duration_s", | ||
|
|
@@ -1296,9 +1332,16 @@ def get_target_groups( | |
| that have running replicas, we return target groups for direct ingress. | ||
| If there are multiple applications with no running replicas, we return | ||
| one target group per application with unique route prefix. | ||
| 5. HAProxy is enabled and the caller is not an internal proxy manager. In | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical |
||
| this case, we return target groups containing the proxies (e.g. haproxy). | ||
| 6. HAProxy is enabled and the caller is an internal proxy manager (e.g. | ||
| haproxy manager). In this case, we return target groups containing the | ||
| ingress replicas and possibly the Serve proxies. | ||
| """ | ||
| proxy_target_groups = self._get_proxy_target_groups() | ||
| if not self._direct_ingress_enabled: | ||
| if not self._direct_ingress_enabled or ( | ||
| self._ha_proxy_enabled and not from_proxy_manager | ||
| ): | ||
| return proxy_target_groups | ||
|
|
||
| # Get all applications and their metadata | ||
|
|
@@ -1319,6 +1362,10 @@ def get_target_groups( | |
| ] | ||
|
|
||
| if not apps: | ||
| # When HAProxy is enabled and there are no apps, return empty target groups | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. identical |
||
| # so that all requests fall through to the default_backend (404) | ||
| if self._ha_proxy_enabled and from_proxy_manager: | ||
| return [] | ||
| return proxy_target_groups | ||
|
|
||
| # Create target groups for each application | ||
|
|
@@ -1428,7 +1475,7 @@ def _get_target_groups_for_app_with_no_running_replicas( | |
| TargetGroup( | ||
| protocol=RequestProtocol.HTTP, | ||
| route_prefix=route_prefix, | ||
| targets=http_targets, | ||
| targets=[] if self._ha_proxy_enabled else http_targets, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical |
||
| app_name=app_name, | ||
| ) | ||
| ) | ||
|
|
@@ -1437,7 +1484,7 @@ def _get_target_groups_for_app_with_no_running_replicas( | |
| TargetGroup( | ||
| protocol=RequestProtocol.GRPC, | ||
| route_prefix=route_prefix, | ||
| targets=grpc_targets, | ||
| targets=[] if self._ha_proxy_enabled else grpc_targets, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Identical |
||
| app_name=app_name, | ||
| ) | ||
| ) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Identical HAProxy build steps