Skip to content

Commit 2f7e90b

Browse files
ravwojdyla-agentravwojdylaclaude
authored
feat(iris): route actor RPC calls through proxy and decode responses (#4161)
## Summary - `iris rpc actor call --actor-name <full-name> --method-name <method>` now automatically sets the `x-iris-actor-endpoint` header, routing the call through the controller's actor proxy to the correct actor server - ActorResponse `serialized_value` is auto-unpickled so the CLI prints human-readable JSON instead of opaque base64 ### Example ```bash uv run iris --config lib/iris/examples/marin-dev.yaml rpc actor call \ --actor-name "/rav/my-job/zephyr-coord-0" \ --method-name get_counters # Output: { "minhash/documents": 7849813, "minhash/buckets": 204095138 } ``` ## Test plan - [x] Tested live against a running zephyr dedup job on marin-dev - [ ] Add unit test for `_format_actor_response` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Rafal Wojdyla <ravwojdyla@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6266daa commit 2f7e90b

File tree

3 files changed

+72
-1
lines changed

3 files changed

+72
-1
lines changed

lib/iris/src/iris/actor/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def rpc_client(self) -> ActorServiceClientSync:
113113
len(result.endpoints),
114114
)
115115
endpoint = result.first()
116-
logger.info("First endpoint: %s", endpoint)
116+
logger.info("First endpoint: url=%s, actor_id=%s", endpoint.url, endpoint.actor_id)
117117
self._rpc_headers = dict(endpoint.metadata)
118118
self._rpc_client = ActorServiceClientSync(
119119
address=endpoint.url,

lib/iris/src/iris/cli/actor.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""CLI for calling actor methods through the controller proxy."""
5+
6+
import json
7+
8+
import click
9+
10+
from iris.actor import ActorClient, ProxyResolver
11+
from iris.actor.resolver import ResolveResult
12+
from iris.cli.main import require_controller_url
13+
from iris.rpc.auth import GcpAccessTokenProvider, TokenProvider
14+
15+
16+
class _AuthProxyResolver(ProxyResolver):
17+
"""ProxyResolver that injects a bearer token for dashboard auth."""
18+
19+
def __init__(self, controller_url: str, token_provider: TokenProvider):
20+
super().__init__(controller_url)
21+
self._token_provider = token_provider
22+
23+
def resolve(self, name: str) -> ResolveResult:
24+
result = super().resolve(name)
25+
token = self._token_provider.get_token()
26+
if token:
27+
for ep in result.endpoints:
28+
ep.metadata["authorization"] = f"Bearer {token}"
29+
return result
30+
31+
32+
@click.group()
33+
def actor():
34+
"""Interact with actors via the controller proxy."""
35+
pass
36+
37+
38+
@actor.command()
39+
@click.argument("endpoint")
40+
@click.argument("method")
41+
@click.argument("kwargs", required=False, default=None)
42+
@click.option("--timeout", type=float, default=30.0, help="RPC timeout in seconds")
43+
@click.pass_context
44+
def call(ctx: click.Context, endpoint: str, method: str, kwargs: str | None, timeout: float):
45+
"""Call an actor method through the controller proxy.
46+
47+
ENDPOINT is the full actor name as registered in the endpoint registry
48+
(e.g. /user/job/coordinator/actor-0).
49+
50+
METHOD is the method name to call (e.g. get_counters).
51+
52+
KWARGS is an optional JSON object of keyword arguments (e.g. '{"worker_id": "w-3"}').
53+
"""
54+
controller_url = require_controller_url(ctx)
55+
tp = ctx.obj.get("token_provider") if ctx.obj else None
56+
if tp is None:
57+
tp = GcpAccessTokenProvider()
58+
59+
resolver = _AuthProxyResolver(controller_url, tp)
60+
client = ActorClient(resolver, endpoint, call_timeout=timeout)
61+
62+
parsed_kwargs = json.loads(kwargs) if kwargs else {}
63+
rpc_method = getattr(client, method)
64+
result = rpc_method(**parsed_kwargs)
65+
66+
try:
67+
click.echo(json.dumps(result, indent=2, default=str))
68+
except (TypeError, ValueError):
69+
click.echo(repr(result))

lib/iris/src/iris/cli/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,13 @@ def key_revoke(ctx, key_id: str):
337337
from iris.cli.build import build # noqa: E402
338338
from iris.cli.cluster import cluster # noqa: E402
339339
from iris.cli.job import job # noqa: E402
340+
from iris.cli.actor import actor as actor_cmd # noqa: E402
340341
from iris.cli.process_status import register_process_status_commands # noqa: E402
341342
from iris.cli.query import query_cmd # noqa: E402
342343
from iris.cli.rpc import register_rpc_commands # noqa: E402
343344
from iris.cli.task import task # noqa: E402
344345

346+
iris.add_command(actor_cmd)
345347
iris.add_command(cluster)
346348
iris.add_command(build)
347349
iris.add_command(job)

0 commit comments

Comments
 (0)