Skip to content

Commit 7af74fd

Browse files
ravwojdylagithub-actions[bot]
authored andcommitted
feat(iris): add actor proxy service for external access to cluster actors (#4126)
Adds an actor proxy route on the controller HTTP server that forwards ActorService RPCs to actors on worker VMs. External clients use ProxyResolver to route calls through the controller instead of connecting directly to internal IPs. Closes #4109 Generated with [Claude Code](https://claude.ai/code) --------- Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Rafal Wojdyla <ravwojdyla@users.noreply.github.com>
1 parent b196a9f commit 7af74fd

6 files changed

Lines changed: 359 additions & 8 deletions

File tree

lib/iris/src/iris/actor/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,25 @@
1010
from iris.actor.client import ActorClient
1111
from iris.actor.pool import ActorPool, BroadcastFuture, CallResult
1212
from iris.actor.resolver import (
13+
ACTOR_ENDPOINT_HEADER,
1314
FixedResolver,
15+
ProxyResolver,
1416
ResolvedEndpoint,
1517
ResolveResult,
1618
Resolver,
1719
)
1820
from iris.actor.server import ActorId, ActorServer
1921

2022
__all__ = [
23+
"ACTOR_ENDPOINT_HEADER",
2124
"ActorClient",
2225
"ActorId",
2326
"ActorPool",
2427
"ActorServer",
2528
"BroadcastFuture",
2629
"CallResult",
2730
"FixedResolver",
31+
"ProxyResolver",
2832
"ResolveResult",
2933
"ResolvedEndpoint",
3034
"Resolver",

lib/iris/src/iris/actor/client.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def __init__(
8080
self._backoff = backoff
8181

8282
self._rpc_client: ActorServiceClientSync | None = None
83+
self._rpc_headers: dict[str, str] = {}
8384

8485
def rpc_client(self) -> ActorServiceClientSync:
8586
"""Resolve actor name to an RPC client (single attempt).
@@ -111,17 +112,19 @@ def rpc_client(self) -> ActorServiceClientSync:
111112
self._name,
112113
len(result.endpoints),
113114
)
114-
logger.info("First endpoint: %s", result.first())
115-
url = result.first().url
115+
endpoint = result.first()
116+
logger.info("First endpoint: %s", endpoint)
117+
self._rpc_headers = dict(endpoint.metadata)
116118
self._rpc_client = ActorServiceClientSync(
117-
address=url,
119+
address=endpoint.url,
118120
timeout_ms=None if self._call_timeout is None else int(self._call_timeout * 1000),
119121
accept_compression=[],
120122
)
121123
return self._rpc_client
122124

123125
def _clear_connection(self, _exc: Exception) -> None:
124126
self._rpc_client = None
127+
self._rpc_headers = {}
125128

126129
def start_operation(self, method_name: str, *args: Any, **kwargs: Any) -> str:
127130
"""Start a long-running operation. Returns the operation ID."""
@@ -134,7 +137,7 @@ def start_operation(self, method_name: str, *args: Any, **kwargs: Any) -> str:
134137

135138
def do_call():
136139
client = self.rpc_client()
137-
return client.start_operation(call)
140+
return client.start_operation(call, headers=self._rpc_headers)
138141

139142
op = call_with_retry(
140143
f"{self._name}.start_operation({method_name})",
@@ -150,7 +153,7 @@ def poll_operation_status(self, operation_id: str) -> actor_pb2.Operation:
150153
req = actor_pb2.OperationId(operation_id=operation_id)
151154

152155
def do_call():
153-
return self.rpc_client().get_operation(req)
156+
return self.rpc_client().get_operation(req, headers=self._rpc_headers)
154157

155158
return call_with_retry(
156159
f"{self._name}.poll_operation_status({operation_id[:8]})",
@@ -179,7 +182,7 @@ def cancel_operation(self, operation_id: str) -> actor_pb2.Operation:
179182
req = actor_pb2.OperationId(operation_id=operation_id)
180183

181184
def do_call():
182-
return self.rpc_client().cancel_operation(req)
185+
return self.rpc_client().cancel_operation(req, headers=self._rpc_headers)
183186

184187
return call_with_retry(
185188
f"{self._name}.cancel_operation({operation_id[:8]})",
@@ -208,7 +211,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any:
208211

209212
def do_call():
210213
client = self._client.rpc_client()
211-
resp = client.call(call)
214+
resp = client.call(call, headers=self._client._rpc_headers)
212215
return unwrap_actor_response(resp)
213216

214217
return call_with_retry(

lib/iris/src/iris/actor/resolver.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
from dataclasses import dataclass, field
77
from typing import Protocol
88

9+
# Header used by ActorProxy to route requests to the correct actor endpoint.
10+
# Shared constant between ProxyResolver (client-side) and ActorProxy (server-side).
11+
ACTOR_ENDPOINT_HEADER = "x-iris-actor-endpoint"
12+
913

1014
@dataclass
1115
class ResolvedEndpoint:
@@ -72,3 +76,33 @@ def resolve(self, name: str) -> ResolveResult:
7276
urls = self._endpoints.get(name, [])
7377
endpoints = [ResolvedEndpoint(url=url, actor_id=f"fixed-{name}-{i}") for i, url in enumerate(urls)]
7478
return ResolveResult(name=name, endpoints=endpoints)
79+
80+
81+
class ProxyResolver:
82+
"""Resolver that routes actor calls through the controller's actor proxy.
83+
84+
Instead of resolving to the actor's direct address, returns the controller
85+
URL so all RPCs go through the proxy. The proxy uses the
86+
``X-Iris-Actor-Endpoint`` header to resolve the actual actor endpoint.
87+
88+
Args:
89+
controller_url: Controller URL (e.g., ``http://localhost:8080``)
90+
namespace: Namespace prefix for endpoint resolution
91+
"""
92+
93+
def __init__(self, controller_url: str, namespace: str):
94+
self._controller_url = controller_url.rstrip("/")
95+
self._namespace = namespace
96+
97+
def resolve(self, name: str) -> ResolveResult:
98+
endpoint_name = f"{self._namespace}/{name}"
99+
return ResolveResult(
100+
name=name,
101+
endpoints=[
102+
ResolvedEndpoint(
103+
url=self._controller_url,
104+
actor_id=f"proxy-{endpoint_name}",
105+
metadata={ACTOR_ENDPOINT_HEADER: endpoint_name},
106+
)
107+
],
108+
)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Actor proxy for forwarding ActorService RPCs to actors within the cluster.
5+
6+
External clients send actor calls to the controller; the proxy resolves the
7+
target endpoint from the controller's DB and forwards the raw request to the
8+
actor server on the worker VM. All ActorService methods are proxied
9+
transparently (raw byte forwarding, no deserialization).
10+
11+
Route pattern::
12+
13+
POST /iris.actor.ActorService/{method}
14+
X-Iris-Actor-Endpoint: namespace/actor-name
15+
"""
16+
17+
import logging
18+
19+
import httpx
20+
from starlette.requests import Request
21+
from starlette.responses import JSONResponse, Response
22+
23+
from iris.cluster.controller.db import ControllerDB, EndpointQuery, endpoint_query_predicate
24+
25+
logger = logging.getLogger(__name__)
26+
27+
# Header used by ProxyResolver to tell the proxy which endpoint to forward to.
28+
# Duplicated from iris.actor.resolver.ACTOR_ENDPOINT_HEADER to avoid a
29+
# cluster → actor import dependency.
30+
ACTOR_ENDPOINT_HEADER = "x-iris-actor-endpoint"
31+
32+
PROXY_ROUTE = "/iris.actor.ActorService/{method}"
33+
PROXY_TIMEOUT_SECONDS = 60.0
34+
35+
# Headers that should not be forwarded to upstream (hop-by-hop or routing-specific).
36+
_HOP_BY_HOP_HEADERS = frozenset(
37+
{
38+
"host",
39+
"transfer-encoding",
40+
"connection",
41+
"keep-alive",
42+
"upgrade",
43+
ACTOR_ENDPOINT_HEADER,
44+
}
45+
)
46+
47+
48+
class ActorProxy:
49+
"""Forwards ActorService RPCs to actors resolved from the endpoint registry."""
50+
51+
def __init__(self, db: ControllerDB):
52+
self._db = db
53+
self._client = httpx.AsyncClient(timeout=PROXY_TIMEOUT_SECONDS)
54+
55+
async def close(self) -> None:
56+
await self._client.aclose()
57+
58+
async def handle(self, request: Request) -> Response:
59+
"""Proxy an ActorService RPC to the resolved actor endpoint."""
60+
method = request.path_params["method"]
61+
endpoint_name = request.headers.get(ACTOR_ENDPOINT_HEADER)
62+
if not endpoint_name:
63+
return JSONResponse(
64+
{"error": f"Missing {ACTOR_ENDPOINT_HEADER} header"},
65+
status_code=400,
66+
)
67+
68+
address = self._resolve_endpoint(endpoint_name)
69+
if address is None:
70+
return JSONResponse(
71+
{"error": f"No endpoint found for '{endpoint_name}'"},
72+
status_code=404,
73+
)
74+
75+
upstream_url = f"http://{address}/iris.actor.ActorService/{method}"
76+
body = await request.body()
77+
forward_headers = {k: v for k, v in request.headers.items() if k.lower() not in _HOP_BY_HOP_HEADERS}
78+
79+
try:
80+
upstream_resp = await self._client.post(
81+
upstream_url,
82+
content=body,
83+
headers=forward_headers,
84+
)
85+
except httpx.HTTPError as exc:
86+
logger.warning("Proxy upstream error for %s: %s", endpoint_name, exc)
87+
return JSONResponse(
88+
{"error": f"Upstream error: {exc}"},
89+
status_code=502,
90+
)
91+
92+
return Response(
93+
content=upstream_resp.content,
94+
status_code=upstream_resp.status_code,
95+
media_type=upstream_resp.headers.get("content-type"),
96+
)
97+
98+
def _resolve_endpoint(self, name: str) -> str | None:
99+
"""Resolve an endpoint name to an address via the controller DB."""
100+
query = EndpointQuery(exact_name=name)
101+
joins, where = endpoint_query_predicate(query)
102+
from iris.cluster.controller.service import ENDPOINTS
103+
104+
with self._db.read_snapshot() as q:
105+
endpoints = q.select(ENDPOINTS, where=where, joins=joins)
106+
if not endpoints:
107+
return None
108+
return endpoints[0].address

lib/iris/src/iris/cluster/controller/dashboard.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from starlette.routing import Mount, Route
3838
from starlette.types import ASGIApp, Receive, Scope, Send
3939

40+
from iris.cluster.controller.actor_proxy import PROXY_ROUTE, ActorProxy
4041
from iris.cluster.controller.service import ControllerServiceImpl
4142
from iris.cluster.dashboard_common import html_shell, static_files_mount
4243
from iris.rpc.auth import SESSION_COOKIE, NullAuthInterceptor, TokenVerifier, extract_bearer_token, resolve_auth
@@ -260,6 +261,12 @@ def _create_app(self) -> ASGIApp:
260261
rpc_wsgi_app = ControllerServiceWSGIApplication(service=self._service, interceptors=interceptors)
261262
rpc_app = WSGIMiddleware(rpc_wsgi_app)
262263

264+
self._actor_proxy = ActorProxy(self._service._db)
265+
266+
@requires_auth
267+
async def _proxy_actor_rpc(request: Request) -> Response:
268+
return await self._actor_proxy.handle(request)
269+
263270
routes = [
264271
Route("/", self._dashboard),
265272
Route("/auth/session_bootstrap", self._session_bootstrap),
@@ -270,10 +277,14 @@ def _create_app(self) -> ASGIApp:
270277
Route("/worker/{worker_id:path}", self._worker_detail_page),
271278
Route("/bundles/{bundle_id:str}.zip", self._bundle_download),
272279
Route("/health", self._health),
280+
Route(PROXY_ROUTE, _proxy_actor_rpc, methods=["POST"]),
273281
Mount(rpc_wsgi_app.path, app=rpc_app),
274282
static_files_mount(),
275283
]
276-
app: Starlette | _RouteAuthMiddleware = Starlette(routes=routes)
284+
app: Starlette | _RouteAuthMiddleware = Starlette(
285+
routes=routes,
286+
on_shutdown=[self._actor_proxy.close],
287+
)
277288
if self._auth_verifier is not None and self._auth_provider is not None:
278289
app = _RouteAuthMiddleware(app, self._auth_verifier, optional=self._auth_optional)
279290
return app

0 commit comments

Comments
 (0)