11# Copyright The Marin Authors
22# SPDX-License-Identifier: Apache-2.0
33
4- """Full marin integration pipeline running on Iris.
4+ """Run the full marin pipeline on an Iris cluster.
55
6- Ports the pipeline from tests/integration_test.py to dispatch through an Iris
7- cluster via FrayIrisClient instead of Ray.
6+ Standalone script (not pytest) so logs stream in real time.
87
9- When MARIN_CI_S3_PREFIX is set (e.g. on CoreWeave CI), all paths use S3 so
10- remote Zephyr pods can access them. The executor is submitted as an Iris job
11- so that child jobs (Zephyr coordinator/workers) inherit env vars — including
12- S3 credentials — via Iris auto-propagation.
8+ Usage:
9+ uv run tests/integration/iris/run_marin_on_iris.py \
10+ --controller-url http://localhost:10000
1311
14- Otherwise falls back to running in-process with a local tmpdir (works when
15- the Iris cluster is local / in-process).
12+ When MARIN_CI_S3_PREFIX is set, uploads test fixtures to S3 and submits
13+ the executor as an Iris job so child jobs inherit S3 credentials.
14+ Otherwise runs in-process against local filesystem.
1615"""
1716
17+ import argparse
1818import logging
1919import os
2020import shutil
21+ import sys
2122import tempfile
2223import uuid
2324from pathlib import Path
2425
2526import fsspec
26- import pytest
2727from fray import set_current_client
2828from fray .v2 .iris_backend import FrayIrisClient
2929from fray .v2 .types import Entrypoint , JobRequest , ResourceConfig , create_environment
30+ from iris .logging import configure_logging
3031from marin .execution .executor import ExecutorMainConfig , executor_main
3132from tests .integration_test import create_steps
3233
# Configure logging at import time so module-level messages stream before main() runs.
configure_logging(level=logging.INFO)
logger = logging.getLogger(__name__)

# Repository root, resolved three levels up from this file.
REPO_ROOT = Path(__file__).resolve().parents[3]
LOCAL_SYNTH_DATA = REPO_ROOT / "tests" / "quickstart-data"

# S3/R2 env vars that must be forwarded to Iris jobs for object storage access.
_S3_ENV_KEYS = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ENDPOINT_URL", "FSSPEC_S3"]
4241
4342
4443def _upload_tree (local_root : Path , s3_dest : str ) -> None :
45- """Upload a local directory tree to S3."""
4644 fs , _ = fsspec .core .url_to_fs (s3_dest )
4745 for path in local_root .rglob ("*" ):
4846 if not path .is_file ():
@@ -52,7 +50,6 @@ def _upload_tree(local_root: Path, s3_dest: str) -> None:
5250
5351
5452def _rm_s3 (s3_prefix : str ) -> None :
55- """Remove all objects under an S3 prefix (best-effort)."""
5653 fs , _ = fsspec .core .url_to_fs (s3_prefix )
5754 try :
5855 fs .rm (s3_prefix , recursive = True )
@@ -61,12 +58,10 @@ def _rm_s3(s3_prefix: str) -> None:
6158
6259
def _s3_env_vars() -> dict[str, str]:
    """Collect the S3/R2 credential env vars set in the current process.

    Only keys from ``_S3_ENV_KEYS`` that are actually present in the
    environment are returned, so absent credentials are simply omitted
    rather than forwarded as empty strings.
    """
    collected: dict[str, str] = {}
    for key in _S3_ENV_KEYS:
        value = os.environ.get(key)
        if value is not None:
            collected[key] = value
    return collected
6662
6763
6864def _run_executor (prefix : str , synth_data : str ) -> None :
69- """Entry point for the Iris job that runs the full executor pipeline."""
7065 config = ExecutorMainConfig (
7166 prefix = prefix ,
7267 executor_info_base_path = f"{ prefix } /experiments" ,
@@ -75,44 +70,38 @@ def _run_executor(prefix: str, synth_data: str) -> None:
7570 executor_main (config , steps = steps )
7671
7772
78- @pytest .mark .timeout (600 )
79- def test_marin_pipeline_on_iris (integration_cluster , monkeypatch ):
80- """Run the full marin data pipeline dispatched through Iris."""
73+ def main ():
74+ parser = argparse .ArgumentParser (description = "Run marin pipeline on Iris" )
75+ parser .add_argument ("--controller-url" , required = True )
76+ args = parser .parse_args ()
77+
8178 s3_base = os .environ .get ("MARIN_CI_S3_PREFIX" )
8279
8380 if s3_base :
84- # Remote cluster: use S3 so Zephyr coordinator pods can access data.
8581 run_id = f"marin-itest-{ uuid .uuid4 ().hex [:8 ]} "
8682 prefix = f"{ s3_base } /{ run_id } "
8783 synth_data = f"{ prefix } /quickstart-data"
84+ logger .info ("Uploading test fixtures to %s" , synth_data )
8885 _upload_tree (LOCAL_SYNTH_DATA , synth_data )
8986 cleanup = lambda : _rm_s3 (prefix ) # noqa: E731
9087 else :
91- # Local cluster: local filesystem is fine.
9288 prefix = tempfile .mkdtemp (prefix = "iris-marin-itest-" )
9389 synth_data = str (LOCAL_SYNTH_DATA )
9490 cleanup = lambda : shutil .rmtree (prefix , ignore_errors = True ) # noqa: E731
9591
96- try :
97- monkeypatch .setenv ("MARIN_PREFIX" , prefix )
98- monkeypatch .setenv ("WANDB_MODE" , "disabled" )
99- monkeypatch .setenv ("WANDB_API_KEY" , "" )
100- monkeypatch .setenv ("JAX_TRACEBACK_FILTERING" , "off" )
92+ os .environ ["MARIN_PREFIX" ] = prefix
93+ os .environ ["WANDB_MODE" ] = "disabled"
94+ os .environ ["WANDB_API_KEY" ] = ""
95+ os .environ ["JAX_TRACEBACK_FILTERING" ] = "off"
10196
97+ try :
10298 iris_client = FrayIrisClient (
103- controller_address = integration_cluster . url ,
99+ controller_address = args . controller_url ,
104100 workspace = REPO_ROOT ,
105101 )
106102
107- config = ExecutorMainConfig (
108- prefix = prefix ,
109- executor_info_base_path = f"{ prefix } /experiments" ,
110- )
111- steps = create_steps ("quickstart-tests" , synth_data )
112-
113103 if s3_base :
114- # Remote cluster: submit the executor as an Iris job so child jobs
115- # (Zephyr coordinator/workers) inherit S3 env vars automatically.
104+ logger .info ("Submitting executor as Iris job (S3 mode)" )
116105 env_vars = {
117106 "MARIN_PREFIX" : prefix ,
118107 "WANDB_MODE" : "disabled" ,
@@ -133,10 +122,24 @@ def test_marin_pipeline_on_iris(integration_cluster, monkeypatch):
133122 environment = create_environment (env_vars = env_vars ),
134123 )
135124 )
136- handle .wait (raise_on_failure = True )
125+ handle .wait (raise_on_failure = True , stream_logs = True )
137126 else :
138- # Local cluster: run in-process (local filesystem is accessible).
127+ logger .info ("Running executor in-process (local mode)" )
128+ config = ExecutorMainConfig (
129+ prefix = prefix ,
130+ executor_info_base_path = f"{ prefix } /experiments" ,
131+ )
132+ steps = create_steps ("quickstart-tests" , synth_data )
139133 with set_current_client (iris_client ):
140134 executor_main (config , steps = steps )
135+
136+ logger .info ("Marin pipeline completed successfully" )
137+ except Exception :
138+ logger .exception ("Marin pipeline failed" )
139+ sys .exit (1 )
141140 finally :
142141 cleanup ()
142+
143+
# Standard script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()