This repository was archived by the owner on Feb 13, 2026. It is now read-only.

Commit a239b1b

Merge pull request #310 from punch-mission/pipeline-adjustments
Pipeline adjustments
2 parents 8c2e1e5 + 2377aa8 commit a239b1b

8 files changed

Lines changed: 30 additions & 24 deletions

punchpipe/control/db.py

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ def directory(self, root: str):
 Index("construct_background", File.level, File.observatory, File.outlier, File.date_obs, File.state, File.file_type)
 Index("get_cal_file", File.file_type, File.observatory, File.date_obs, File.state)
 Index("CNN", File.file_type, File.observatory, File.level, File.state, File.outlier)
+Index("processing_flow_index", File.processing_flow)


 class Flow(Base):
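
The new single-column index serves the per-flow output-file lookup in processor.py below, `session.query(File).where(File.processing_flow == flow_db_entry.flow_id)`, which otherwise has no index on the file table to use. Here is a minimal, self-contained sketch of the pattern; the simplified File model is hypothetical, not punchpipe's actual schema:

```python
# Sketch only: a stand-in File model with the same kind of single-column index.
from sqlalchemy import Index, Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class File(Base):
    __tablename__ = "files"
    file_id: Mapped[int] = mapped_column(Integer, primary_key=True)
    state: Mapped[str] = mapped_column(String(16))
    processing_flow: Mapped[int] = mapped_column(Integer)


# Declared at module level, like the indexes in db.py above.
Index("processing_flow_index", File.processing_flow)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # The lookup generic_process_flow_logic performs for each flow; with the
    # index this is a B-tree probe instead of a full scan of the file table.
    files = session.execute(select(File).where(File.processing_flow == 42)).scalars().all()
```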

punchpipe/control/launcher.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ def gather_planned_flows(session, weight_to_launch, max_flows_to_launch, flow_we
     flows = (session.query(Flow)
              .where(Flow.state == "planned")
              .where(Flow.flow_type.in_(enabled_flows))
-             .order_by(Flow.is_backprocessing.asc(), Flow.priority.desc(), Flow.creation_time.desc())
+             .order_by(Flow.is_backprocessing.asc(), Flow.priority.desc(), Flow.creation_time.asc())
              .limit(max_to_select).all())
     selected_flows = []
     selected_weight = 0
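
The only change here is `creation_time.desc()` → `creation_time.asc()`: among planned flows with the same backprocessing flag and priority, the launcher now takes the oldest first (FIFO ordering) rather than the newest, so the oldest work is drained first. A toy illustration with hypothetical flow tuples:

```python
# Hypothetical flows as (is_backprocessing, priority, creation_time, label).
from datetime import datetime

flows = [
    (False, 5, datetime(2026, 2, 1, 12, 0), "newer"),
    (False, 5, datetime(2026, 2, 1, 10, 0), "older"),
    (True,  9, datetime(2026, 2, 1, 9, 0), "backprocessing"),
]

# Same ordering as the query: is_backprocessing ASC, priority DESC, creation_time ASC.
flows.sort(key=lambda f: (f[0], -f[1], f[2]))
print([f[3] for f in flows])  # ['older', 'newer', 'backprocessing']
```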

punchpipe/control/processor.py

Lines changed: 17 additions & 11 deletions
@@ -57,17 +57,23 @@ def generic_process_flow_logic(flow_id: int | list[int], core_flow_to_launch, pi
         file_db_entry_list = session.query(File).where(File.processing_flow == flow_db_entry.flow_id).all()

         # update the file database entries as being created
-        if file_db_entry_list:
-            for file_db_entry in file_db_entry_list:
-                if file_db_entry.state != "planned":
-                    raise RuntimeError(f"File id {file_db_entry.file_id} has already been created.")
-                if os.path.exists(os.path.join(
-                        file_db_entry.directory(pipeline_config['root']), file_db_entry.filename())):
-                    raise RuntimeError(f"Expected output file {file_db_entry.filename()} (id {file_db_entry.file_id}) "
-                                       "already exists on disk")
-                file_db_entry.state = "creating"
-        else:
-            raise RuntimeError("There should be at least one file associated with this flow. Found 0.")
+        try:
+            if file_db_entry_list:
+                for file_db_entry in file_db_entry_list:
+                    if file_db_entry.state != "planned":
+                        raise RuntimeError(f"File id {file_db_entry.file_id} has already been created.")
+                    if os.path.exists(os.path.join(
+                            file_db_entry.directory(pipeline_config['root']), file_db_entry.filename())):
+                        raise RuntimeError(f"Expected output file {file_db_entry.filename()} (id {file_db_entry.file_id}) "
+                                           "already exists on disk")
+                    file_db_entry.state = "creating"
+            else:
+                raise RuntimeError("There should be at least one file associated with this flow. Found 0.")
+        except:
+            # The exception handler rolls back the transaction, but we do want our start_time to stay in place. So
+            # commit on error, but otherwise let the transaction keep growing into a big batch
+            session.commit()
+            raise
         file_db_entry_lists.append(file_db_entry_list)
     session.commit()
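
The inline comment carries the reasoning: an exception handler upstream rolls the transaction back, which would also discard the flow's already-recorded start_time, so this code commits before re-raising; on success it deliberately leaves the transaction open so updates from many flows batch into the single session.commit() after the loop. A stripped-down sketch of that commit-on-error pattern, with hypothetical flow and file-entry objects:

```python
# Sketch only: 'flow' and 'entries' stand in for the real ORM objects.
from datetime import datetime

from sqlalchemy.orm import Session


def mark_files_creating(session: Session, flow, entries: list) -> None:
    flow.start_time = datetime.now()  # an earlier write we want to survive failure
    try:
        if not entries:
            raise RuntimeError("There should be at least one file associated with this flow. Found 0.")
        for entry in entries:
            if entry.state != "planned":
                raise RuntimeError(f"File id {entry.file_id} has already been created.")
            entry.state = "creating"
    except Exception:
        # Persist everything written so far before the caller's rollback fires,
        # then re-raise so the failure still propagates.
        session.commit()
        raise
    # On success, leave the transaction open so later flows batch into one commit.
```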

punchpipe/control/scheduler.py

Lines changed: 7 additions & 0 deletions
@@ -111,6 +111,11 @@ def generic_scheduler_flow_logic(
         database_flow_info = construct_child_flow_info(parent_files, children_files,
                                                        pipeline_config, session=session,
                                                        reference_time=reference_time, **args_dictionary)
+        # We've had some failures where a flow reports "no associated files", despite the output files having
+        # their processing_flow set properly. Best guess is the DB is running slow, and so the new flow has been
+        # committed but the files' processing_flow hasn't been updated yet. So let's not let the state be
+        # 'planned' until everything is in place.
+        database_flow_info.state = 'being_planned'
         if backprocess_cutoff := pipeline_config.get('prioritize_most_recent_n_days', None):
             cutoff = datetime.now(UTC) - timedelta(days=backprocess_cutoff)
             if all(cf.date_obs.replace(tzinfo=UTC) < cutoff for cf in children_files):
@@ -138,5 +143,7 @@ def generic_scheduler_flow_logic(
         for parent_file, child_file in iterable:
             session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id))

+    database_flow_info.state = 'planned'
+
     session.commit()
     return len(ready_files)
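
The fix is a two-phase publish: a new flow is created as 'being_planned', which the launcher's `Flow.state == "planned"` filter ignores, and is only flipped to 'planned' (second hunk) once every output file's processing_flow and all parent/child relationships are in place. A sketch of the idea with hypothetical objects:

```python
# Sketch only: 'flow' and 'output_files' stand in for the real ORM objects.
def plan_flow(session, flow, output_files):
    flow.state = "being_planned"   # not yet visible to gather_planned_flows
    session.add(flow)
    session.flush()                # assigns flow.flow_id without publishing it

    for f in output_files:
        f.processing_flow = flow.flow_id  # link outputs back to their flow
        session.add(f)

    flow.state = "planned"         # publish only after all links exist
    session.commit()               # the flow and its links become visible together
```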

punchpipe/flows/fcorona.py

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, sess
         logger.info("Flow 'construct_f_corona_background' is not enabled---halting scheduler")
         return 0

-    max_flows = 2 * pipeline_config['flows']['construct_f_corona_background'].get('concurrency_limit', 1000)
+    max_flows = pipeline_config['flows']['construct_f_corona_background'].get('concurrency_limit', 1000)
     existing_flows = (session.query(Flow)
                       .where(Flow.flow_type == 'construct_f_corona_background')
                       .where(Flow.state.in_(["planned", "launched", "running"])).count())
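
Dropping the `2 *` headroom means at most `concurrency_limit` of these flows can sit in the planned/launched/running states at once, rather than twice that. Assuming the two values are compared to decide how many new flows to plan (the rest of the function isn't in this hunk), the cap behaves roughly like this:

```python
# Toy numbers; 'concurrency_limit' is read from the pipeline config as above.
pipeline_config = {"flows": {"construct_f_corona_background": {"concurrency_limit": 4}}}

max_flows = pipeline_config["flows"]["construct_f_corona_background"].get("concurrency_limit", 1000)
existing_flows = 3  # planned/launched/running flows, as counted by the query
headroom = max(0, max_flows - existing_flows)
print(headroom)  # 1 -> at most one more background flow gets planned this cycle
```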

punchpipe/flows/level3.py

Lines changed: 0 additions & 8 deletions
@@ -53,7 +53,6 @@ def level3_PTM_query_ready_files(session, pipeline_config: dict, reference_time=
     return [[f.file_id] for f in actually_ready_files]


-@task(cache_policy=NO_CACHE)
 def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File,
                                    pipeline_config: dict, session=None, reference_time=None):
     session = get_database_session()  # TODO: replace so this works in the tests by passing in a test
@@ -83,7 +82,6 @@ def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File,
     )


-@task(cache_policy=NO_CACHE)
 def level3_PTM_construct_file_info(input_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
     date_obses = [f.date_obs for f in input_files]

@@ -151,7 +149,6 @@ def level3_PIM_query_ready_files(session, pipeline_config: dict, reference_time=
     return [[f.file_id] for f in actually_ready_files]


-@task(cache_policy=NO_CACHE)
 def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                    session=None, reference_time=None):
     session = get_database_session()  # TODO: replace so this works in the tests by passing in a test
@@ -188,7 +185,6 @@ def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File,
     )


-@task(cache_policy=NO_CACHE)
 def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
     date_obses = [f.date_obs for f in level2_files]

@@ -260,7 +256,6 @@ def level3_CIM_query_ready_files(session, pipeline_config: dict, reference_time=
     return [[f.file_id] for f in actually_ready_files]


-@task(cache_policy=NO_CACHE)
 def level3_CIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict,
                                    session=None, reference_time=None):
     session = get_database_session()  # TODO: replace so this works in the tests by passing in a test
@@ -298,7 +293,6 @@ def level3_CIM_construct_flow_info(level2_files: list[File], level3_file: File,
     )


-@task(cache_policy=NO_CACHE)
 def level3_CIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
     date_obses = [f.date_obs for f in level2_files]

@@ -366,7 +360,6 @@ def level3_CTM_query_ready_files(session, pipeline_config: dict, reference_time=
     return [[f.file_id] for f in actually_ready_files]


-@task(cache_policy=NO_CACHE)
 def level3_CTM_construct_flow_info(level2_files: list[File], level3_file: File,
                                    pipeline_config: dict, session=None, reference_time=None):
     session = get_database_session()  # TODO: replace so this works in the tests by passing in a test
@@ -397,7 +390,6 @@ def level3_CTM_construct_flow_info(level2_files: list[File], level3_file: File,
     )


-@task(cache_policy=NO_CACHE)
 def level3_CTM_construct_file_info(input_files: t.List[File], pipeline_config: dict, reference_time=None, ) -> t.List[File]:
     date_obses = [f.date_obs for f in input_files]

punchpipe/flows/stray_light.py

Lines changed: 2 additions & 2 deletions
@@ -190,7 +190,7 @@ def construct_polarized_stray_light_check_for_inputs(session,
         for group in second_half_inputs[:max_files_per_half]:
             all_ready_files.extend(group)

-        logger.info(f"{len(all_ready_files)} Level 1 P*{reference_files[0].observatory} files will be used "
+        logger.info(f"{len(all_ready_files)} Level 1 Y*{reference_files[0].observatory} files will be used "
                     "for stray light estimation.")
         return [f.file_id for f in all_ready_files]
     return []
@@ -293,7 +293,7 @@ def construct_stray_light_scheduler_flow(pipeline_config_path=None, session=None
         logger.info("Flow 'construct_stray_light' is not enabled---halting scheduler")
         return

-    max_flows = 2 * pipeline_config['flows']['construct_stray_light'].get('concurrency_limit', 1000)
+    max_flows = pipeline_config['flows']['construct_stray_light'].get('concurrency_limit', 1000)
     existing_flows = (session.query(Flow)
                       .where(Flow.flow_type == 'construct_stray_light')
                       .where(Flow.state.in_(["planned", "launched", "running"])).count())

punchpipe/speedster.py

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ def gather_planned_flows(session, enabled_flows, max_n=None):
     flows = (session.query(Flow)
              .where(Flow.state == "planned")
              .where(Flow.flow_type.in_(enabled_flows))
-             .order_by(Flow.is_backprocessing.asc(), Flow.priority.desc(), Flow.creation_time.desc())
+             .order_by(Flow.is_backprocessing.asc(), Flow.priority.desc(), Flow.creation_time.asc())
              .limit(max_n).all())
     count_per_type = defaultdict(lambda: 0)
     flow_ids = []
