-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy path helpers.py
More file actions
157 lines (128 loc) · 5.2 KB
/
helpers.py
File metadata and controls
157 lines (128 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0
"""Job callables for e2e tests, organized as static methods on TestJobs.
All methods use logging as the primary communication channel. They are
serialized via cloudpickle (Entrypoint.from_callable) so static methods
work fine — cloudpickle pickles bytecode, not import paths.
"""
class TestJobs:
    """Namespace for job callables that run inside Iris workers.

    Each method represents a distinct job behavior. Use logging for output
    so tests can verify both substring filtering and structured log levels.

    All methods are static because they are serialized via cloudpickle
    (see module docstring) — no instance state is ever needed.
    """

    @staticmethod
    def quick():
        """Return immediately with 1; the minimal successful job."""
        return 1

    @staticmethod
    def sleep(duration: float):
        """Sleep for `duration` seconds, then return 1."""
        import time

        time.sleep(duration)
        return 1

    @staticmethod
    def log_periodic(duration: float, interval: float = 1.0):
        """Log a message every `interval` seconds for `duration` seconds.

        Returns the number of ticks emitted (0 if `duration` <= 0).
        """
        import logging
        import time

        logger = logging.getLogger("iris.test.log_periodic")
        start = time.monotonic()
        tick = 0
        while time.monotonic() - start < duration:
            tick += 1
            logger.info("tick %d (elapsed=%.1fs)", tick, time.monotonic() - start)
            # Clamp the sleep to the time remaining: an unconditional
            # time.sleep(interval) could overrun `duration` by up to a
            # full interval, making the job's wall time unpredictable.
            remaining = duration - (time.monotonic() - start)
            if remaining > 0:
                time.sleep(min(interval, remaining))
        logger.info("done after %d ticks", tick)
        return tick

    @staticmethod
    def block(s):
        """Block until sentinel `s` is signalled (any object with .wait())."""
        s.wait()

    @staticmethod
    def fail():
        """Raise ValueError; used to exercise job-failure paths."""
        raise ValueError("intentional failure")

    @staticmethod
    def noop():
        """Do nothing and return the string "ok"."""
        return "ok"

    @staticmethod
    def busy_loop(duration: float = 3.0):
        """CPU-bound busy loop for profiling tests."""
        import time

        end = time.monotonic() + duration
        while time.monotonic() < end:
            # Cheap but non-trivial work so the profiler sees real CPU time.
            sum(range(1000))

    @staticmethod
    def log_verbose(num_lines: int = 200):
        """Emit log lines at INFO/WARNING/ERROR levels.

        Covers both substring filtering (grep for "processing" vs "validation failed")
        and structured level filtering (INFO vs WARNING vs ERROR). Also emits
        named markers (info-marker, warning-marker, error-marker) for precise
        level assertions.
        """
        import logging

        logger = logging.getLogger("iris.test.verbose")
        for i in range(num_lines):
            # Lazy %-style args (not f-strings) so formatting is skipped when
            # the level is filtered out — consistent with log_periodic above.
            if i % 3 == 0:
                logger.info("step %d: processing data batch", i)
            elif i % 3 == 1:
                logger.warning("step %d: slow operation detected", i)
            else:
                logger.error("step %d: validation failed for item", i)
        logger.info("info-marker")
        logger.warning("warning-marker")
        logger.error("error-marker")
        logger.info("DONE: all lines emitted")
        return 1

    @staticmethod
    def register_endpoint(prefix):
        """Register an endpoint via RPC and verify it's listed.

        Raises ValueError when not running inside an Iris job context.
        """
        from iris.cluster.client import get_job_info
        from iris.rpc import controller_pb2
        from iris.rpc.controller_connect import ControllerServiceClientSync

        info = get_job_info()
        if info is None:
            raise ValueError("JobInfo not available")
        client = ControllerServiceClientSync(address=info.controller_address, timeout_ms=5000)
        try:
            endpoint_name = f"{prefix}/actor1"
            request = controller_pb2.Controller.RegisterEndpointRequest(
                name=endpoint_name,
                address="localhost:5000",
                task_id=info.task_id.to_wire(),
                metadata={"type": "actor"},
            )
            response = client.register_endpoint(request)
            assert response.endpoint_id
            # List with a trailing slash so only children of `prefix` match.
            list_request = controller_pb2.Controller.ListEndpointsRequest(prefix=f"{prefix}/")
            list_response = client.list_endpoints(list_request)
            assert len(list_response.endpoints) == 1
            names = [ep.name for ep in list_response.endpoints]
            assert endpoint_name in names
        finally:
            # Always release the RPC client, even when an assertion fails.
            client.close()

    @staticmethod
    def validate_ports():
        """Validate that requested ports are allocated via JobInfo.

        Raises ValueError when JobInfo or the expected ports are missing.
        """
        from iris.cluster.client import get_job_info

        info = get_job_info()
        if info is None:
            raise ValueError("JobInfo not available")
        if "http" not in info.ports or "grpc" not in info.ports:
            raise ValueError(f"Ports not set: {info.ports}")
        assert info.ports["http"] > 0
        assert info.ports["grpc"] > 0

    @staticmethod
    def validate_job_context():
        """Validate job context via get_job_info() in a coscheduled job.

        Raises RuntimeError when not running inside an Iris job context.
        """
        import logging

        from iris.cluster.client import get_job_info

        logger = logging.getLogger("iris.test.context")
        info = get_job_info()
        if info is None:
            raise RuntimeError("Not running in an Iris job context")
        # Lazy %-args for consistency with the other logging methods here.
        logger.info("Task %s of %s on worker %s", info.task_index, info.num_tasks, info.worker_id)
        return f"Task {info.task_index} done"

    @staticmethod
    def wait_for_sentinel(s):
        """Wait on a sentinel with a short timeout, used for concurrency tests."""
        from rigging.timing import Duration

        s.wait(timeout=Duration.from_seconds(2))
        return "done"