Commit bba2f7a

chore: refactor resolve_collections method

1 parent fa96dc5

2 files changed (+59, -42 lines)

docs/BUTLER.md

Lines changed: 32 additions & 0 deletions
```diff
@@ -50,3 +50,35 @@ The `PGPASSFILE` value is constructed using the `config.htcondor.remote_user_hom
 
 > [!NOTE]
 > Secrets for second-party Butlers may also be provided via an environment variable. By setting `LSST_DB_AUTH_CREDENTIALS` with the JSON string representation of a `db-auth.yaml` file, all dependencies on presumed filesystem objects in the submission environment are resolved.
+
+## Butler Collection Management
+
+CM Service creates tagged and chained Butler collections during its runtime:
+
+### Preflight
+During Campaign preflight, three Butler collection operations are called. This happens before any steps, groups, or jobs are created or executed.
+
+1. A *tagged* collection is made from the Campaign's `collection.campaign_source` setting, constrained by the Campaign's `data.data_query` setting.
+1. A *chained* collection is made from the Campaign's `collection.campaign_ancillary_inputs` setting.
+1. A *chained* collection is made from the previous two collections.
+
+The final chained collection is used as an *input* collection for all subsequent Campaign step operations, i.e., it is identified as part of the `payload.inCollection` for any BPS workflow files generated by the Campaign.
+
+### Stepwise Processing
+During Campaign stepwise processing, each Step in the Campaign includes Butler collection operations:
+
+1. A step-specific *chained* collection is made from the Campaign input collection and applied to the `payload.inCollection` parameter.
+1. A step-group-specific *run* collection is made as a side effect of executing the step, named as indicated by the Group's `payload.outputRun` BPS Workflow parameter.
+1. A step-specific *chained* collection is made from the set of *run* collections generated by the step-groups.
+
+> [!NOTE]
+> During Stepwise processing, the BPS Workflow `payload.dataQuery` is populated according to the Step's `child_config.base_query` parameter and modified according to any Group splitting algorithm applied to the Step; it is not affected by the `data.data_query` at the Campaign level.
+
+> [!NOTE]
+> Presumably, if the *tagged* Campaign input collection was constrained by a meaningful data query, then that query does not need to be repeated in the Stepwise consideration of Butler collections, and only the result of group-split algorithms is necessary. However, this means that any out-of-band observer of a CM-generated BPS workflow file will not understand the nature of the input collection and its interaction with the data query without cross-referencing, so insofar as it improves understandability, the workflow file should be as comprehensively detailed as possible, even if doing so is redundant.
+
+### Postflight
+During Campaign postflight, Butler collection operations are used to further chain together Campaign elements, eventually resulting in a single *chained* collection for the entire Campaign.
+
+1. Each step-specific *chained* collection is itself chained to a Campaign *chained* "output" collection.
+1. The final *chained* collection, named according to the Campaign's `collection.out` parameter, includes the Campaign "output" collection, the Campaign "input" collection, and the Campaign "resource_usage" collection.
```
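The preflight, stepwise, and postflight sequences above can be modeled end to end. The following is a minimal sketch using plain dicts in place of a real Butler registry; the helper names (`preflight`, `step`, `postflight`) and all collection names (`campaign/input`, `u/cm/out`, `HSC/raw/all`, …) are hypothetical illustrations, not CM Service or Butler APIs.

```python
# A chained collection is modeled as an ordered list of child collection
# names; the "registry" maps each chain name to its children.
Registry = dict[str, list[str]]


def preflight(registry: Registry, source: str, ancillary: list[str]) -> str:
    """Preflight: tag the source (data query applied), chain the ancillary
    inputs, then chain both into the Campaign input collection."""
    registry["campaign/tagged_source"] = [source]
    registry["campaign/ancillary"] = list(ancillary)
    registry["campaign/input"] = ["campaign/tagged_source", "campaign/ancillary"]
    return "campaign/input"  # used as payload.inCollection for every step


def step(registry: Registry, name: str, group_runs: list[str]) -> str:
    """Stepwise: chain a step input from the Campaign input collection, then
    chain the group-specific run collections into a step output chain."""
    registry[f"{name}/input"] = ["campaign/input"]
    registry[f"{name}/output"] = list(group_runs)
    return f"{name}/output"


def postflight(registry: Registry, step_outputs: list[str], out: str) -> str:
    """Postflight: chain the step outputs into the Campaign output chain, then
    build the final collection.out chain from the output, input, and
    resource_usage collections."""
    registry["campaign/output"] = list(step_outputs)
    registry[out] = ["campaign/output", "campaign/input", "campaign/resource_usage"]
    return out


registry: Registry = {}
preflight(registry, "HSC/raw/all", ["HSC/calib"])
s1 = step(registry, "step1", ["step1/group0/run1"])
postflight(registry, [s1], "u/cm/out")
```

After this sequence, resolving `u/cm/out` recursively reaches every run collection produced by the Campaign, which is the single-chain property the postflight stage is meant to guarantee.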

src/lsst/cmservice/db/node.py

Lines changed: 27 additions & 42 deletions
```diff
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import re
+from collections import ChainMap, defaultdict
 from typing import TYPE_CHECKING, Any
 
 from sqlalchemy.exc import IntegrityError
@@ -201,10 +203,9 @@ async def resolve_collections(
 
         Notes
         -----
-        This will return a dict with all of the collections
-        templated defined for this node resovled using
-        collection aliases and collection templates
-        defined up the processing heirarchy
+        This will return a dict with all of the collection templates defined
+        for this node resolved using collection aliases and collection
+        templates defined up the processing hierarchy
 
         Parameters
         ----------
@@ -219,47 +220,31 @@ async def resolve_collections(
         resolved_collections: dict
             Resolved collection names
         """
-        my_collections = await NodeMixin.get_collections(self, session)
+        raw_collections: dict[str, str | list[str]] = await NodeMixin.get_collections(self, session)
         collection_dict = await self.get_collections(session)
         name_dict = self._split_fullname(self.fullname)
-        name_dict["out"] = collection_dict.pop("out")
-        resolved_collections: dict = {}
-        for name_, val_ in my_collections.items():
-            if isinstance(val_, list):  # pragma: no cover
-                # FIXME, see if this is now being tested
-                resolved_collections[name_] = []
-                # FIXME disambiguate what types val_, item_ and f1 are supposed
-                # to be
-                for item_ in val_:
-                    try:
-                        f1 = item_.format(**collection_dict)
-                    except KeyError:
-                        f1 = val_
-                    try:
-                        resolved_collections[name_].append(f1.format(**name_dict))
-                    except KeyError as e:
-                        raise CMResolveCollectionsError(
-                            f"Failed to resolve collection {name_} {f1} using: {name_dict!s}",
-                        ) from e
-                resolved_collections[name_] = ",".join(resolved_collections[name_])
-            else:
-                try:
-                    f1 = val_.format(**collection_dict)
-                except KeyError:
-                    f1 = val_
-                try:
-                    resolved_collections[name_] = f1.format(**name_dict)
-                except KeyError as msg:
-                    raise CMResolveCollectionsError(
-                        f"Failed to resolve collection {name_}, {f1} using: {name_dict!s}",
-                    ) from msg
+        lookup_chain = ChainMap(collection_dict, name_dict, defaultdict(lambda: "MUST_OVERRIDE"))
+
+        resolved_collections = {
+            k: (v if isinstance(v, str) else ",".join(v)) for k, v in raw_collections.items()
+        }
+
+        # It may take multiple passes to format all the placeholder tokens in
+        # the collection strings; repeat the formatting until no such tokens
+        # remain.
+        while unresolved_collections := {
+            k: v for k, v in resolved_collections.items() if re.search("{.*}", v)
+        }:
+            for k, v in unresolved_collections.items():
+                resolved_collections[k] = v.format_map(lookup_chain)
+
         if throw_overrides:
-            for key, value in resolved_collections.items():
-                if "MUST_OVERRIDE" in value:  # pragma: no cover
-                    raise CMResolveCollectionsError(
-                        f"Attempts to resolve {key} collection includes MUST_OVERRIDE. Make sure to provide "
-                        "necessary collection names."
-                    )
+            if [v for v in resolved_collections.values() if re.search("MUST_OVERRIDE", v)]:
+                raise CMResolveCollectionsError(
+                    "Attempts to resolve collection includes MUST_OVERRIDE. Make sure to provide "
+                    "necessary collection names."
+                )
 
         return resolved_collections
 
     async def get_collections(
```
