Skip to content

Commit c0697ab

Browse files
pjonssonomad
authored and committed
generate: always use Pool
The imap implementation will only be as lazy as one would expect starting with Python 3.15; until then it is more eager than expected. Switch from imap_unordered to apply_async, print the results in a deterministic order, and remove the special case where a different code path is taken when running on a single core.
1 parent 41d0526 commit c0697ab

1 file changed

Lines changed: 31 additions & 39 deletions

File tree

cubedash/generate.py

Lines changed: 31 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,12 @@
5757
import multiprocessing
5858
import re
5959
import sys
60-
from collections.abc import Generator, Sequence
60+
from collections.abc import Generator, Mapping, Sequence
6161
from dataclasses import dataclass
6262
from datetime import timedelta
6363
from functools import partial
6464
from textwrap import dedent
65+
from typing import Literal
6566

6667
import click
6768
import structlog
@@ -101,9 +102,8 @@ class GenerateSettings:
101102

102103
# pylint: disable=broad-except
103104
def generate_report(
104-
item: tuple[str, GenerateSettings, str],
105+
product_name: str, settings: GenerateSettings, grouping_time_zone: str
105106
) -> tuple[str, GenerateResult, TimePeriodOverview | None]:
106-
product_name, settings, grouping_time_zone = item
107107
log = _LOG.bind(product=product_name)
108108

109109
started_years = set()
@@ -173,43 +173,35 @@ def run_generation(
173173

174174
user_message("Generating product summaries...")
175175

176-
def on_complete(
177-
product_name: str, result: GenerateResult, summary: TimePeriodOverview | None
178-
) -> None:
179-
counts[result] += 1
180-
result_color = {
181-
GenerateResult.ERROR: "red",
182-
GenerateResult.UNSUPPORTED: "yellow",
183-
GenerateResult.CREATED: "blue",
184-
GenerateResult.UPDATED: "green",
185-
}.get(result)
186-
extra = ""
187-
if summary is not None:
188-
extra = f" (contains {summary.dataset_count} total datasets)"
176+
color: Mapping[GenerateResult, Literal["red", "yellow", "blue", "green"]] = {
177+
GenerateResult.ERROR: "red",
178+
GenerateResult.UNSUPPORTED: "yellow",
179+
GenerateResult.CREATED: "blue",
180+
GenerateResult.UPDATED: "green",
181+
}
189182

190-
user_message(
191-
f"{style(product_name, fg=result_color)} {result.name.lower()}{extra}"
192-
)
193-
194-
# If one worker, avoid any subprocesses/forking.
195-
# This makes test tracing far easier.
196-
if workers == 1:
197-
for p in products:
198-
on_complete(*generate_report((p.name, settings, grouping_time_zone)))
199-
else:
200-
# Shut down pool nicely to keep pytest-cov happy.
201-
# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html#if-you-use-multiprocessing-pool
202-
pool = multiprocessing.Pool(workers)
203-
try:
204-
for product_name, result, summary in pool.imap_unordered(
205-
generate_report,
206-
((p.name, settings, grouping_time_zone) for p in products),
207-
chunksize=1,
208-
):
209-
on_complete(product_name, result, summary)
210-
finally:
211-
pool.close()
212-
pool.join()
183+
# Shut down pool nicely to keep pytest-cov happy.
184+
# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html#if-you-use-multiprocessing-pool
185+
pool = multiprocessing.Pool(workers)
186+
try:
187+
results = [
188+
pool.apply_async(generate_report, (p.name, settings, grouping_time_zone))
189+
for p in products
190+
]
191+
for res in results:
192+
product_name, result, summary = res.get()
193+
counts[result] += 1
194+
extra = (
195+
""
196+
if summary is None
197+
else f" (contains {summary.dataset_count} total datasets)"
198+
)
199+
user_message(
200+
f"{style(product_name, fg=color.get(result))} {result.name.lower()}{extra}"
201+
)
202+
finally:
203+
pool.close()
204+
pool.join()
213205

214206
status_messages = ", ".join(
215207
f"{count_} {status.name.lower()}" for status, count_ in counts.items()

0 commit comments

Comments
 (0)