Commit 0c2ea1e

[iris] Add proxy_stack for off-cluster iris:// resolution via ssh tunnels
The iris:// handler now consults rigging.proxy.active_stack() and lazily opens ssh tunnels via ControllerProvider.tunnel_to() for cluster-internal addresses that fail a TCP reachability probe. Tunnels are cached per remote and torn down at scope exit. Also fixes a pre-existing hostport-parse bug: registered system endpoints carry an http:// scheme that rsplit(":", 1) was mangling. Verified end-to-end against the marin cluster: `iris resolve --proxy 'iris://marin?endpoint=/system/log-server'` opens controller + log-server tunnels and a fetch_logs RPC round-trips through them.
1 parent 7978c58 commit 0c2ea1e

12 files changed

Lines changed: 828 additions & 52 deletions

File tree

.agents/projects/marin-log-store.md

Lines changed: 143 additions & 11 deletions
@@ -419,6 +419,134 @@ Backoff/retry semantics on `LogPusher` are unchanged from today — push
failures already retry with backoff. The new behavior is purely "evict
cached `(host, port)` after a transport error" inside `RemoteLogClient`.

### D13. Off-cluster access ships with finelog: per-service tunnels + auto-probe in the resolver

The architecture-doc promise is *"lifting logging out doesn't require
rewriting callers."* That promise fails if `LogClient.connect("gcp://finelog-server")`
only works on-cluster — every off-cluster caller still needs the
controller's `FetchLogs` proxy, the controller stays load-bearing for
log traffic, and Phase 5.5/6 proxy deletion never lands. So we build
the off-cluster path here.

Three mechanisms considered, two ruled out:

**1. SOCKS via `ssh -D` — not viable today.** The architecture-doc
recommendation assumed our Connect/RPC stack is built on `httpx`. It is
not. `connectrpc.client_sync.ConnectClientSync` uses `pyqwest.SyncClient`
(reqwest under the hood). Verified: `pyqwest.SyncHTTPTransport.__init__`
exposes no proxy parameter, and setting `ALL_PROXY=socks5://...` yields
`ConnectionError: client error (Connect): unsupported scheme socks5` —
pyqwest builds reqwest without the `socks` Cargo feature, so the scheme
isn't compiled in. End-to-end test against the live marin cluster:

- `gcloud compute ssh iris-controller-marin -- -D 127.0.0.1:11080 -N` came
  up and `curl --socks5-hostname` round-tripped fine for raw HTTP.
- The actual iris client stack rejected the same proxy.

**2. Per-service `ssh -L` / `kubectl port-forward` with auto-probe — the
chosen path, built in this project as a new phase.** Shape:

```python
# rigging.resolver — small additions.
_active_session: ContextVar[ProxySession | None] = ContextVar("proxy", default=None)

def resolve(ref: str) -> tuple[str, int]:
    addr = _resolve_raw(ref)
    session = _active_session.get()
    if session is None:
        return addr
    if _is_reachable(addr, timeout=0.5):  # one TCP connect, short timeout
        return addr
    return session.proxy_address(addr)  # opens a tunnel lazily, caches it

# iris.client.maybe_proxy — the session.
class ProxySession:
    def __init__(self, cluster: str):
        self._bundle = load_cluster_config(cluster).provider_bundle()
        self._stack = ExitStack()
        self._tunnels: dict[tuple[str, int], tuple[str, int]] = {}

    def proxy_address(self, remote):
        if remote in self._tunnels:
            return self._tunnels[remote]
        local = self._stack.enter_context(self._bundle.controller.tunnel_to(*remote))
        self._tunnels[remote] = local
        return local

@contextmanager
def maybe_proxy(cluster: str) -> Iterator[None]:
    session = ProxySession(cluster)
    token = _active_session.set(session)
    try:
        with session._stack:
            yield
    finally:
        _active_session.reset(token)
```

Properties:

- **Caller writes one path of code.** `LogClient.connect("gcp://finelog-server")`
  works on-cluster (probe succeeds, no proxy) and off-cluster inside
  `maybe_proxy(cluster)` (probe fails, tunnel opens, localhost address
  returned). The architecture doc's concern about "hidden auto-detection
  surprising on partial-cloud-access laptops" is handled correctly by
  per-URL probing — only unreachable targets get proxied.
- **Probe cost amortizes via D11.** First resolve pays the probe; cached
  `(host, port)` is reused until D11 evicts on transport failure.
- **Caller still opts in.** Outside `maybe_proxy(cluster)` the resolver
  returns the unreachable address verbatim — no surprises in production
  code.
- **Lifetime: rely on context-manager-as-GC.** Tunnels live for the
  duration of the `maybe_proxy` block. The resolver returns plain
  `(host, port)` tuples that escape the session; we cannot ref-count or
  TTL them. Cleanup happens at `__exit__`. Implication: `maybe_proxy` is
  meant to bracket a logical task (CLI invocation, batch run), not be
  held open forever. Long-lived off-cluster daemons either re-enter per
  batch or accept that tunnels accumulate (bounded by distinct service
  count, small in practice).
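The lifetime property — one tunnel cached per remote, all torn down at `__exit__` — can be demonstrated with a stub tunnel standing in for `ControllerProvider.tunnel_to`. A behavioral sketch, not the real provider; `fake_tunnel`, `StubSession`, and the addresses are illustrative:

```python
from contextlib import ExitStack, contextmanager

opened, closed = [], []

@contextmanager
def fake_tunnel(host: str, port: int):
    # Stand-in for ControllerProvider.tunnel_to(): yields a local address
    # and records open/close so teardown is observable.
    opened.append((host, port))
    try:
        yield ("127.0.0.1", 10000 + port)
    finally:
        closed.append((host, port))

class StubSession:
    """Same caching shape as ProxySession above, with the stub tunnel."""

    def __init__(self):
        self._stack = ExitStack()
        self._tunnels: dict[tuple[str, int], tuple[str, int]] = {}

    def proxy_address(self, remote: tuple[str, int]) -> tuple[str, int]:
        if remote not in self._tunnels:
            self._tunnels[remote] = self._stack.enter_context(fake_tunnel(*remote))
        return self._tunnels[remote]

session = StubSession()
first = session.proxy_address(("10.0.0.5", 8080))
second = session.proxy_address(("10.0.0.5", 8080))  # cache hit: no new tunnel
assert first == second == ("127.0.0.1", 18080)
assert opened == [("10.0.0.5", 8080)]
session._stack.close()  # what maybe_proxy's scope exit does
assert closed == [("10.0.0.5", 8080)]
```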

What this requires:

- `ControllerProvider.tunnel_to(host, port) -> ContextManager[tuple[str, int]]`
  generalizes the existing `tunnel(controller_addr)` to arbitrary
  internal targets. GCP threads `target_host`/`target_port` through the
  existing `_gcp_tunnel` helpers; one `gcloud compute ssh -L` per
  service, no ControlMaster optimization in v1 (one ssh handshake per
  service is acceptable for off-cluster CLI use). K8s wraps
  `kubectl port-forward svc/<name>`; pod-IP form is a followup.
- `_is_reachable(host, port, timeout)` in `rigging.resolver` — one TCP
  connect, 0.5s default timeout.
- `iris.client.maybe_proxy` module + tests.
- End-to-end test against the marin cluster.

Honest size: ~180–200 LOC across rigging + iris, plus tests.
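The probe is small enough to sketch in full with only the stdlib (`socket.create_connection` raises `OSError` on refusal or timeout); a minimal version, not necessarily the exact `rigging.resolver` implementation:

```python
import socket

def _is_reachable(host: str, port: int, timeout: float = 0.5) -> bool:
    """One TCP connect with a short timeout; True iff the handshake succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Demo: a live listener probes True; a port nothing listens on probes False.
_srv = socket.socket()
_srv.bind(("127.0.0.1", 0))  # OS-assigned free port
_srv.listen(1)
_host, _port = _srv.getsockname()
live = _is_reachable(_host, _port)
_srv.close()
dead = _is_reachable("127.0.0.1", 1)  # port 1: almost certainly closed
```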

**3. Cloudflare Zero Trust — the medium-term destination.** Per the
architecture-doc cost analysis, the free tier covers our user count
(<50). Once enrolled, laptops, CI runners, and cluster VMs share a
private network; `maybe_proxy` becomes a no-op everywhere; internal
addresses just work; no per-service tunneling code, no probe latency,
no transport compatibility matrix. **Likely the right place to put
engineering effort.** Tailscale Standard (~$2.4k/yr) is the alternative
once we exceed the free tier.

**Rejected options:**

- **Controller-side proxy** (extend the existing `FetchLogs` forwarder to
  cover all log-server traffic): undoes a chunk of the extraction's
  value. The whole point is "log traffic doesn't go through the
  controller." Off-cluster routing through the controller puts it back
  on the critical path for those callers.
- **Replace pyqwest with httpx** to enable SOCKS: reasonable but not
  scoped to this project.

**Scope:** Option (2) ships in this project as a new phase
(see Phase 7.5). Option (3) is the post-finelog destination — once
Cloudflare Zero Trust is enrolled, `maybe_proxy` becomes a no-op
everywhere and the per-service tunnel code becomes dormant. We accept
that some of (2)'s code may eventually be deleted; the alternative
(ship finelog without an off-cluster path) breaks the architecture
doc's promise on day one.
### D12. Restart-independence is a prod-config guarantee, not a dev-config one

The goal "controller restarts do not restart the log server" holds **only**
@@ -1138,16 +1266,16 @@ These are all followups the architecture doc already calls out.
    the entire fleet of workers re-resolves at roughly the same time.
    Compute Engine API quotas are generous, but worth confirming under
    our worst-case fleet size.
-3. **Off-cluster CLI access** — `iris://` resolution itself works from a
-   laptop (load YAML, call `discover_controller`), but the resolved
-   address is cluster-internal. Today's CLI handles this via
-   `bundle.controller.tunnel(addr)` (`iris/cli/main.py:117`). For
-   `gcp://` system-service URLs there is no equivalent, so a laptop
-   running `LogClient.connect("gcp://finelog-server")` would fail at
-   transport time. The architecture doc's `maybe_proxy()` is the right
-   place to solve this; until it ships, off-cluster `iris logs` should
-   route through the controller's `FetchLogs` proxy or open an explicit
-   tunnel.
+3. **Off-cluster CLI access** — see D13. Resolution works from a laptop
+   (load YAML, call `discover_controller`), but the resolved address is
+   cluster-internal. Today's CLI handles `iris://` via
+   `bundle.controller.tunnel(addr)` (`iris/cli/main.py:117`). System
+   services (`gcp://...`) have no equivalent today; a laptop running
+   `LogClient.connect("gcp://finelog-server")` fails at transport time.
+   Three paths considered (SOCKS, per-service `ssh -L` with probe
+   auto-detection, Cloudflare Zero Trust) — none implemented in this
+   project. Until then, off-cluster `iris logs` should route through
+   the controller's `FetchLogs` proxy.
 
 ---
@@ -1178,6 +1306,10 @@ These are all followups the architecture doc already calls out.
   `lib/iris/AGENTS.md`, `lib/iris/OPS.md`, cluster-config reference.
 - [ ] Followup issue filed: CoreWeave/k8s VM-address helpers
   (`coreweave_vm_address`, `k8s_vm_address`).
-- [ ] Followup issue filed: `maybe_proxy` off-cluster tunnel integration.
+- [ ] Phase 7.5 (off-cluster access, D13): `maybe_proxy(cluster)` with
+  per-service `ssh -L` / `kubectl port-forward` + reachability probe
+  in the resolver. End-to-end test against marin.
+- [ ] Followup project filed: Cloudflare Zero Trust enrollment (free
+  tier covers <50 users); makes `maybe_proxy` a no-op.
 - [ ] Followup issue filed: declarative reconciler for `system_services`
   (auto-provisioning of the named VMs).

lib/iris/src/iris/cli/main.py

Lines changed: 2 additions & 0 deletions

@@ -424,6 +424,7 @@ def budget_list(ctx):
 from iris.cli.actor import actor as actor_cmd  # noqa: E402
 from iris.cli.process_status import register_process_status_commands  # noqa: E402
 from iris.cli.query import query_cmd  # noqa: E402
+from iris.cli.resolve import resolve_cmd  # noqa: E402
 from iris.cli.rpc import register_rpc_commands  # noqa: E402
 from iris.cli.task import task  # noqa: E402

@@ -433,5 +434,6 @@ def budget_list(ctx):
 iris.add_command(job)
 iris.add_command(task)
 iris.add_command(query_cmd)
+iris.add_command(resolve_cmd)
 register_rpc_commands(iris)
 register_process_status_commands(iris)

lib/iris/src/iris/cli/resolve.py

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0

"""``iris resolve`` — resolve a service URL to ``host:port``.

Useful for verifying scheme handlers (``iris://``, ``gcp://``) and the
off-cluster :func:`rigging.proxy.proxy_stack` flow end-to-end.

Examples::

    iris resolve gcp://log-server
    iris resolve iris://marin?endpoint=/system/log-server --proxy
"""

import logging

import click

import iris.client  # noqa: F401 -- registers iris://
from rigging.proxy import proxy_stack
from rigging.resolver import resolve as resolve_url


@click.command("resolve")
@click.argument("url")
@click.option(
    "--proxy/--no-proxy",
    default=False,
    help="Wrap resolution in proxy_stack so unreachable internal addresses are tunneled.",
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    default=False,
    help="Log INFO-level diagnostics from the resolver and tunnel layer.",
)
def resolve_cmd(url: str, proxy: bool, verbose: bool) -> None:
    """Resolve URL to host:port. With --proxy, opens tunnels for unreachable internal addresses."""
    if verbose:
        logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")

    if proxy:
        with proxy_stack():
            host, port = resolve_url(url)
    else:
        host, port = resolve_url(url)
    click.echo(f"{host}:{port}")

lib/iris/src/iris/client/resolver_plugin.py

Lines changed: 38 additions & 8 deletions

@@ -10,16 +10,20 @@
 (``K8sControllerProvider`` returns a Kubernetes Service DNS name), and
 Manual/Local clusters (returns the static address from config).
 
-Off-cluster callers (a laptop without direct access to the controller's
-internal address) wrap their use of the resolver in
-``bundle.controller.tunnel(addr)`` — the existing pattern at
-``iris.cluster.config:1095``. The architecture doc's ``maybe_proxy()`` is
-the planned ergonomic wrapper for that case.
+Off-cluster callers wrap their use of :func:`rigging.resolver.resolve`
+in :func:`rigging.proxy.proxy_stack`. When that scope is active and the
+controller or returned endpoint is not directly reachable, the handler
+opens an ssh tunnel via ``bundle.controller.tunnel_to(...)`` and caches
+it on the active stack for the duration of the block.
 """
 
+from urllib.parse import urlsplit
+
+from rigging.proxy import ProxyStack, active_stack, is_reachable
 from rigging.resolver import ServiceURL, register_scheme
 
 from iris.cluster.config import load_cluster_config
+from iris.cluster.providers.factory import ProviderBundle
 from iris.rpc.controller_connect import ControllerServiceClientSync
 from iris.rpc.controller_pb2 import Controller as _Controller
 
@@ -34,14 +38,40 @@ def _resolve_iris(url: ServiceURL) -> tuple[str, int]:
 
     cluster_config = load_cluster_config(cluster)
     bundle = cluster_config.provider_bundle()
-    controller_addr = bundle.controller.discover_controller(cluster_config.proto.controller)
+    stack = active_stack()
+
+    controller_addr = _split_hostport(bundle.controller.discover_controller(cluster_config.proto.controller))
+    controller_addr = _maybe_tunnel(bundle, controller_addr, stack)
 
-    with ControllerServiceClientSync(address=f"http://{controller_addr}") as client:
+    grpc_target = f"http://{controller_addr[0]}:{controller_addr[1]}"
+    with ControllerServiceClientSync(address=grpc_target) as client:
         response = client.list_endpoints(_Controller.ListEndpointsRequest(prefix=name, exact=True))
     if not response.endpoints:
         raise KeyError(f"iris endpoint not found: {name!r} on cluster {cluster!r}")
-    host, port = response.endpoints[0].address.rsplit(":", 1)
+
+    log_addr = _split_hostport(response.endpoints[0].address)
+    return _maybe_tunnel(bundle, log_addr, stack)
+
+
+def _split_hostport(addr: str) -> tuple[str, int]:
+    """Split ``host:port`` or ``scheme://host:port`` into ``(host, port)``."""
+    if "://" in addr:
+        parts = urlsplit(addr)
+        if not parts.hostname or parts.port is None:
+            raise ValueError(f"address {addr!r} missing host or port")
+        return parts.hostname, parts.port
+    host, port = addr.rsplit(":", 1)
     return host, int(port)
 
 
+def _maybe_tunnel(
+    bundle: ProviderBundle,
+    addr: tuple[str, int],
+    stack: ProxyStack | None,
+) -> tuple[str, int]:
+    if stack is None or is_reachable(*addr):
+        return addr
+    return stack.proxy(addr, lambda: bundle.controller.tunnel_to(*addr))
+
 
 register_scheme("iris", _resolve_iris)
