Skip to content

Commit 4aed9d8

Browse files
authored
Merge pull request #867 from punch-mission/updates
Updates
2 parents ea93185 + 6f70aa7 commit 4aed9d8

9 files changed

Lines changed: 56 additions & 10 deletions

File tree

changelog/867.feature.2.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
The whole pipeline's memory use is now capped.

changelog/867.feature.3.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
The config file can now contain per-server values.

changelog/867.feature.4.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Starfield generation uses 32-bit floats to reduce memory pressure.

changelog/867.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
The maximum number of running flows can be capped.

punchbowl/auto/cli.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,14 @@ def run(configuration_path, launch_prefect=False, launch_dask_cluster=False):
157157
try:
158158
numa_prefix_control = ["numactl", "--localalloc", "--physcpubind=0-11"]
159159
numa_prefix_workers = ["numactl", "--localalloc", "--physcpubind=12-63,64-125,192-255"]
160+
# This starts our pipeline in a cgroup context in which the collective memory use of the pipeline and
161+
# all subprocesses, including shared memory, is capped at a certain amount. If we approach an
162+
# out-of-memory condition, this hopefully contains it to the pipeline while keeping the rest of the
163+
# server (including, critically, SSH access) healthy.
164+
mem_limit_prefix = ["systemd-run", "--scope", "-p", "MemoryMax=1800G", "-p", "MemoryHigh=1700G", "--user",
165+
"--description", "Limit pipeline memory"]
160166
if launch_prefect:
161-
print("Launcing prefect")
167+
print("Launching prefect")
162168
prefect_process = subprocess.Popen(
163169
[*numa_prefix_control, "prefect", "server", "start", "--no-services"], stdout=f, stderr=f)
164170
time.sleep(5)
@@ -181,7 +187,8 @@ def run(configuration_path, launch_prefect=False, launch_dask_cluster=False):
181187
# These processes send a _lot_ of output, so we let it go to the screen instead of making the log file
182188
# enormous
183189
def data_process_launcher() -> subprocess.Popen:
184-
return subprocess.Popen([*numa_prefix_workers, "punchpipe", "serve-data", configuration_path])
190+
return subprocess.Popen([*mem_limit_prefix, *numa_prefix_workers, "punchpipe", "serve-data",
191+
configuration_path])
185192

186193
def control_process_launcher() -> subprocess.Popen:
187194
return subprocess.Popen([*numa_prefix_control, "punchpipe", "serve-control", configuration_path])

punchbowl/auto/control/launcher.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def escalate_long_waiting_flows(session, pipeline_config):
121121

122122

123123
def determine_launchable_flow_count(weight_planned, weight_running, max_weight_running, max_weight_to_launch,
124-
max_flows_to_launch):
124+
max_flows_to_launch, max_flows_running, num_running_flows):
125125
logger = get_run_logger()
126126
amount_to_launch = max_weight_running - weight_running
127127
logger.info(f"Total weight {amount_to_launch:.2f} can be launched at this time.")
@@ -130,7 +130,12 @@ def determine_launchable_flow_count(weight_planned, weight_running, max_weight_r
130130
amount_to_launch = max(0, amount_to_launch)
131131
logger.info(f"Will launch up to {amount_to_launch:.2f} weight and {max_flows_to_launch} flows")
132132

133-
return min(amount_to_launch, weight_planned), max_flows_to_launch
133+
number_launchable = max_flows_running - num_running_flows
134+
n_to_launch = min(number_launchable, max_flows_to_launch)
135+
logger.info(f"{num_running_flows} flows running now. Max {max_flows_running} total, {max_flows_to_launch} per "
136+
f"launch window. Launching up to {n_to_launch}.")
137+
138+
return min(amount_to_launch, weight_planned), n_to_launch
134139

135140

136141
@task(cache_policy=NO_CACHE)
@@ -295,9 +300,11 @@ async def launcher(pipeline_config_path=None):
295300
max_weight_running = pipeline_config["control"]["launcher"]["max_weight_running"]
296301
max_weight_to_launch = pipeline_config["control"]["launcher"]["max_weight_to_launch_at_once"]
297302
max_flows_to_launch = pipeline_config["control"]["launcher"]["max_flows_to_launch_at_once"]
303+
max_flows_running = pipeline_config["control"]["launcher"]["max_flows_running"]
298304

299305
weight_to_launch, max_flows_to_launch = determine_launchable_flow_count(
300-
weight_planned, weight_running, max_weight_running, max_weight_to_launch, max_flows_to_launch)
306+
weight_planned, weight_running, max_weight_running, max_weight_to_launch, max_flows_to_launch,
307+
max_flows_running, num_running_flows)
301308

302309
flows_to_launch, tags_by_flow, selected_weight, number_of_flows, counts_per_type = gather_planned_flows(
303310
session, weight_to_launch, max_flows_to_launch, flow_weights, flow_enabled, flow_batch_sizes, flow_hosts)

punchbowl/auto/control/tests/test_launcher.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def test_filter_for_launchable_flows(db, flow_weights, flow_hosts):
158158
running_count, planned_count, weight_planned, weight_running = count_flows.fn(db, flow_weights, flow_hosts)
159159
max_weight_running = 30
160160
ready_to_launch_weight, max_flows_to_launch = determine_launchable_flow_count(
161-
weight_planned, weight_running, max_weight_running, math.inf, 10)
161+
weight_planned, weight_running, max_weight_running, math.inf, 10, 100, running_count)
162162
assert ready_to_launch_weight == 5
163163
assert max_flows_to_launch == 10
164164

@@ -168,7 +168,7 @@ def test_filter_for_launchable_flows_with_max_of_1(db, flow_weights, flow_batch_
168168
running_count, planned_count, weight_planned, weight_running = count_flows.fn(db, flow_weights, flow_hosts)
169169
max_weight_running = 1
170170
ready_to_launch_weight, max_flows_to_launch = determine_launchable_flow_count(
171-
weight_planned, weight_running, max_weight_running, math.inf, 10)
171+
weight_planned, weight_running, max_weight_running, math.inf, 10, 100, running_count)
172172
assert ready_to_launch_weight == 1
173173
assert max_flows_to_launch == 10
174174
flows, tags_by_flow, selected_weight, number_of_flows, count_per_type = gather_planned_flows.fn(
@@ -184,16 +184,26 @@ def test_filter_for_launchable_flows_with_max_of_0(db, flow_weights, flow_hosts)
184184
running_count, planned_count, weight_planned, weight_running = count_flows.fn(db, flow_weights, flow_hosts)
185185
max_weight_running = 0
186186
ready_to_launch_weight, max_flows_to_launch = determine_launchable_flow_count(
187-
weight_planned, weight_running, max_weight_running, math.inf, 0)
187+
weight_planned, weight_running, max_weight_running, math.inf, 0, 100, running_count)
188188
assert ready_to_launch_weight == 0
189189
assert max_flows_to_launch == 0
190190

191191

192+
def test_filter_for_launchable_flows_cap_by_number(db, flow_weights, flow_hosts):
193+
with prefect_test_harness(), disable_run_logger():
194+
running_count, planned_count, weight_planned, weight_running = count_flows.fn(db, flow_weights, flow_hosts)
195+
max_weight_running = 100
196+
ready_to_launch_weight, max_flows_to_launch = determine_launchable_flow_count(
197+
weight_planned, weight_running, max_weight_running, math.inf, 100, 1, running_count)
198+
assert ready_to_launch_weight > 0
199+
assert max_flows_to_launch == 1
200+
201+
192202
def test_filter_for_launchable_flows_with_empty_db(db_empty, flow_weights, flow_hosts):
193203
with prefect_test_harness(), disable_run_logger():
194204
running_count, planned_count, weight_planned, weight_running = count_flows.fn(db_empty, flow_weights, flow_hosts)
195205
max_weight_running = 30
196206
ready_to_launch_weight, max_flows_to_launch = determine_launchable_flow_count(
197-
weight_planned, weight_running, max_weight_running, math.inf, 20)
207+
weight_planned, weight_running, max_weight_running, math.inf, 20, 100, running_count)
198208
assert ready_to_launch_weight == 0
199209
assert max_flows_to_launch == 20

punchbowl/auto/control/util.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import re
3+
import socket
34
from math import inf
45
from datetime import UTC, datetime
56
from itertools import islice
@@ -65,10 +66,22 @@ def load_pipeline_configuration(path: str = None) -> dict:
6566
path = run_coro_as_sync(path)
6667
with open(path) as f:
6768
config = yaml.load(f, Loader=FullLoader)
69+
hostify_config(config)
6870
# TODO: add validation
6971
return config
7072

7173

74+
def hostify_config(config):
    """Resolve per-server configuration overrides in place.

    Any key suffixed with ``-<short hostname>`` (e.g. ``max_flows-myserver``
    on host ``myserver``) replaces the corresponding un-suffixed key for this
    machine. Nested dicts are processed recursively. Keys tagged for other
    hosts are left untouched (they remain as literal keys).

    Parameters
    ----------
    config : dict
        Parsed configuration mapping; mutated in place.
    """
    # Use the short hostname so "myserver.example.com" matches keys tagged "-myserver".
    tag = '-' + socket.gethostname().split(".")[0]
    # Snapshot the keys: we add and delete entries while walking the dict.
    for key in list(config.keys()):
        if isinstance(config[key], dict):
            hostify_config(config[key])
        if isinstance(key, str) and key.endswith(tag):
            # Strip only the trailing tag. str.replace(tag, '') would also
            # delete any interior occurrence of the tag within the key name.
            new_key = key.removesuffix(tag)
            config[new_key] = config[key]
            del config[key]
83+
84+
7285
def load_quicklook_scaling(level: str = None, product: str = None, obscode: str = None, path: str = None) -> (float, float):
7386
if path is None:
7487
path = Variable.get("punchpipe_config", "punchpipe_config.yaml")

punchbowl/level3/stellar.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ def __init__(self, layer: int | None = None, apply_mask: bool = True, key: str =
134134

135135
def load_image(self, filename: str) -> ImageHolder:
136136
"""Load an image."""
137-
cube = load_ndcube_from_fits(filename, key=self.key, include_provenance=False, include_uncertainty=False)
137+
cube = load_ndcube_from_fits(filename, key=self.key, include_provenance=False, include_uncertainty=False,
138+
dtype=np.float32)
138139

139140
if self.apply_mask:
140141
mask = (cube.data[self.layer] == 0) if self.layer is not None else (cube.data == 0)
@@ -258,6 +259,7 @@ def generate_starfield_background(
258259
n_procs=n_procs,
259260
processor=PUNCHImageProcessor(0, apply_mask=True, key="A"),
260261
handle_wrap_point=False,
262+
dtype=np.float32,
261263
target_mem_usage=target_mem_usage)
262264
logger.info("Ending m starfield")
263265
out_data_m = starfield_m.starfield - percentile_filter(starfield_m.starfield, 5, 10)
@@ -273,6 +275,7 @@ def generate_starfield_background(
273275
n_procs=n_procs,
274276
processor=PUNCHImageProcessor(1, apply_mask=True, key="A"),
275277
handle_wrap_point=False,
278+
dtype=np.float32,
276279
target_mem_usage=target_mem_usage)
277280
logger.info("Ending z starfield")
278281
out_data_z = starfield_z.starfield - percentile_filter(starfield_z.starfield, 5, 10)
@@ -288,6 +291,7 @@ def generate_starfield_background(
288291
n_procs=n_procs,
289292
processor=PUNCHImageProcessor(2, apply_mask=True, key="A"),
290293
handle_wrap_point=False,
294+
dtype=np.float32,
291295
target_mem_usage=target_mem_usage)
292296
logger.info("Ending p starfield")
293297
out_data_p = starfield_p.starfield - percentile_filter(starfield_p.starfield, 5, 10)
@@ -306,6 +310,7 @@ def generate_starfield_background(
306310
n_procs=n_procs,
307311
processor=PUNCHImageProcessor(None, apply_mask=True, key="A"),
308312
handle_wrap_point=False,
313+
dtype=np.float32,
309314
target_mem_usage=target_mem_usage)
310315
logger.info("Ending clear starfield")
311316
out_data = starfield_clear.starfield - percentile_filter(starfield_clear.starfield, 5, 10)

0 commit comments

Comments
 (0)