Skip to content

Commit 2452845

Browse files
authored
[iris] Redesign integration test suite (#4009)
Redesigns the Iris integration test suite (`tests/integration/iris/`): - **Full marin pipeline on Iris**: `test_marin_on_iris.py` now runs the real `create_steps()` pipeline from `tests/integration_test.py` via `FrayIrisClient`, replacing the fake JSON-shuffling placeholder. - **No local cluster fallback**: `conftest.py` stripped to ~43 lines. `--controller-url` is always required; no hidden 10-worker local cluster. - **Flat job functions**: `IntegrationJobs` class replaced with top-level functions in `jobs.py`. - **Auth/checkpoint/GPU tests moved to `lib/iris/tests/`**: These create their own `LocalCluster` and don't belong in the generic integration suite. - **CLI dispatch tests**: `test_cli_dispatch.py` exercises `iris job run` via subprocess. - **`is_remote` removed**: No local/remote specialization. All tests assume a real cluster. - **GitHub workflow**: `.github/workflows/iris-integration.yaml` starts a local Iris cluster and runs the full suite including the marin pipeline test.
1 parent c16d601 commit 2452845

14 files changed

Lines changed: 1069 additions & 2 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
name: Iris - Integration Tests
2+
3+
on:
4+
push:
5+
branches: [main]
6+
pull_request:
7+
workflow_dispatch:
8+
9+
jobs:
10+
iris-itest:
11+
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository
12+
runs-on: ubuntu-latest
13+
timeout-minutes: 45
14+
concurrency:
15+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
16+
cancel-in-progress: true
17+
18+
strategy:
19+
matrix:
20+
python-version: ["3.12"]
21+
22+
steps:
23+
- name: Checkout code
24+
uses: actions/checkout@v4
25+
26+
- name: Set up Python ${{ matrix.python-version }}
27+
uses: actions/setup-python@v5
28+
with:
29+
python-version: ${{ matrix.python-version }}
30+
31+
- name: Set up Node.js
32+
uses: actions/setup-node@v4
33+
with:
34+
node-version: "22"
35+
36+
- name: Install uv
37+
uses: astral-sh/setup-uv@v7
38+
with:
39+
enable-cache: true
40+
41+
- name: Install dependencies
42+
run: uv sync --all-packages --extra=cpu --extra=dedup --no-default-groups --group dev
43+
44+
- name: Start local Iris cluster
45+
run: |
46+
uv run iris --config lib/iris/examples/test.yaml \
47+
cluster start --local > /tmp/iris-cluster.log 2>&1 &
48+
CLUSTER_PID=$!
49+
echo "CLUSTER_PID=$CLUSTER_PID" >> "$GITHUB_ENV"
50+
51+
# Wait for controller to print its URL
52+
for i in $(seq 1 120); do
53+
if grep -q "Controller started at" /tmp/iris-cluster.log 2>/dev/null; then
54+
URL=$(grep "Controller started at" /tmp/iris-cluster.log | head -1 | sed -n 's/.*Controller started at //p')
55+
echo "IRIS_CONTROLLER_URL=$URL" >> "$GITHUB_ENV"
56+
echo "Cluster ready at $URL"
57+
break
58+
fi
59+
sleep 1
60+
done
61+
62+
if [ -z "${URL:-}" ]; then
63+
echo "Cluster failed to start within timeout"
64+
cat /tmp/iris-cluster.log
65+
exit 1
66+
fi
67+
68+
- name: Run integration tests
69+
run: |
70+
uv run pytest tests/integration/iris/ \
71+
--controller-url "$IRIS_CONTROLLER_URL" \
72+
-v --tb=short --timeout=600 \
73+
-o "addopts=" \
74+
-x
75+
env:
76+
WANDB_MODE: disabled
77+
WANDB_API_KEY: ""
78+
JAX_TRACEBACK_FILTERING: off
79+
80+
- name: Stop cluster
81+
if: always()
82+
run: kill $CLUSTER_PID 2>/dev/null || true
83+
84+
- name: Show cluster logs on failure
85+
if: failure()
86+
run: cat /tmp/iris-cluster.log || true

lib/iris/examples/test.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ controller:
3333
port: 10000
3434

3535
scale_groups:
36-
# Active scale group with capacity
36+
# Active scale group with capacity (non-preemptible so executor-style jobs land here)
3737
tpu_v5e_4:
3838
num_vms: 1
3939
resources:
@@ -43,7 +43,7 @@ scale_groups:
4343
device_type: tpu
4444
device_variant: v5litepod-4
4545
device_count: 4
46-
preemptible: true
46+
preemptible: false
4747
min_slices: 2
4848
max_slices: 10
4949
slice_template:

lib/iris/tests/conftest.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,38 @@
77
import os
88
import subprocess
99
import sys
10+
from pathlib import Path
1011
import threading
1112
import time
1213
import traceback
1314
import warnings
1415

1516
import pytest
17+
from iris.cluster.config import load_config, make_local_config
18+
from iris.rpc import config_pb2
1619
from iris.test_util import SentinelFile
1720
from iris.time_utils import Deadline, Duration
1821

22+
IRIS_ROOT = Path(__file__).resolve().parents[1]
23+
DEFAULT_CONFIG = IRIS_ROOT / "examples" / "test.yaml"
24+
25+
26+
def _make_controller_only_config() -> config_pb2.IrisClusterConfig:
27+
"""Build a local config with no auto-scaled workers."""
28+
config = load_config(DEFAULT_CONFIG)
29+
config.scale_groups.clear()
30+
sg = config.scale_groups["placeholder"]
31+
sg.name = "placeholder"
32+
sg.num_vms = 1
33+
sg.min_slices = 0
34+
sg.max_slices = 0
35+
sg.resources.cpu_millicores = 1000
36+
sg.resources.memory_bytes = 1 * 1024**3
37+
sg.resources.disk_bytes = 10 * 1024**3
38+
sg.resources.device_type = config_pb2.ACCELERATOR_TYPE_CPU
39+
sg.slice_template.local.SetInParent()
40+
return make_local_config(config)
41+
1942

2043
def _docker_image_exists(tag: str) -> bool:
2144
try:

lib/iris/tests/test_auth.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Auth tests for Iris controller with static token authentication."""
4+
5+
import pytest
6+
from iris.cluster.providers.local.cluster import LocalCluster
7+
from iris.cluster.types import Entrypoint, ResourceSpec
8+
from iris.rpc import cluster_pb2
9+
from iris.rpc.cluster_connect import ControllerServiceClientSync
10+
11+
from .conftest import _make_controller_only_config
12+
13+
_AUTH_TOKEN = "e2e-test-token"
14+
_AUTH_USER = "test-user"
15+
16+
17+
def _login_for_jwt(url: str, identity_token: str) -> str:
18+
"""Exchange a raw identity token for a JWT via the Login RPC."""
19+
client = ControllerServiceClientSync(address=url, timeout_ms=10000)
20+
try:
21+
resp = client.login(cluster_pb2.LoginRequest(identity_token=identity_token))
22+
return resp.token
23+
finally:
24+
client.close()
25+
26+
27+
def _quick():
28+
return 1
29+
30+
31+
def test_static_auth_rpc_access():
32+
"""Static auth rejects unauthenticated and wrong-token RPCs, accepts valid JWT."""
33+
from connectrpc.errors import ConnectError
34+
from iris.rpc.auth import AuthTokenInjector, StaticTokenProvider
35+
36+
config = _make_controller_only_config()
37+
config.auth.static.tokens[_AUTH_TOKEN] = _AUTH_USER
38+
controller = LocalCluster(config)
39+
url = controller.start()
40+
41+
try:
42+
list_req = cluster_pb2.Controller.ListWorkersRequest()
43+
44+
unauth_client = ControllerServiceClientSync(address=url, timeout_ms=5000)
45+
with pytest.raises(ConnectError, match=r"(?i)(authorization|authenticat)"):
46+
unauth_client.list_workers(list_req)
47+
unauth_client.close()
48+
49+
wrong_injector = AuthTokenInjector(StaticTokenProvider("wrong-token"))
50+
wrong_client = ControllerServiceClientSync(address=url, timeout_ms=5000, interceptors=[wrong_injector])
51+
with pytest.raises(ConnectError, match=r"(?i)authenticat"):
52+
wrong_client.list_workers(list_req)
53+
wrong_client.close()
54+
55+
jwt_token = _login_for_jwt(url, _AUTH_TOKEN)
56+
valid_injector = AuthTokenInjector(StaticTokenProvider(jwt_token))
57+
valid_client = ControllerServiceClientSync(address=url, timeout_ms=5000, interceptors=[valid_injector])
58+
response = valid_client.list_workers(list_req)
59+
assert response is not None
60+
valid_client.close()
61+
finally:
62+
controller.close()
63+
64+
65+
def test_static_auth_job_ownership():
66+
"""Job ownership: user A cannot terminate user B's job."""
67+
from connectrpc.errors import ConnectError
68+
from iris.rpc.auth import AuthTokenInjector, StaticTokenProvider
69+
70+
_TOKEN_A = "token-user-a"
71+
_TOKEN_B = "token-user-b"
72+
73+
config = _make_controller_only_config()
74+
config.auth.static.tokens[_TOKEN_A] = "user-a"
75+
config.auth.static.tokens[_TOKEN_B] = "user-b"
76+
controller = LocalCluster(config)
77+
url = controller.start()
78+
79+
try:
80+
jwt_a = _login_for_jwt(url, _TOKEN_A)
81+
jwt_b = _login_for_jwt(url, _TOKEN_B)
82+
83+
injector_a = AuthTokenInjector(StaticTokenProvider(jwt_a))
84+
client_a = ControllerServiceClientSync(address=url, timeout_ms=10000, interceptors=[injector_a])
85+
86+
entrypoint = Entrypoint.from_callable(_quick)
87+
launch_req = cluster_pb2.Controller.LaunchJobRequest(
88+
name="/user-a/auth-owned-job",
89+
entrypoint=entrypoint.to_proto(),
90+
resources=ResourceSpec(cpu=1, memory="1g").to_proto(),
91+
)
92+
resp = client_a.launch_job(launch_req)
93+
job_id = resp.job_id
94+
95+
injector_b = AuthTokenInjector(StaticTokenProvider(jwt_b))
96+
client_b = ControllerServiceClientSync(address=url, timeout_ms=10000, interceptors=[injector_b])
97+
with pytest.raises(ConnectError, match="cannot access resources owned by"):
98+
client_b.terminate_job(cluster_pb2.Controller.TerminateJobRequest(job_id=job_id))
99+
100+
client_a.terminate_job(cluster_pb2.Controller.TerminateJobRequest(job_id=job_id))
101+
102+
client_a.close()
103+
client_b.close()
104+
finally:
105+
controller.close()
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Checkpoint/restore test for Iris controller."""
4+
5+
from pathlib import Path
6+
7+
import time
8+
9+
from iris.client.client import IrisClient, Job
10+
from iris.cluster.config import load_config, make_local_config
11+
from iris.cluster.providers.local.cluster import LocalCluster
12+
from iris.cluster.types import Entrypoint, EnvironmentSpec, ResourceSpec, is_job_finished
13+
from iris.rpc import cluster_pb2
14+
from iris.rpc.cluster_connect import ControllerServiceClientSync
15+
16+
IRIS_ROOT = Path(__file__).resolve().parents[1]
17+
DEFAULT_CONFIG = IRIS_ROOT / "examples" / "test.yaml"
18+
19+
20+
def _quick():
21+
return 1
22+
23+
24+
class _IrisTestHelper:
25+
"""Minimal helper to submit and wait for jobs (standalone, no integration fixtures)."""
26+
27+
def __init__(self, url: str, client: IrisClient, controller_client: ControllerServiceClientSync):
28+
self.url = url
29+
self.client = client
30+
self.controller_client = controller_client
31+
32+
def wait_for_workers(self, count: int, timeout: float = 30):
33+
deadline = time.monotonic() + timeout
34+
while time.monotonic() < deadline:
35+
resp = self.controller_client.list_workers(cluster_pb2.Controller.ListWorkersRequest())
36+
healthy = [w for w in resp.workers if w.healthy]
37+
if len(healthy) >= count:
38+
return
39+
time.sleep(0.5)
40+
raise TimeoutError(f"Expected {count} healthy workers, timed out")
41+
42+
def submit(self, fn, name: str) -> Job:
43+
return self.client.submit(
44+
entrypoint=Entrypoint.from_callable(fn),
45+
name=name,
46+
resources=ResourceSpec(cpu=1, memory="1g"),
47+
environment=EnvironmentSpec(),
48+
)
49+
50+
def wait(self, job: Job, timeout: float = 30) -> cluster_pb2.JobStatus:
51+
deadline = time.monotonic() + timeout
52+
while time.monotonic() < deadline:
53+
resp = self.controller_client.get_job_status(
54+
cluster_pb2.Controller.GetJobStatusRequest(job_id=job.job_id.to_wire())
55+
)
56+
if is_job_finished(resp.job.state):
57+
return resp.job
58+
time.sleep(0.5)
59+
raise TimeoutError(f"Job {job.job_id} did not finish within {timeout}s")
60+
61+
62+
def test_checkpoint_restore():
63+
"""Controller restart resumes from checkpoint: completed jobs visible, cluster functional."""
64+
config = load_config(DEFAULT_CONFIG)
65+
config = make_local_config(config)
66+
67+
cluster = LocalCluster(config)
68+
url = cluster.start()
69+
try:
70+
client = IrisClient.remote(url, workspace=IRIS_ROOT)
71+
controller_client = ControllerServiceClientSync(address=url, timeout_ms=30000)
72+
tc = _IrisTestHelper(url=url, client=client, controller_client=controller_client)
73+
tc.wait_for_workers(1, timeout=30)
74+
75+
job = tc.submit(_quick, "pre-restart")
76+
tc.wait(job, timeout=30)
77+
saved_job_id = job.job_id.to_wire()
78+
79+
ckpt = controller_client.begin_checkpoint(cluster_pb2.Controller.BeginCheckpointRequest())
80+
assert ckpt.checkpoint_path, "begin_checkpoint returned empty path"
81+
assert ckpt.job_count >= 1
82+
controller_client.close()
83+
84+
url = cluster.restart()
85+
86+
controller_client = ControllerServiceClientSync(address=url, timeout_ms=30000)
87+
tc = _IrisTestHelper(
88+
url=url, client=IrisClient.remote(url, workspace=IRIS_ROOT), controller_client=controller_client
89+
)
90+
91+
resp = controller_client.get_job_status(cluster_pb2.Controller.GetJobStatusRequest(job_id=saved_job_id))
92+
assert (
93+
resp.job.state == cluster_pb2.JOB_STATE_SUCCEEDED
94+
), f"Pre-restart job has state {resp.job.state} after restore"
95+
96+
tc.wait_for_workers(1, timeout=30)
97+
post_job = tc.submit(_quick, "post-restart")
98+
status = tc.wait(post_job, timeout=30)
99+
assert status.state == cluster_pb2.JOB_STATE_SUCCEEDED
100+
101+
controller_client.close()
102+
finally:
103+
cluster.close()

0 commit comments

Comments
 (0)