Skip to content

Commit ba4f60a

Browse files
committed
perf: lazily create the shared pool, as GPU processes do not use it
1 parent 2f7df7b commit ba4f60a

1 file changed

Lines changed: 27 additions & 3 deletions

File tree

src/mmore/process/dispatcher.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,32 @@ def __str__(self) -> str:
136136
)
137137

138138

139+
class _LazyPool:
140+
"""A multiprocessing pool created on first map() call.
141+
142+
Processors that override process_batch (e.g. PDF, media) never call map(), so a
143+
job that only uses them never spawns any worker.
144+
"""
145+
146+
def __init__(self, processes: int):
147+
self._processes = processes
148+
self._pool = None
149+
150+
def map(self, func, iterable):
151+
if self._pool is None:
152+
logger.info(f"Initializing shared pool with {self._processes} workers...")
153+
self._pool = mp.Pool(processes=self._processes)
154+
return self._pool.map(func, iterable)
155+
156+
def close(self):
157+
if self._pool is not None:
158+
self._pool.close()
159+
160+
def join(self):
161+
if self._pool is not None:
162+
self._pool.join()
163+
164+
139165
class Dispatcher:
140166
"""
141167
Takes a converted crawl result and dispatches it to the appropriate processor.
@@ -185,9 +211,7 @@ def _dispatch_local(
185211

186212
instantiated_processors: Dict[Type[Processor], Processor] = {}
187213

188-
num_workers = os.cpu_count() or 1
189-
logger.info(f"🚀 Initializing Shared Global Pool with {num_workers} workers...")
190-
global_pool = mp.Pool(processes=num_workers)
214+
global_pool = _LazyPool(os.cpu_count() or 1)
191215

192216
try:
193217
for processor_type, files in task_lists:

0 commit comments

Comments
 (0)