-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathprocess.py
More file actions
316 lines (277 loc) · 14 KB
/
Copy pathprocess.py
File metadata and controls
316 lines (277 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""ProcessPoolExecutor with additional supports for skypilot."""
import concurrent.futures
import logging
import multiprocessing
import sys
import threading
import time
from typing import Callable, Dict, Optional, Tuple
from sky import exceptions
from sky.utils import atomic
from sky.utils import subprocess_utils
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PoolExecutor(concurrent.futures.ProcessPoolExecutor):
    """A custom ProcessPoolExecutor with additional supports for skypilot.

    The additional supports include:
    1. Worker recycling: with ``max_tasks_per_child`` (Python 3.11+), a
       worker process exits after it has completed that many tasks.
    2. Idle check: support checking whether there are any idle workers.
    3. Proactive shutdown: SIGTERM worker processes when the executor is
       shutting down instead of indefinitely waiting.
    """

    def __init__(self,
                 max_workers: int,
                 max_tasks_per_child: Optional[int] = None,
                 **kwargs):
        """Create the pool.

        Args:
            max_workers: Maximum number of worker processes.
            max_tasks_per_child: If set, recycle a worker process after it
                has handled this many tasks. Only honored on Python 3.11+.
            **kwargs: Forwarded to concurrent.futures.ProcessPoolExecutor.
        """
        if max_tasks_per_child is not None and sys.version_info >= (3, 11):
            # Recycle a worker process after it has handled this many tasks so
            # its high-water-mark RSS is reclaimed. `max_tasks_per_child` was
            # added to ProcessPoolExecutor in Python 3.11; on older versions it
            # is silently ignored (the caller logs a warning instead).
            kwargs['max_tasks_per_child'] = max_tasks_per_child
        super().__init__(max_workers=max_workers, **kwargs)
        self.max_workers: int = max_workers
        # The number of workers that are handling tasks, atomicity across
        # multiple threads is sufficient since the idleness check is
        # best-effort and does not affect the correctness.
        # E.g. the following case is totally fine:
        # 1. Thread 1 checks running == max_workers
        # 2. Thread 2 decrements running
        # 3. Thread 1 schedules the task to other pool even if the pool is
        #    currently idle.
        self.running: atomic.AtomicInt = atomic.AtomicInt(0)

    def submit(self, fn, /, *args, **kwargs) -> concurrent.futures.Future:
        """Submit ``fn(*args, **kwargs)`` and track it as a running task.

        The running counter is incremented before submission and decremented
        when the returned future completes. If the submission itself fails
        (e.g. the pool is broken or already shut down), the increment is
        undone and the exception is re-raised.
        """
        self.running.increment()
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            # The task never entered the pool, so no done-callback will ever
            # fire for it; without this the counter would leak and the pool
            # would look permanently busier than it is.
            self.running.decrement()
            raise
        future.add_done_callback(lambda _: self.running.decrement())
        return future

    def has_idle_workers(self) -> bool:
        """Return True if fewer tasks are running than there are workers.

        This is best-effort: the counter may change right after the check.
        """
        return self.running.get() < self.max_workers

    def shutdown(self,
                 wait: bool = True,
                 *,
                 cancel_futures: bool = False) -> None:
        """Shut down the pool and proactively terminate its worker processes.

        Args:
            wait: Must be True. Here it means waiting for the proactive
                termination of the worker processes to complete.
            cancel_futures: Forwarded to ProcessPoolExecutor.shutdown; if
                True, pending futures that have not started are cancelled.
        """
        # TODO(aylei): we may support wait=False in the future if needed.
        assert wait is True, 'wait=False is not supported'
        # The base class sets _processes to None once it has been shut down,
        # so a repeated shutdown() has no processes left to terminate.
        processes = self._processes
        executor_processes = list(processes.values()) if processes else []
        # Shutdown the executor so that executor process can exit once the
        # running task is finished or interrupted.
        super().shutdown(wait=False, cancel_futures=cancel_futures)
        if not executor_processes:
            return
        # Proactively interrupt the running task to avoid indefinite waiting.
        subprocess_utils.run_in_parallel(
            subprocess_utils.kill_process_with_grace_period,
            executor_processes,
            num_threads=len(executor_processes))
# Define the worker function outside of the class to avoid pickling self
def _disposable_worker(fn, initializer, initargs, result_queue, args, kwargs):
"""The worker function that is used to run the task.
Args:
fn: The function to run.
initializer: The initializer function to run before running the task.
initargs: The arguments to pass to the initializer function.
result_queue: The queue to put the result and exception into.
args: The arguments to pass to the function.
kwargs: The keyword arguments to pass to the function.
"""
try:
if initializer is not None:
initializer(*initargs)
result = fn(*args, **kwargs)
result_queue.put(result)
except BaseException as e: # pylint: disable=broad-except
result_queue.put(e)
class DisposableExecutor:
    """A simple wrapper that creates a new process for each task.

    This is a workaround for Python 3.10 since `max_tasks_per_child` of
    ProcessPoolExecutor was introduced in 3.11. There is no way to control
    the worker lifetime in 3.10.

    Ref: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor # pylint: disable=line-too-long

    TODO(aylei): use the official `max_tasks_per_child` when upgrade to 3.11
    """

    # How long the monitor thread waits for a worker to exit before it
    # re-checks whether the worker has started sending its result.
    _POLL_INTERVAL_SECONDS = 0.1

    def __init__(self,
                 max_workers: Optional[int] = None,
                 initializer: Optional[Callable] = None,
                 initargs: Tuple = ()):
        """Create the executor.

        Args:
            max_workers: Maximum number of concurrently running worker
                processes, or None for no limit.
            initializer: Optional callable passed to every worker process,
                to be run before its task.
            initargs: Positional arguments for ``initializer``.
        """
        self.max_workers: Optional[int] = max_workers
        # Running worker processes keyed by PID; guarded by self._lock.
        self.workers: Dict[int, multiprocessing.Process] = {}
        self._shutdown: bool = False
        self._lock: threading.Lock = threading.Lock()
        self._initializer: Optional[Callable] = initializer
        self._initargs: Tuple = initargs

    def _receive_result(
            self, process: multiprocessing.Process,
            result_queue: multiprocessing.Queue) -> Tuple[bool, object]:
        """Wait for the worker's result without joining the process first.

        Returns:
            ``(True, item)`` if an item arrived on ``result_queue``, or
            ``(False, None)`` if the worker exited without any item reaching
            the queue.
        """
        # Wake up as soon as the worker exits, and periodically check
        # whether it has started writing its result (see _monitor_worker for
        # why the queue must be drained before the process can be joined).
        while result_queue.empty() and process.is_alive():
            process.join(timeout=self._POLL_INTERVAL_SECONDS)
        if result_queue.empty():
            return False, None
        return True, result_queue.get()

    def _monitor_worker(self, process: multiprocessing.Process,
                        future: concurrent.futures.Future,
                        result_queue: multiprocessing.Queue) -> None:
        """Wait for a worker to finish, resolve its future and clean up.

        The result is read *before* the process is joined: a child process
        does not exit until everything it put on a multiprocessing queue has
        been flushed into the underlying pipe, so joining first deadlocks
        once the pickled result is larger than the pipe buffer.
        """
        try:
            try:
                received, result = self._receive_result(process, result_queue)
            except Exception as e:  # pylint: disable=broad-except
                # E.g. the result could not be unpickled. Report it on the
                # future instead of leaving the future pending forever.
                received, result = True, e
            process.join()
            if not future.cancelled():
                if not received:
                    # Process ended but no result
                    future.set_result(None)
                elif isinstance(result, BaseException):
                    future.set_exception(result)
                else:
                    future.set_result(result)
        finally:
            if process.pid:
                with self._lock:
                    self.workers.pop(process.pid, None)
            # Release the queue's internal resources now rather than waiting
            # for garbage collection.
            result_queue.close()

    def submit(self, fn, *args, **kwargs) -> concurrent.futures.Future:
        """Run ``fn(*args, **kwargs)`` in a new process and return a Future.

        Raises:
            RuntimeError: If the executor has been shut down, or the worker
                process did not get a PID after starting.
            exceptions.ExecutionPoolFullError: If ``max_workers`` worker
                processes are already running.
        """
        future: concurrent.futures.Future = concurrent.futures.Future()
        # The lock is held from the shutdown/capacity checks until the worker
        # is registered, so concurrent submits cannot exceed max_workers and
        # a concurrent shutdown() cannot miss a freshly started worker.
        with self._lock:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot submit task after executor is shutdown')
            if (self.max_workers is not None and
                    len(self.workers) >= self.max_workers):
                raise exceptions.ExecutionPoolFullError(
                    'Maximum workers reached')
            result_queue: multiprocessing.Queue = multiprocessing.Queue()
            process = multiprocessing.Process(target=_disposable_worker,
                                              args=(fn, self._initializer,
                                                    self._initargs,
                                                    result_queue, args,
                                                    kwargs))
            process.daemon = True
            process.start()
            pid = process.pid or 0
            if pid == 0:
                raise RuntimeError('Failed to start process')
            self.workers[pid] = process
        # Start monitor thread to cleanup the worker process when it's done
        monitor_thread = threading.Thread(target=self._monitor_worker,
                                          args=(process, future, result_queue),
                                          daemon=True)
        monitor_thread.start()
        return future

    def has_idle_workers(self) -> bool:
        """Return True if another worker process may be started right now."""
        if self.max_workers is None:
            return True
        with self._lock:
            return len(self.workers) < self.max_workers

    def shutdown(self):
        """Reject further submits and terminate all running workers."""
        with self._lock:
            self._shutdown = True
            # Snapshot under the lock: monitor threads remove entries
            # concurrently, so the list and its length must agree.
            workers = list(self.workers.values())
        if not workers:
            return
        subprocess_utils.run_in_parallel(
            subprocess_utils.kill_process_with_grace_period,
            workers,
            num_threads=len(workers))
class BurstableExecutor:
    """A multiprocessing executor that supports bursting worker processes.

    Tasks prefer the fixed-size guaranteed pool (a PoolExecutor). When that
    pool has no idle worker, a task may instead run on the burst pool (a
    DisposableExecutor, which starts a new process per task).
    """
    # _executor is a PoolExecutor that is used to run guaranteed requests.
    _executor: Optional[PoolExecutor] = None
    # _burst_executor is a DisposableExecutor that is used to run burst
    # requests.
    _burst_executor: Optional[DisposableExecutor] = None

    def __init__(self,
                 garanteed_workers: int,
                 burst_workers: int = 0,
                 max_tasks_per_child: Optional[int] = None,
                 **kwargs):
        """Create the guaranteed and/or burst pools.

        Args:
            garanteed_workers: Size of the guaranteed pool; no guaranteed pool
                is created unless this is positive. (The parameter's spelling
                is kept for compatibility with existing callers.)
            burst_workers: Maximum number of concurrent burst processes; no
                burst pool is created unless this is positive.
            max_tasks_per_child: Worker recycling threshold, applied to the
                guaranteed pool only.
            **kwargs: Extra constructor arguments passed to both pools.
        """
        if garanteed_workers > 0:
            self._guaranteed_workers = garanteed_workers
            # Worker recycling applies to the guaranteed pool only; burst
            # workers are already disposed after each task. Keep
            # max_tasks_per_child in the stored kwargs so the pool is rebuilt
            # with the same setting after a BrokenProcessPool.
            self._guaranteed_pool_kwargs = {
                **kwargs,
                'max_tasks_per_child': max_tasks_per_child,
            }
            self._guaranteed_pool_lock = threading.Lock()
            self._executor = PoolExecutor(max_workers=garanteed_workers,
                                          **self._guaranteed_pool_kwargs)
        if burst_workers > 0:
            self._burst_executor = DisposableExecutor(max_workers=burst_workers,
                                                      **kwargs)

    def submit_until_success(self, fn, *args,
                             **kwargs) -> concurrent.futures.Future:
        """Submit a task for execution until success.

        Prioritizes submitting to the guaranteed pool. If no idle workers
        are available in the guaranteed pool, it will submit to the burst
        pool. If the burst pool is full, the task is queued to the guaranteed
        pool when one exists; otherwise the whole process is retried until
        the task is submitted successfully.

        TODO(aylei): this is coupled with executor.RequestWorker since we
        know the worker is dedicated to request scheduling and it either
        blocks on request polling or request submitting. So it is no harm
        to make submit blocking here. But for general cases, we need an
        internal queue to decouple submit and run.
        """
        while True:
            if self._executor is not None and self._executor.has_idle_workers():
                return self._submit_to_guaranteed_pool(fn, *args, **kwargs)
            if (self._burst_executor is not None and
                    self._burst_executor.has_idle_workers()):
                try:
                    fut = self._burst_executor.submit(fn, *args, **kwargs)
                    return fut
                except exceptions.ExecutionPoolFullError:
                    # The burst pool is full, try the next candidate.
                    pass
            if self._executor is not None:
                # No idle workers in either pool, still queue the request
                # to the guaranteed pool to keep behavior consistent.
                return self._submit_to_guaranteed_pool(fn, *args, **kwargs)
            logger.debug('No guaranteed pool set and the burst pool is full, '
                         'retry later.')
            time.sleep(0.1)

    def _submit_to_guaranteed_pool(self, fn, *args,
                                   **kwargs) -> concurrent.futures.Future:
        """Submit to the guaranteed pool with retry.

        The concurrent.futures.ProcessPoolExecutor will terminate all child
        processes and reject new tasks if any of the child process died, e.g.
        OOM killed. So when the pool is broken, we have to rebuild a new pool
        to execute tasks.

        Alternatives considered:
        - multiprocessing.Pool: individual process failure does not affect the
          pool, but it lacks lazy-init support and does not handle process
          failure gracefully.
        - inherit from concurrent.futures.ProcessPoolExecutor: If a child
          process dies when executing a task, we have to set an exception
          in the Future object of the task. But there is a race condition
          that the child process may die right after getting the task from
          the queue, so there is no (obviously) safe way to record the PID
          to task mapping in the current architecture of
          ProcessPoolExecutor. It is more feasible to implement a custom
          ProcessPoolExecutor with a new architecture.
        - a custom ProcessPoolExecutor: non-trivial to implement, keep for
          future improvement as BrokenProcessPool is not expected to be
          a common case, i.e. it usually indicates a bug elsewhere when
          this happens.
        """
        # _guaranteed_pool_lock is a non-reentrant threading.Lock, so the
        # retry must be a loop inside the critical section: re-entering this
        # method while holding the lock would deadlock.
        with self._guaranteed_pool_lock:
            while True:
                assert self._executor is not None
                try:
                    return self._executor.submit(fn, *args, **kwargs)
                except concurrent.futures.process.BrokenProcessPool as e:
                    logger.warning('The guaranteed pool is broken, '
                                   f'replacing the pool and retrying. Error: {e}')
                    broken_pool = self._executor
                    # Shut the broken pool down in a background thread so
                    # this submit does not wait for its processes to stop.
                    threading.Thread(target=broken_pool.shutdown,
                                     daemon=True).start()
                    self._executor = PoolExecutor(
                        max_workers=self._guaranteed_workers,
                        **self._guaranteed_pool_kwargs)

    def shutdown(self) -> None:
        """Shut down the burst pool, then the guaranteed pool, if present."""
        if self._burst_executor is not None:
            self._burst_executor.shutdown()
        if self._executor is not None:
            self._executor.shutdown(wait=True)