Skip to content

Commit 5eb5b60

Browse files
zephyr: shuffle integration tests (#4784)
* adds a shuffle integration-test lane that exercises scatter/reduce at 10 GB on marin-dev
* `lib/zephyr/tests/benchmark_shuffle.py` — synthetic shuffle driver with `--hot-shard-frac` / `--hot-key-pool` to bias one reducer and `--repeat N` [^1] so one iris job can emit multiple `RESULT:` lines for variance tracking
* `.github/workflows/zephyr-shuffle-itest.yaml` — matrix over 4 scenarios (uniform/skew × small/large items), each leg submits one iris job and polls to terminal state
* scenarios: `uniform-small` / `uniform-large` / `skew90-small` / `skew90-large`
* small = 600k items per shard × 250 B, large = 160 items per shard × 1 MB — both ≈ 10 GB per run at the default 64 input shards
* skew = 90 % of items routed to a single hot reducer via `--hot-shard-frac 0.9 --hot-key-pool 128`
* `--repeat 3` per scenario, `--priority production` on submission, `fail-fast: false` so all four legs always run
* `workflow_dispatch` only for now — a scheduled run would fail every night on `skew90-*` until #4782 lands (baseline parquet OOMs the hot-shard reducer)

[^1]: each repeat reuses the same `ZephyrContext`, so worker actor startup happens once and the per-iteration walltime isolates shuffle time from bootstrap variance.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Rafal Wojdyla <ravwojdyla@gmail.com>
1 parent 5bae69b commit 5eb5b60

File tree

3 files changed

+443
-387
lines changed

3 files changed

+443
-387
lines changed
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
name: Zephyr - Shuffle Integration Tests

# Shuffle integration lane for zephyr: runs the scatter/reduce shuffle at
# roughly 10 GB in four scenarios (uniform/skew × small/large items). Every
# scenario is a separate matrix leg that submits an iris job to marin-dev and
# polls it until the job reaches a terminal state.

on:
  # Manually triggered only. The baseline Parquet shuffle OOMs on the skew
  # scenarios, so a schedule would fail until the zstd-chunk shuffle format
  # lands; add a cron trigger once that change is in main.
  workflow_dispatch:
    inputs:
      num_input_shards:
        description: 'Input shard count (default 64)'
        required: false
        default: "64"

permissions:
  contents: read
  # NOTE(review): the Google Cloud auth step in this workflow is given a
  # `credentials_json` key; confirm whether `id-token: write` is still needed.
  id-token: write
jobs:
  # One matrix leg per shuffle scenario. Each leg submits a single iris job,
  # polls it until it reaches a terminal state, then prints its RESULT lines
  # (on success) or a job summary plus recent logs (on failure).
  shuffle-itest:
    runs-on: ubuntu-latest
    timeout-minutes: 180
    # One in-flight leg per scenario; a newer dispatch cancels the older leg.
    # NOTE(review): no step in this job stops the submitted iris job when the
    # leg is cancelled or times out — confirm whether the job should be
    # stopped on the cluster in that case.
    concurrency:
      group: zephyr-shuffle-itest-${{ matrix.scenario }}
      cancel-in-progress: true

    strategy:
      # Keep the remaining scenarios running when one of them fails.
      fail-fast: false
      matrix:
        include:
          - scenario: uniform-small
            items_per_shard: '600000'
            item_bytes: '250'
            hot_shard_frac: '0.0'
            hot_key_pool: '0'
          - scenario: uniform-large
            items_per_shard: '160'
            item_bytes: '1000000'
            hot_shard_frac: '0.0'
            hot_key_pool: '0'
          - scenario: skew90-small
            items_per_shard: '600000'
            item_bytes: '250'
            hot_shard_frac: '0.9'
            hot_key_pool: '128'
          - scenario: skew90-large
            items_per_shard: '160'
            item_bytes: '1000000'
            hot_shard_frac: '0.9'
            hot_key_pool: '128'

    env:
      # Unique per scenario, workflow run, and re-run attempt.
      RUN_ID: zephyr-shuffle-itest-${{ matrix.scenario }}-${{ github.run_id }}-${{ github.run_attempt }}
      IRIS_CONFIG: lib/iris/examples/marin-dev.yaml
      IRIS_CONTROLLER_SERVICE_ACCOUNT: iris-controller@hai-gcp-models.iam.gserviceaccount.com
      # The dispatch input reaches the scripts through the environment rather
      # than being interpolated into shell text.
      NUM_INPUT_SHARDS: ${{ github.event.inputs.num_input_shards || '64' }}

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python 3.12
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install uv
        uses: astral-sh/setup-uv@v7
        with:
          enable-cache: true

      - name: Install dependencies
        run: uv sync --all-packages --extra=cpu --no-default-groups

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.IRIS_CI_GCP_SA_KEY }}

      - name: Set up Google Cloud SDK
        uses: google-github-actions/setup-gcloud@v2
        with:
          project_id: ${{ secrets.GCP_PROJECT_ID }}

      # Generates a throwaway key pair for this leg and registers it with
      # OS Login; the final step of the job removes it again.
      - name: Set up OS Login SSH key
        run: |
          mkdir -p ~/.ssh
          ssh-keygen -t rsa -b 4096 -f ~/.ssh/google_compute_engine -N "" -q -C "gha-${{ github.run_id }}-${{ github.run_attempt }}-${{ matrix.scenario }}"
          chmod 600 ~/.ssh/google_compute_engine
          gcloud compute os-login ssh-keys add \
            --key-file ~/.ssh/google_compute_engine.pub \
            --impersonate-service-account="$IRIS_CONTROLLER_SERVICE_ACCOUNT" \
            --ttl=6h

      - name: Submit shuffle benchmark
        id: submit
        shell: bash -l {0}
        run: |
          # A custom shell template gets no implicit errexit/pipefail, so opt
          # in here: a failed submission must fail this step instead of
          # recording an empty job id.
          set -euo pipefail
          JOB_ID=$(.venv/bin/iris --config="$IRIS_CONFIG" \
            job run --no-wait --priority production \
            --memory=2G --disk=8G --cpu=1 --extra=cpu \
            -e SMOKE_RUN_ID "$RUN_ID" \
            -- python lib/zephyr/tests/benchmark_shuffle.py \
            --num-input-shards "$NUM_INPUT_SHARDS" \
            --items-per-shard "${{ matrix.items_per_shard }}" \
            --item-bytes "${{ matrix.item_bytes }}" \
            --num-keys 50000 \
            --max-workers 4 --worker-cpu 1 --worker-ram 8g \
            --hot-shard-frac "${{ matrix.hot_shard_frac }}" \
            --hot-key-pool "${{ matrix.hot_key_pool }}" \
            --repeat 3 \
            --label "$RUN_ID")
          if [ -z "$JOB_ID" ]; then
            echo "::error::iris job run did not return a job id"
            exit 1
          fi
          echo "job_id=$JOB_ID" >> "$GITHUB_OUTPUT"
          echo "Submitted job: $JOB_ID"

      - name: Wait for shuffle benchmark
        shell: bash -l {0}
        env:
          # Passed through the environment so the id is never pasted into the
          # script text.
          JOB_ID: ${{ steps.submit.outputs.job_id }}
        run: |
          echo "Polling job status: $JOB_ID"
          # Number of consecutive polls that returned no state for the job.
          # A few are tolerated so one failed or empty listing does not fail
          # a leg that may have been running for hours.
          MISSES=0
          MAX_MISSES=5
          while true; do
            STATE=$(.venv/bin/iris --config="$IRIS_CONFIG" \
              job list --json --prefix "$JOB_ID" \
              | jq -r --arg id "$JOB_ID" '[.[] | select(.job_id == $id)][0].state // empty')
            case "$STATE" in
              JOB_STATE_SUCCEEDED)
                echo "Job succeeded"
                exit 0
                ;;
              JOB_STATE_PENDING|JOB_STATE_BUILDING|JOB_STATE_RUNNING)
                MISSES=0
                echo "$(date -u +%H:%M:%S) Job state: $STATE"
                sleep 30
                ;;
              "")
                MISSES=$((MISSES + 1))
                if [ "$MISSES" -ge "$MAX_MISSES" ]; then
                  echo "Job not found: $JOB_ID"
                  exit 1
                fi
                echo "$(date -u +%H:%M:%S) No state returned for $JOB_ID ($MISSES/$MAX_MISSES), retrying"
                sleep 30
                ;;
              *)
                # Any other state is treated as a terminal failure. Dump every
                # job the prefix listing returns to help diagnose it.
                echo "Job finished with state: $STATE"
                .venv/bin/iris --config="$IRIS_CONFIG" \
                  job list --json --prefix "$JOB_ID" \
                  | jq '.[] | {job_id, state, error}' || true
                exit 1
                ;;
            esac
          done

      - name: Print benchmark results
        if: success()
        shell: bash -l {0}
        env:
          JOB_ID: ${{ steps.submit.outputs.job_id }}
        run: |
          # Only the RESULT: lines of the fetched log tail are shown.
          .venv/bin/iris --config="$IRIS_CONFIG" \
            job logs "$JOB_ID" --max-lines 200 2>/dev/null \
            | grep "RESULT:" || echo "No RESULT lines found"

      - name: Capture failure diagnostics
        if: failure()
        shell: bash -l {0}
        env:
          JOB_ID: ${{ steps.submit.outputs.job_id }}
        run: |
          # When the failure happened before a job id was recorded there is
          # nothing to query.
          if [ -z "$JOB_ID" ]; then
            echo "No iris job id was recorded; skipping job diagnostics"
            exit 0
          fi
          # Best effort: diagnostics must never mask the original failure.
          echo "=== Job summary ==="
          .venv/bin/iris --config="$IRIS_CONFIG" \
            job summary "$JOB_ID" 2>/dev/null || true
          echo "=== Recent logs ==="
          .venv/bin/iris --config="$IRIS_CONFIG" \
            job logs "$JOB_ID" --max-lines 100 2>/dev/null | tail -60 || true

      # Runs regardless of the outcome of earlier steps; errors are ignored
      # so cleanup cannot change the result of the leg.
      - name: Remove OS Login SSH key
        if: always()
        run: |
          gcloud compute os-login ssh-keys remove \
            --impersonate-service-account="$IRIS_CONTROLLER_SERVICE_ACCOUNT" \
            --key-file ~/.ssh/google_compute_engine.pub || true

0 commit comments

Comments
 (0)