Skip to content

Commit 8ffb738

Browse files
committed
Move task status updates onto the RPC loop.
1 parent 0ae5a1c commit 8ffb738

5 files changed

Lines changed: 50 additions & 7 deletions

File tree

lib/finelog/src/finelog/deploy/cli.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ def status_cmd(name: str) -> None:
157157
class OutputFormat(StrEnum):
158158
TABLE = "table"
159159
JSON = "json"
160+
JSONL = "jsonl"
160161
CSV = "csv"
161162

162163

@@ -192,6 +193,12 @@ def _print_json(table: pa.Table) -> None:
192193
sys.stdout.write("\n")
193194

194195

196+
def _print_jsonl(table: pa.Table) -> None:
197+
for row in table.to_pylist():
198+
json.dump(row, sys.stdout, default=str)
199+
sys.stdout.write("\n")
200+
201+
195202
def _print_csv(table: pa.Table) -> None:
196203
writer = csv.writer(sys.stdout)
197204
writer.writerow(table.schema.names)
@@ -202,6 +209,7 @@ def _print_csv(table: pa.Table) -> None:
202209
_PRINTERS = {
203210
OutputFormat.TABLE: _print_table,
204211
OutputFormat.JSON: _print_json,
212+
OutputFormat.JSONL: _print_jsonl,
205213
OutputFormat.CSV: _print_csv,
206214
}
207215

@@ -213,7 +221,7 @@ def _print_csv(table: pa.Table) -> None:
213221
"--format",
214222
"output_format",
215223
type=click.Choice([f.value for f in OutputFormat]),
216-
default=OutputFormat.TABLE.value,
224+
default=OutputFormat.JSONL.value,
217225
show_default=True,
218226
help="Output format for the result.",
219227
)
@@ -399,7 +407,7 @@ def _register_namespace_views(
399407
"--format",
400408
"output_format",
401409
type=click.Choice([f.value for f in OutputFormat]),
402-
default=OutputFormat.TABLE.value,
410+
default=OutputFormat.JSONL.value,
403411
show_default=True,
404412
help="Output format for the result.",
405413
)

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ class ControllerConfig:
10911091
remote_state_dir: str = ""
10921092
"""Remote URI for controller checkpoints and worker profiles (e.g. gs://bucket/iris/state)."""
10931093

1094-
scheduler_min_interval: Duration = field(default_factory=lambda: Duration.from_seconds(1.0))
1094+
scheduler_min_interval: Duration = field(default_factory=lambda: Duration.from_seconds(10.0))
10951095
"""Minimum scheduling loop interval (used when cluster is active)."""
10961096

10971097
scheduler_max_interval: Duration = field(default_factory=lambda: Duration.from_seconds(10.0))

lib/iris/src/iris/cluster/controller/dashboard.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ def _create_app(self) -> ASGIApp:
426426
# when present but treat everything as anonymous/admin.
427427
auth_interceptor = NullAuthInterceptor(verifier=self._auth_verifier)
428428
controller_interceptors = [auth_interceptor, controller_timing]
429+
# @on_loop handlers run inline on the event loop; everything else
430+
# is dispatched to a thread by AsyncServiceAdapter.
429431
rpc_asgi_app = ControllerServiceASGIApplication(
430432
service=AsyncServiceAdapter(self._service),
431433
interceptors=controller_interceptors,

lib/iris/src/iris/cluster/controller/service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
is_job_finished,
9191
)
9292
from iris.rpc import controller_pb2, job_pb2, query_pb2, vm_pb2, worker_pb2
93+
from iris.rpc.async_adapter import on_loop
9394
from iris.rpc.auth import (
9495
AuthzAction,
9596
authorize,
@@ -2589,6 +2590,7 @@ def update_task_status(
25892590

25902591
# --- Task Status Text Push ---
25912592

2593+
@on_loop
25922594
def set_task_status_text(
25932595
self,
25942596
request: job_pb2.SetTaskStatusTextRequest,

lib/iris/src/iris/rpc/async_adapter.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,14 @@
66
77
The ASGI application invokes ``await endpoint.function(...)`` for every
88
unary RPC, so each handler must be a coroutine function.
9-
``AsyncServiceAdapter`` exposes a sync service's methods as async by
10-
wrapping each sync method in ``asyncio.to_thread``. Methods that are
11-
already coroutine functions pass through untouched.
9+
``AsyncServiceAdapter`` exposes a sync service's methods as async:
10+
11+
- methods marked with :func:`on_loop` are wrapped in an ``async def`` that
12+
calls the sync body inline on the event loop. Use this only for short,
13+
non-blocking handlers (in-memory dicts, very cheap SQL reads) — a long
14+
handler running inline blocks every other RPC.
15+
- every other sync method gets an ``asyncio.to_thread`` wrapper.
16+
- methods that are already coroutine functions pass through untouched.
1217
1318
Interceptors are not adapted here — each interceptor that participates in
1419
an ASGI chain implements ``async intercept_unary`` directly.
@@ -19,7 +24,26 @@
1924
import asyncio
2025
import functools
2126
import inspect
22-
from typing import Any
27+
from collections.abc import Callable
28+
from typing import Any, TypeVar
29+
30+
F = TypeVar("F", bound=Callable[..., Any])
31+
32+
_ON_LOOP_ATTR = "__iris_rpc_on_loop__"
33+
34+
35+
def on_loop(fn: F) -> F:
36+
"""Mark a sync RPC handler as safe to run directly on the event loop.
37+
38+
The handler must be short and non-blocking. A handler that blocks the
39+
loop for tens of milliseconds will queue every other RPC behind it.
40+
"""
41+
setattr(fn, _ON_LOOP_ATTR, True)
42+
return fn
43+
44+
45+
def _is_on_loop(fn: Callable[..., Any]) -> bool:
46+
return getattr(fn, _ON_LOOP_ATTR, False)
2347

2448

2549
class AsyncServiceAdapter:
@@ -36,6 +60,13 @@ def __getattr__(self, name: str) -> Any:
3660
return attr
3761
if inspect.iscoroutinefunction(attr):
3862
return attr
63+
if _is_on_loop(attr):
64+
65+
@functools.wraps(attr)
66+
async def _on_loop_call(*args: Any, **kwargs: Any) -> Any:
67+
return attr(*args, **kwargs)
68+
69+
return _on_loop_call
3970

4071
@functools.wraps(attr)
4172
async def _threaded_call(*args: Any, **kwargs: Any) -> Any:

0 commit comments

Comments
 (0)