Skip to content

Commit ef51f83

Browse files
authored
[iris] Reject stale clients on root LaunchJob submissions (#5108)
Add client_revision_date to LaunchJobRequest, stamped at wheel build time (via scripts/python_libs_package.py) and from git log on editable installs. Controller rejects root submissions older than MIN_CLIENT_REVISION_DATE; nested submissions are exempt so in-flight jobs aren't killed by floor bumps. Empty string is treated as the feature introduction date so already-deployed clients get a 2-week grace window. Also deletes lib/iris/hatch_build.py (vestigial now that protos are checked in). Fixes #4840
1 parent 8ef6073 commit ef51f83

14 files changed

Lines changed: 435 additions & 302 deletions

File tree

lib/iris/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ ENV PATH="/app/.venv/bin:$PATH"
5555
# Rigging is a workspace-local dep not on PyPI with a compatible API.
5656
# Copy only metadata first so source changes don't bust the dep cache.
5757
COPY lib/rigging/pyproject.toml ./lib/rigging/pyproject.toml
58-
COPY lib/iris/pyproject.toml lib/iris/hatch_build.py ./lib/iris/
58+
COPY lib/iris/pyproject.toml ./lib/iris/
5959
RUN --mount=type=cache,target=/root/.cache/uv \
6060
printf '[tool.uv.workspace]\nmembers = ["lib/iris", "lib/rigging"]\n\n[tool.uv.sources]\nmarin-rigging = { workspace = true }\n' > pyproject.toml \
6161
&& uv sync --package marin-iris --no-install-project

lib/iris/Dockerfile.dockerignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
*
44

55
!lib/iris/pyproject.toml
6-
!lib/iris/hatch_build.py
76
!lib/iris/src/
87
!lib/iris/examples/
98
!lib/iris/config/

lib/iris/hatch_build.py

Lines changed: 0 additions & 148 deletions
This file was deleted.

lib/iris/pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ dev = [
6666
"pytest>=8.4",
6767
]
6868

69-
[tool.hatch.build.hooks.custom]
70-
7169
[tool.hatch.build.targets.wheel]
7270
packages = ["src/iris"]
7371

lib/iris/src/iris/_build_info.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Copyright The Marin Authors
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
# Auto-generated. Overwritten by scripts/python_libs_package.py during wheel
5+
# builds. Editable installs leave BUILD_DATE empty; iris.version falls back to
6+
# `git log` on the iris source tree.
7+
8+
BUILD_DATE = ""

lib/iris/src/iris/cluster/client/remote_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from iris.rpc.logging_connect import LogServiceClientSync
2626
from iris.rpc.errors import call_with_retry, format_connect_error, poll_with_retries
2727
from iris.time_proto import duration_to_proto
28+
from iris.version import client_revision_date
2829
from rigging.timing import Deadline, Duration, ExponentialBackoff
2930

3031
logger = logging.getLogger(__name__)
@@ -132,6 +133,7 @@ def submit_job(
132133
task_image=task_image or "",
133134
priority_band=priority_band,
134135
submit_argv=submit_argv or [],
136+
client_revision_date=client_revision_date(),
135137
)
136138
if self._bundle_id:
137139
request.bundle_id = self._bundle_id

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import uuid
1818
import dataclasses
1919
from dataclasses import dataclass
20+
from datetime import date, timedelta
2021
from typing import Any, Protocol
2122

2223
from connectrpc.code import Code
@@ -124,6 +125,44 @@
124125
# Maximum bundle size in bytes (25 MB) - matches client-side limit
125126
MAX_BUNDLE_SIZE_BYTES = 25 * 1024 * 1024
126127

128+
# A root LaunchJob submission is rejected if its client_revision_date is more
129+
# than FRESHNESS_WINDOW older than today. Clients get exactly this long to
130+
# upgrade after a new marin-iris release is cut.
131+
FRESHNESS_WINDOW = timedelta(days=14)
132+
133+
# Date this freshness check shipped. An empty client_revision_date is
134+
# interpreted as this date — already-deployed clients that don't set the field
135+
# start being rejected FRESHNESS_WINDOW after rollout.
136+
FEATURE_INTRODUCTION_DATE = date(2026, 4, 22)
137+
138+
139+
def _check_client_freshness(client_date_str: str, now: date) -> None:
140+
"""Reject root LaunchJob submissions whose client is older than FRESHNESS_WINDOW.
141+
142+
Empty string is treated as FEATURE_INTRODUCTION_DATE so old clients (which
143+
don't set the field at all) behave as if they shipped the day this check
144+
rolled out.
145+
"""
146+
if not client_date_str:
147+
client_date = FEATURE_INTRODUCTION_DATE
148+
else:
149+
try:
150+
client_date = date.fromisoformat(client_date_str)
151+
except ValueError as err:
152+
raise ConnectError(
153+
Code.INVALID_ARGUMENT,
154+
f"client_revision_date must be ISO YYYY-MM-DD, got {client_date_str!r}",
155+
) from err
156+
floor = now - FRESHNESS_WINDOW
157+
if client_date < floor:
158+
raise ConnectError(
159+
Code.FAILED_PRECONDITION,
160+
f"marin-iris client is too old (build {client_date.isoformat()}; "
161+
f"minimum {floor.isoformat()}). Run `uv sync` or upgrade "
162+
f"marin-iris and retry.",
163+
)
164+
165+
127166
USER_TASK_STATES = (
128167
job_pb2.TASK_STATE_PENDING,
129168
job_pb2.TASK_STATE_ASSIGNED,
@@ -1040,6 +1079,12 @@ def launch_job(
10401079

10411080
job_id = JobName.from_wire(request.name)
10421081

1082+
# Reject root submissions from stale clients. Nested submissions (from
1083+
# a job already running in the cluster) are exempt — the workload would
1084+
# otherwise crash mid-flight as the freshness window slides forward.
1085+
if job_id.is_root:
1086+
_check_client_freshness(request.client_revision_date, date.today())
1087+
10431088
# When an auth provider is configured, override the user segment with
10441089
# the verified identity to prevent impersonation. Only override for
10451090
# root-level submissions; child jobs inherit the parent's user.

lib/iris/src/iris/rpc/controller.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ message Controller {
106106
// Empty when submitted programmatically. Bookkeeping only; not
107107
// interpreted by the controller.
108108
repeated string submit_argv = 35;
109+
110+
// ISO date (YYYY-MM-DD) of the client's marin-iris build. For wheels this
111+
// is stamped at build time by scripts/python_libs_package.py; for editable
112+
// installs it's derived from `git log` on the iris source tree. Root
113+
// submissions older than FRESHNESS_WINDOW (today - 14d) are rejected.
114+
// Nested submissions (is_root == false) are exempt. Empty string is
115+
// interpreted as FEATURE_INTRODUCTION_DATE — see service.py.
116+
string client_revision_date = 36;
109117
}
110118

111119
message LaunchJobResponse {

lib/iris/src/iris/rpc/controller_pb2.py

Lines changed: 147 additions & 147 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/iris/src/iris/rpc/controller_pb2.pyi

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class Controller(_message.Message):
4747
JOB_QUERY_SCOPE_ROOTS: Controller.JobQueryScope
4848
JOB_QUERY_SCOPE_CHILDREN: Controller.JobQueryScope
4949
class LaunchJobRequest(_message.Message):
50-
__slots__ = ("name", "entrypoint", "resources", "environment", "bundle_id", "bundle_blob", "scheduling_timeout", "ports", "max_task_failures", "max_retries_failure", "max_retries_preemption", "constraints", "coscheduling", "replicas", "timeout", "fail_if_exists", "reservation", "preemption_policy", "existing_job_policy", "priority_band", "task_image", "submit_argv")
50+
__slots__ = ("name", "entrypoint", "resources", "environment", "bundle_id", "bundle_blob", "scheduling_timeout", "ports", "max_task_failures", "max_retries_failure", "max_retries_preemption", "constraints", "coscheduling", "replicas", "timeout", "fail_if_exists", "reservation", "preemption_policy", "existing_job_policy", "priority_band", "task_image", "submit_argv", "client_revision_date")
5151
NAME_FIELD_NUMBER: _ClassVar[int]
5252
ENTRYPOINT_FIELD_NUMBER: _ClassVar[int]
5353
RESOURCES_FIELD_NUMBER: _ClassVar[int]
@@ -70,6 +70,7 @@ class Controller(_message.Message):
7070
PRIORITY_BAND_FIELD_NUMBER: _ClassVar[int]
7171
TASK_IMAGE_FIELD_NUMBER: _ClassVar[int]
7272
SUBMIT_ARGV_FIELD_NUMBER: _ClassVar[int]
73+
CLIENT_REVISION_DATE_FIELD_NUMBER: _ClassVar[int]
7374
name: str
7475
entrypoint: _job_pb2.RuntimeEntrypoint
7576
resources: _job_pb2.ResourceSpecProto
@@ -92,7 +93,8 @@ class Controller(_message.Message):
9293
priority_band: _job_pb2.PriorityBand
9394
task_image: str
9495
submit_argv: _containers.RepeatedScalarFieldContainer[str]
95-
def __init__(self, name: _Optional[str] = ..., entrypoint: _Optional[_Union[_job_pb2.RuntimeEntrypoint, _Mapping]] = ..., resources: _Optional[_Union[_job_pb2.ResourceSpecProto, _Mapping]] = ..., environment: _Optional[_Union[_job_pb2.EnvironmentConfig, _Mapping]] = ..., bundle_id: _Optional[str] = ..., bundle_blob: _Optional[bytes] = ..., scheduling_timeout: _Optional[_Union[_time_pb2.Duration, _Mapping]] = ..., ports: _Optional[_Iterable[str]] = ..., max_task_failures: _Optional[int] = ..., max_retries_failure: _Optional[int] = ..., max_retries_preemption: _Optional[int] = ..., constraints: _Optional[_Iterable[_Union[_job_pb2.Constraint, _Mapping]]] = ..., coscheduling: _Optional[_Union[_job_pb2.CoschedulingConfig, _Mapping]] = ..., replicas: _Optional[int] = ..., timeout: _Optional[_Union[_time_pb2.Duration, _Mapping]] = ..., fail_if_exists: _Optional[bool] = ..., reservation: _Optional[_Union[_job_pb2.ReservationConfig, _Mapping]] = ..., preemption_policy: _Optional[_Union[_job_pb2.JobPreemptionPolicy, str]] = ..., existing_job_policy: _Optional[_Union[_job_pb2.ExistingJobPolicy, str]] = ..., priority_band: _Optional[_Union[_job_pb2.PriorityBand, str]] = ..., task_image: _Optional[str] = ..., submit_argv: _Optional[_Iterable[str]] = ...) -> None: ...
96+
client_revision_date: str
97+
def __init__(self, name: _Optional[str] = ..., entrypoint: _Optional[_Union[_job_pb2.RuntimeEntrypoint, _Mapping]] = ..., resources: _Optional[_Union[_job_pb2.ResourceSpecProto, _Mapping]] = ..., environment: _Optional[_Union[_job_pb2.EnvironmentConfig, _Mapping]] = ..., bundle_id: _Optional[str] = ..., bundle_blob: _Optional[bytes] = ..., scheduling_timeout: _Optional[_Union[_time_pb2.Duration, _Mapping]] = ..., ports: _Optional[_Iterable[str]] = ..., max_task_failures: _Optional[int] = ..., max_retries_failure: _Optional[int] = ..., max_retries_preemption: _Optional[int] = ..., constraints: _Optional[_Iterable[_Union[_job_pb2.Constraint, _Mapping]]] = ..., coscheduling: _Optional[_Union[_job_pb2.CoschedulingConfig, _Mapping]] = ..., replicas: _Optional[int] = ..., timeout: _Optional[_Union[_time_pb2.Duration, _Mapping]] = ..., fail_if_exists: _Optional[bool] = ..., reservation: _Optional[_Union[_job_pb2.ReservationConfig, _Mapping]] = ..., preemption_policy: _Optional[_Union[_job_pb2.JobPreemptionPolicy, str]] = ..., existing_job_policy: _Optional[_Union[_job_pb2.ExistingJobPolicy, str]] = ..., priority_band: _Optional[_Union[_job_pb2.PriorityBand, str]] = ..., task_image: _Optional[str] = ..., submit_argv: _Optional[_Iterable[str]] = ..., client_revision_date: _Optional[str] = ...) -> None: ...
9698
class LaunchJobResponse(_message.Message):
9799
__slots__ = ("job_id",)
98100
JOB_ID_FIELD_NUMBER: _ClassVar[int]

0 commit comments

Comments
 (0)