
Commit 0c43fbe

Merge remote-tracking branch 'origin/main' into tahoma_urls
Merge main into my branch
2 parents: 9d87821 + 495fe91

File tree

10 files changed: +367, -63 lines changed


nmdc_automation/workflow_automation/sched.py

Lines changed: 38 additions & 32 deletions
@@ -65,16 +65,15 @@ def __init__(self, workflow: WorkflowConfig, trigger_act: WorkflowProcessNode, m
         self.workflow = workflow
         self.trigger_act = trigger_act
         self.trigger_id = trigger_act.id
-
-        # Default is typically the trigger act's was_informed_by
         self.informed_by = trigger_act.was_informed_by

         # Default has no manifest
         self.manifest = None

-        # However, if we see a manifest associated with the WorkflorProcessNode that triggered the job
-        # (which is set for DataGeneration workflowprocess nodes), then we want to override the default
-        # and look for the list of DataGeneration ID that are found in the manifest map
+        # Set the manifest if found; DataGeneration workflowprocess nodes need their
+        # was_informed_by list assigned from the manifest_map
+        # Note: was_informed_by will be properly set from trigger_act.was_informed_by
+        # for jobs downstream of readsqc (non-dgns wf records)

         if len(trigger_act.manifest) == 1:

@@ -87,10 +86,12 @@ def __init__(self, workflow: WorkflowConfig, trigger_act: WorkflowProcessNode, m
             # It will return None if the key doesn't exist.
             mapped_value = manifest_map.get(manifest_key)

+            # For dgns wfp nodes
             # Check if a value was found, is a dict, and contains the required key.
             # This will be the associated data_generation_set IDs with the manifest set
             if isinstance(mapped_value, dict) and 'data_generation_set' in mapped_value:
-                self.informed_by = mapped_value['data_generation_set']
+                if self.trigger_id in mapped_value['data_generation_set']:
+                    self.informed_by = mapped_value['data_generation_set']

@@ -147,7 +148,7 @@ def create_job_rec(self, job: SchedulerJob, manifest_map: Dict[str, List[str]]):
         # If manifest is not empty, then this is a data generation stored in the WorkflowProcessNode
         # Note: Currently only support one manifest per workflowprocessnode/datagen
         #
-        if len(next_act.manifest) == 1:
+        if len(next_act.manifest) == 1 and job.trigger_id in manifest_map[next_act.manifest[0]]['data_generation_set']:

             # Find the data objects associated with the manifest using manifest_map
             for data_object in manifest_map[next_act.manifest[0]]['data_object_set']:

@@ -351,36 +352,41 @@ def find_new_jobs(self, wfp_node: WorkflowProcessNode, manifest_map: Dict[str, L
                 self._messages.append(msg)
             continue

+            #
+            # This check is only for wfp_nodes that are data_generation_set records to avoid duplicate scheduling
+            #
             # If current wfp_node.id is not in existing jobs, see if this has a manifest record,
             # then check for other associated data generation records jobs that exist for this wf
             found_existing_manifest_job = False
             associated_wfp_node_id = None
             if len(wfp_node.manifest) == 1:
-                for dgns_id in manifest_map[wfp_node.manifest[0]]['data_generation_set']:
-                    # Only need to check for others dgns since already checked itself above
-                    if dgns_id != wfp_node.id:
-                        if dgns_id in self.get_existing_jobs(wf):
-                            found_existing_manifest_job = True
-                            associated_wfp_node_id = dgns_id
-                            break
-
-                # If not found, also check if it was just added to list of all jobs
-                if not found_existing_manifest_job:
-                    for new_job in all_jobs:
-                        if new_job.manifest:
-                            if new_job.manifest == wfp_node.manifest[0]:
-                                if new_job.workflow.name == wf.name:
-                                    found_existing_manifest_job = True
-                                    associated_wfp_node_id = new_job.trigger_id
-                                    break
-
-
-                if found_existing_manifest_job:
-                    msg = f"Skipping existing job due to associated data generation record {associated_wfp_node_id} for {wfp_node.id} {wf.name}:{wf.version}"
-                    if msg not in self._messages:
-                        logger.info(msg)
-                        self._messages.append(msg)
-                    continue
+                if wfp_node.id in manifest_map[wfp_node.manifest[0]]['data_generation_set']:
+
+                    for dgns_id in manifest_map[wfp_node.manifest[0]]['data_generation_set']:
+                        # Only need to check for others dgns since already checked itself above
+                        if dgns_id != wfp_node.id:
+                            if dgns_id in self.get_existing_jobs(wf):
+                                found_existing_manifest_job = True
+                                associated_wfp_node_id = dgns_id
+                                break
+
+                    # If not found, also check if it was just added to list of all jobs
+                    if not found_existing_manifest_job:
+                        for new_job in all_jobs:
+                            if new_job.manifest:
+                                if new_job.manifest == wfp_node.manifest[0]:
+                                    if new_job.workflow.name == wf.name:
+                                        found_existing_manifest_job = True
+                                        associated_wfp_node_id = new_job.trigger_id
+                                        break
+
+                    if found_existing_manifest_job:
+                        msg = f"Skipping existing job due to associated data generation record {associated_wfp_node_id} for {wfp_node.id} {wf.name}:{wf.version}"
+                        if msg not in self._messages:
+                            logger.info(msg)
+                            self._messages.append(msg)
+                        continue
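For context on the new guard, here is a minimal standalone sketch of the lookup it performs. The manifest_map entry shape ({'data_generation_set': [...], 'data_object_set': [...]}) is taken from the diff above; the helper name resolve_informed_by and the example IDs are hypothetical, not part of the codebase.

from typing import Any, Dict, List

# Hypothetical helper mirroring the guard added in SchedulerJob.__init__ above;
# the manifest_map shape is assumed from the diff, not taken from the library.
def resolve_informed_by(
    trigger_id: str,
    default_informed_by: List[str],
    manifest_map: Dict[str, Dict[str, Any]],
    manifest_key: str,
) -> List[str]:
    mapped_value = manifest_map.get(manifest_key)
    # Only override the default when this trigger ID is listed in the manifest.
    if isinstance(mapped_value, dict) and 'data_generation_set' in mapped_value:
        if trigger_id in mapped_value['data_generation_set']:
            return mapped_value['data_generation_set']
    return default_informed_by

example_manifest_map = {
    "manifest-1": {
        "data_generation_set": ["dgns-1", "dgns-2"],
        "data_object_set": ["do-1", "do-2"],
    }
}
print(resolve_informed_by("dgns-1", ["dgns-1"], example_manifest_map, "manifest-1"))
# -> ['dgns-1', 'dgns-2']
print(resolve_informed_by("dgns-9", ["dgns-9"], example_manifest_map, "manifest-1"))
# -> ['dgns-9']  (falls back to the trigger's own was_informed_by)

The same membership test gates both the informed_by override in __init__ and, in create_job_rec, whether the manifest's data_object_set is consulted for the triggering job.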

nmdc_automation/workflow_automation/watch_nmdc.py

Lines changed: 0 additions & 1 deletion
@@ -244,7 +244,6 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
                 continue
             elif status in ("failed", "null"):
                 job.workflow.last_status = status
-                job.workflow.failed_count += 1
                 failed_jobs.append(job)
                 continue
             else:

nmdc_automation/workflow_automation/wfutils.py

Lines changed: 1 addition & 0 deletions
@@ -176,6 +176,7 @@ def submit_job(self, force: bool = False) -> Optional[int]:
         elif len(self.workflow.was_informed_by) == 1:
             tag_value = self.workflow.was_informed_by[0] + "/" + self.workflow.workflow_execution_id

+        # This will work to schedule but shouldn't go to this block unless bug or new feature support
         else:
             tag_value = ":".join(self.workflow.was_informed_by) + "/" + self.workflow.workflow_execution_id
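As a quick illustration of the branch the new comment flags, this is a standalone sketch of the tag construction for single versus multiple was_informed_by values; the function name and IDs are invented for the example.

from typing import List

# Illustrative only: a standalone version of the tag construction around the
# commented branch above. Function name and example IDs are hypothetical.
def build_tag_value(was_informed_by: List[str], workflow_execution_id: str) -> str:
    if len(was_informed_by) == 1:
        return was_informed_by[0] + "/" + workflow_execution_id
    # Multi-parent case: schedulable, but per the new comment it is only
    # expected here if there is a bug or a not-yet-supported feature.
    return ":".join(was_informed_by) + "/" + workflow_execution_id

print(build_tag_value(["nmdc:dgns-1"], "nmdc:wfexe-1"))
# -> nmdc:dgns-1/nmdc:wfexe-1
print(build_tag_value(["nmdc:dgns-1", "nmdc:dgns-2"], "nmdc:wfexe-1"))
# -> nmdc:dgns-1:nmdc:dgns-2/nmdc:wfexe-1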

nmdc_automation/workflow_automation/workflow_process.py

Lines changed: 23 additions & 0 deletions
@@ -223,6 +223,20 @@ def get_current_workflow_process_nodes(

             workflow_process_nodes.add(wfp_node)

+    # Build the list of data_generation_id_sets to manifest id for non-dgns processing workflows
+    # so we can add the manifest property to wfp_nodes when was_informed_by > 1
+    dg_set_to_manifest_map = {}
+    for manifest_id, manifest_data in manifest_map.items():
+        dg_ids_list = manifest_data.get('data_generation_set')
+
+        if dg_ids_list:
+            # 1. Sort the list and convert it to a hashable tuple
+            key_tuple = tuple(sorted(dg_ids_list))  # ex: ('id1', 'id2')
+
+            if key_tuple not in dg_set_to_manifest_map:
+                dg_set_to_manifest_map[key_tuple] = manifest_id
+
     for wf in workflow_execution_workflows:
         q = {}
         if wf.git_repo:

@@ -259,6 +273,15 @@ def get_current_workflow_process_nodes(
             # Join the sorted elements with "_" as the separator
             current_found_rec_key = "_".join(sorted_was_informed_by)

+            # Look for the manifest ID to add to the workflow process node
+            # Normalize the list: sort and convert to tuple
+            current_manifest = None
+            key_tuple = tuple(sorted(rec["was_informed_by"]))  # Result: ('id1', 'id2')
+            if key_tuple in dg_set_to_manifest_map:
+                current_manifest = dg_set_to_manifest_map[key_tuple]
+            if current_manifest:
+                wfp_node.add_to_manifest(current_manifest)
+
             # if there is already a wfp_node added for this workflow type, check if version is more recent
             # then add it and replace previous one.
             if current_found_rec_key in found_wfs:
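
The new lookup hinges on using a sorted tuple of data generation IDs as a dictionary key, so a record's was_informed_by list matches its manifest regardless of ordering. A minimal standalone sketch of that pattern, with hypothetical IDs:

# Standalone sketch of the reverse lookup added above: each manifest's
# data_generation_set, sorted and converted to a hashable tuple, maps back to
# its manifest ID; a record's was_informed_by list is normalized the same way
# before the lookup. IDs below are hypothetical.
example_manifest_map = {
    "manifest-1": {"data_generation_set": ["dgns-2", "dgns-1"]},
    "manifest-2": {"data_generation_set": ["dgns-3"]},
}

dg_set_to_manifest_map = {}
for manifest_id, manifest_data in example_manifest_map.items():
    dg_ids_list = manifest_data.get("data_generation_set")
    if dg_ids_list:
        key_tuple = tuple(sorted(dg_ids_list))  # order-insensitive, hashable key
        dg_set_to_manifest_map.setdefault(key_tuple, manifest_id)

rec = {"was_informed_by": ["dgns-1", "dgns-2"]}
current_manifest = dg_set_to_manifest_map.get(tuple(sorted(rec["was_informed_by"])))
print(current_manifest)  # -> manifest-1

Sorting before building the tuple makes the lookup order-insensitive, and a tuple is needed because a list is not hashable and cannot serve as a dict key.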

poetry.lock

Lines changed: 21 additions & 28 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ python = ">=3.10,<3.12"
 pymongo = "^4.3.3"
 pyYAML = "^6.0"
 requests = "^2.28.2"
-nmdc-schema = "^11.13.0"
+nmdc-schema = "^11.14.0"
 deepdiff = ">=7.0.1"
 pytz = ">=2024.1"
 python-dotenv = "^1.0.0"

@@ -73,4 +73,4 @@ build-backend = "poetry.core.masonry.api"
 [tool.pytest.ini_options]
 markers = [
     "integration: mark test as integration test",
-]
+]
