Skip to content

Commit e03ff4f

Browse files
committed
fix: submit executor as Iris job so child jobs inherit S3 env vars
The marin-on-iris integration test was running executor_main in-process on the CI runner. Child Zephyr jobs on remote pods had no S3 credentials because Iris env propagation only works parent→child, and there was no parent Iris job. Submit the whole executor as an Iris job with S3 env vars so Zephyr coordinator/workers inherit them automatically. Also enable live log output (-s --log-cli-level=INFO) for the local iris integration test workflow.
1 parent eb2f779 commit e03ff4f

2 files changed

Lines changed: 45 additions & 11 deletions

File tree

.github/workflows/iris-integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ jobs:
6969
run: |
7070
uv run pytest tests/integration/iris/ \
7171
--controller-url "$IRIS_CONTROLLER_URL" \
72-
-v --tb=short --timeout=600 \
72+
-v -s --log-cli-level=INFO --tb=short --timeout=600 \
7373
-o "addopts=" \
7474
-x
7575
env:

tests/integration/iris/test_marin_on_iris.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77
cluster via FrayIrisClient instead of Ray.
88
99
When MARIN_CI_S3_PREFIX is set (e.g. on CoreWeave CI), all paths use S3 so
10-
remote Zephyr pods can access them. Otherwise falls back to a local tmpdir
11-
(works when the Iris cluster is local / in-process).
10+
remote Zephyr pods can access them. The executor is submitted as an Iris job
11+
so that child jobs (Zephyr coordinator/workers) inherit env vars — including
12+
S3 credentials — via Iris auto-propagation.
13+
14+
Otherwise falls back to running in-process with a local tmpdir (works when
15+
the Iris cluster is local / in-process).
1216
"""
1317

1418
import logging
@@ -22,6 +26,7 @@
2226
import pytest
2327
from fray import set_current_client
2428
from fray.v2.iris_backend import FrayIrisClient
29+
from fray.v2.types import Entrypoint, JobRequest, ResourceConfig, create_environment
2530
from marin.execution.executor import ExecutorMainConfig, executor_main
2631
from tests.integration_test import create_steps
2732

@@ -32,6 +37,9 @@
3237

3338
pytestmark = [pytest.mark.integration, pytest.mark.slow]
3439

40+
# S3/R2 env vars that must be forwarded to Iris jobs for object storage access.
41+
_S3_ENV_KEYS = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ENDPOINT_URL", "FSSPEC_S3"]
42+
3543

3644
def _upload_tree(local_root: Path, s3_dest: str) -> None:
3745
"""Upload a local directory tree to S3."""
@@ -52,6 +60,21 @@ def _rm_s3(s3_prefix: str) -> None:
5260
pass
5361

5462

63+
def _s3_env_vars() -> dict[str, str]:
    """Return the S3/R2 settings that are set in this process's environment.

    Every name in ``_S3_ENV_KEYS`` is looked up in ``os.environ``; names that
    are not set are left out of the result rather than mapped to a placeholder.
    """
    forwarded: dict[str, str] = {}
    for key in _S3_ENV_KEYS:
        if key in os.environ:
            forwarded[key] = os.environ[key]
    return forwarded
66+
67+
68+
def _run_executor(prefix: str, synth_data: str) -> None:
    """Run the full executor pipeline; used as the entry point of the Iris job.

    Args:
        prefix: Root path handed to ``ExecutorMainConfig``; executor info is
            written under ``{prefix}/experiments``.
        synth_data: Passed through to ``create_steps`` together with the fixed
            experiment prefix ``"quickstart-tests"``.
    """
    info_base_path = f"{prefix}/experiments"
    executor_config = ExecutorMainConfig(
        prefix=prefix,
        executor_info_base_path=info_base_path,
    )
    pipeline_steps = create_steps("quickstart-tests", synth_data)
    executor_main(executor_config, steps=pipeline_steps)
76+
77+
5578
@pytest.mark.timeout(600)
5679
def test_marin_pipeline_on_iris(integration_cluster, monkeypatch):
5780
"""Run the full marin data pipeline dispatched through Iris."""
@@ -81,15 +104,26 @@ def test_marin_pipeline_on_iris(integration_cluster, monkeypatch):
81104
workspace=REPO_ROOT,
82105
)
83106

84-
config = ExecutorMainConfig(
85-
prefix=prefix,
86-
executor_info_base_path=f"{prefix}/experiments",
87-
)
88-
89-
experiment_prefix = "quickstart-tests"
90-
steps = create_steps(experiment_prefix, synth_data)
107+
env_vars = {
108+
"MARIN_PREFIX": prefix,
109+
"WANDB_MODE": "disabled",
110+
"WANDB_API_KEY": "",
111+
"JAX_TRACEBACK_FILTERING": "off",
112+
**_s3_env_vars(),
113+
}
91114

92115
with set_current_client(iris_client):
93-
executor_main(config, steps=steps)
116+
handle = iris_client.submit(
117+
JobRequest(
118+
name=f"marin-itest-{uuid.uuid4().hex[:8]}",
119+
entrypoint=Entrypoint.from_callable(
120+
_run_executor,
121+
args=(prefix, synth_data),
122+
),
123+
resources=ResourceConfig.with_cpu(),
124+
environment=create_environment(env_vars=env_vars),
125+
)
126+
)
127+
handle.wait(raise_on_failure=True)
94128
finally:
95129
cleanup()

0 commit comments

Comments
 (0)