Skip to content

Commit d30b4c3

Browse files
authored
Iris: Use machine-readable resource quantities in ResourceSpec proto (#2369)
This fixes some layering violations in which the controller and worker code parsed human-readable strings (e.g. "8g") at runtime. By the time requests reach the controller, they should already contain machine-readable values (e.g. `memory_bytes` as an integer byte count). Also removes some typical "claudey" tests for attributes, etc.
1 parent 35dc6a3 commit d30b4c3

34 files changed

Lines changed: 1037 additions & 838 deletions

lib/iris/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ src/iris/
6666

6767
```python
6868
from iris.client import IrisClient
69-
from iris.cluster.types import Entrypoint
70-
from iris.rpc.cluster_pb2 import ResourceSpec
69+
from iris.cluster.types import Entrypoint, ResourceSpec
7170

7271
def my_task():
7372
print("Hello from iris!")

lib/iris/examples/demo.ipynb

Lines changed: 303 additions & 106 deletions
Large diffs are not rendered by default.

lib/iris/examples/demo_cluster.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
_LocalImageProvider,
5151
)
5252
from iris.cluster.controller.controller import Controller, ControllerConfig, DefaultWorkerStubFactory
53-
from iris.cluster.types import Entrypoint, JobId
53+
from iris.cluster.types import EnvironmentSpec, Entrypoint, JobId, ResourceSpec
5454
from iris.cluster.worker.builder import ImageCache
5555
from iris.cluster.worker.bundle_cache import BundleCache
5656
from iris.cluster.worker.docker import DockerRuntime
@@ -244,8 +244,8 @@ def submit(
244244
) -> str:
245245
"""Submit a job to the cluster."""
246246
entrypoint = Entrypoint.from_callable(fn, *args, **kwargs)
247-
environment = cluster_pb2.EnvironmentConfig(workspace="/app", env_vars={})
248-
resources = cluster_pb2.ResourceSpec(cpu=cpu, memory=memory)
247+
environment = EnvironmentSpec(workspace="/app")
248+
resources = ResourceSpec(cpu=cpu, memory=memory)
249249
return self.client.submit(
250250
entrypoint=entrypoint,
251251
name=name or fn.__name__,

lib/iris/src/iris/client/client.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
RemoteClusterClient,
4848
get_job_info,
4949
)
50-
from iris.cluster.types import Entrypoint, is_job_finished, JobId, Namespace
50+
from iris.cluster.types import EnvironmentSpec, Entrypoint, is_job_finished, JobId, Namespace, ResourceSpec
5151
from iris.rpc import cluster_pb2
5252
from iris.time_utils import ExponentialBackoff
5353

@@ -500,8 +500,8 @@ def submit(
500500
self,
501501
entrypoint: Entrypoint,
502502
name: str,
503-
resources: cluster_pb2.ResourceSpec,
504-
environment: cluster_pb2.EnvironmentConfig | None = None,
503+
resources: ResourceSpec,
504+
environment: EnvironmentSpec | None = None,
505505
ports: list[str] | None = None,
506506
scheduling_timeout_seconds: int = 0,
507507
) -> JobId:
@@ -534,11 +534,15 @@ def submit(
534534
else:
535535
job_id = JobId(name)
536536

537+
# Convert to wire format
538+
resources_proto = resources.to_proto()
539+
environment_proto = environment.to_proto() if environment else None
540+
537541
self._cluster.submit_job(
538542
job_id=job_id,
539543
entrypoint=entrypoint,
540-
resources=resources,
541-
environment=environment,
544+
resources=resources_proto,
545+
environment=environment_proto,
542546
ports=ports,
543547
scheduling_timeout_seconds=scheduling_timeout_seconds,
544548
)

lib/iris/src/iris/client/worker_pool.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
Example:
2323
from pathlib import Path
2424
from iris.client import IrisClient, WorkerPool, WorkerPoolConfig
25+
from iris.cluster.types import ResourceSpec
2526
2627
client = IrisClient.remote("http://controller:8080", workspace=Path("./my-project"))
2728
2829
config = WorkerPoolConfig(
2930
num_workers=3,
30-
resources=cluster_pb2.ResourceSpec(cpu=1, memory="512m"),
31+
resources=ResourceSpec(cpu=1, memory="512m"),
3132
)
3233
3334
with WorkerPool(client, config) as pool:
@@ -55,8 +56,7 @@
5556
from iris.actor.client import ActorClient
5657
from iris.actor.resolver import Resolver
5758
from iris.client.client import IrisClient, iris_ctx
58-
from iris.cluster.types import Entrypoint, JobId
59-
from iris.rpc import cluster_pb2
59+
from iris.cluster.types import EnvironmentSpec, Entrypoint, JobId, ResourceSpec
6060
from iris.time_utils import ExponentialBackoff
6161

6262
logger = logging.getLogger(__name__)
@@ -329,8 +329,8 @@ class WorkerPoolConfig:
329329
"""
330330

331331
num_workers: int
332-
resources: cluster_pb2.ResourceSpec
333-
environment: cluster_pb2.EnvironmentConfig | None = None
332+
resources: ResourceSpec
333+
environment: EnvironmentSpec | None = None
334334
name_prefix: str = "worker"
335335
max_retries: int = 0
336336

lib/iris/src/iris/cluster/client/local_client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,16 @@ def __init__(self, cpu: int = 1000, memory_gb: int = 1000):
6363
self._memory_gb = memory_gb
6464

6565
def probe(self) -> cluster_pb2.WorkerMetadata:
66+
device = cluster_pb2.DeviceConfig()
67+
device.cpu.CopyFrom(cluster_pb2.CpuDevice(variant="cpu"))
68+
6669
return cluster_pb2.WorkerMetadata(
6770
hostname="local",
6871
ip_address="127.0.0.1",
6972
cpu_count=self._cpu,
7073
memory_bytes=self._memory_gb * 1024**3,
71-
)
72-
73-
def build_resource_spec(self, metadata: cluster_pb2.WorkerMetadata) -> cluster_pb2.ResourceSpec:
74-
return cluster_pb2.ResourceSpec(
75-
cpu=metadata.cpu_count,
76-
memory=f"{metadata.memory_bytes // (1024**3)}g",
74+
disk_bytes=100 * 1024**3, # Default 100GB for local
75+
device=device,
7776
)
7877

7978

@@ -368,7 +367,7 @@ def submit_job(
368367
self,
369368
job_id: str,
370369
entrypoint: Entrypoint,
371-
resources: cluster_pb2.ResourceSpec,
370+
resources: cluster_pb2.ResourceSpecProto,
372371
environment: cluster_pb2.EnvironmentConfig | None = None,
373372
ports: list[str] | None = None,
374373
scheduling_timeout_seconds: int = 0,

lib/iris/src/iris/cluster/client/remote_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def submit_job(
6767
self,
6868
job_id: str,
6969
entrypoint: Entrypoint,
70-
resources: cluster_pb2.ResourceSpec,
70+
resources: cluster_pb2.ResourceSpecProto,
7171
environment: cluster_pb2.EnvironmentConfig | None = None,
7272
ports: list[str] | None = None,
7373
scheduling_timeout_seconds: int = 0,

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,9 @@ def _dispatch_job(
271271
env_vars=dict(job.request.environment.env_vars),
272272
),
273273
bundle_gcs_path=job.request.bundle_gcs_path,
274-
resources=cluster_pb2.ResourceSpec(
274+
resources=cluster_pb2.ResourceSpecProto(
275275
cpu=job.request.resources.cpu,
276-
memory=job.request.resources.memory,
276+
memory_bytes=job.request.resources.memory_bytes,
277277
),
278278
ports=list(job.request.ports),
279279
)

lib/iris/src/iris/cluster/controller/dashboard.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@ def _job_state_name(state: int) -> str:
222222
return div.innerHTML;
223223
}
224224
225+
function formatBytes(bytes) {
226+
if (bytes === 0) return '0 B';
227+
const k = 1024;
228+
const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
229+
const i = Math.floor(Math.log(bytes) / Math.log(k));
230+
return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + ' ' + sizes[i];
231+
}
232+
225233
async function refresh() {
226234
try {
227235
const [stats, actions, workers, jobs, endpoints] = await Promise.all([
@@ -262,7 +270,8 @@ def _job_state_name(state: int) -> str:
262270
? `<a href="http://${escapeHtml(w.address)}/" class="worker-link" target="_blank">${wid}</a>`
263271
: wid;
264272
const cpu = w.resources ? w.resources.cpu : '-';
265-
const memory = w.resources ? (w.resources.memory || '-') : '-';
273+
const memBytes = w.resources ? (w.resources.memory_bytes || 0) : 0;
274+
const memory = memBytes ? formatBytes(memBytes) : '-';
266275
return `<tr>
267276
<td>${workerLink}</td>
268277
<td class="${healthClass}">${w.healthy ? 'Yes' : 'No'}</td>
@@ -280,8 +289,9 @@ def _job_state_name(state: int) -> str:
280289
const jobsHtml = jobs.map(j => {
281290
const jid = escapeHtml(j.job_id);
282291
const jobLink = `<a href="/job/${jid}" class="job-link">${jid.slice(0,8)}...</a>`;
292+
const jobMemBytes = j.resources ? (j.resources.memory_bytes || 0) : 0;
283293
const resources = j.resources
284-
? `${j.resources.cpu} CPU, ${j.resources.memory || '-'}` : '-';
294+
? `${j.resources.cpu} CPU, ${jobMemBytes ? formatBytes(jobMemBytes) : '-'}` : '-';
285295
return `<tr>
286296
<td>${jobLink}</td>
287297
<td>${escapeHtml(j.name)}</td>
@@ -778,8 +788,8 @@ def _api_workers(self, _request: Request) -> JSONResponse:
778788
"consecutive_failures": w.consecutive_failures,
779789
"last_heartbeat_ms": w.last_heartbeat_ms,
780790
"resources": {
781-
"cpu": w.resources.cpu if w.resources else 0,
782-
"memory": w.resources.memory if w.resources else "",
791+
"cpu": w.metadata.cpu_count,
792+
"memory_bytes": w.metadata.memory_bytes,
783793
},
784794
}
785795
for w in workers
@@ -805,7 +815,7 @@ def _api_jobs(self, _request: Request) -> JSONResponse:
805815
"preemption_count": j.preemption_count,
806816
"resources": {
807817
"cpu": j.request.resources.cpu if j.request.resources else 0,
808-
"memory": j.request.resources.memory if j.request.resources else "",
818+
"memory_bytes": j.request.resources.memory_bytes if j.request.resources else 0,
809819
},
810820
}
811821
for j in jobs

lib/iris/src/iris/cluster/controller/resources.py

Lines changed: 0 additions & 74 deletions
This file was deleted.

0 commit comments

Comments
 (0)