Skip to content

Commit 7f7275b

Browse files
committed
feat(butler): Implement butler pool factory
1 parent 1a7a6f7 commit 7f7275b

File tree

4 files changed

+243
-82
lines changed

4 files changed

+243
-82
lines changed

src/lsst/cmservice/common/butler.py

Lines changed: 185 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,38 @@
1-
"""Utility functions for working with butler commands"""
1+
"""Module in support of working with Butlers.
2+
3+
A CM Service that has a configured ``BUTLER_REPO_INDEX`` provides a
4+
factory function that manages a pool of Butler instances for each of these
5+
repos. This factory function is inspired by the
6+
`lsst.daf.butler.LabeledButlerFactory` and provides ``clone()`` instances of
7+
available Butlers when asked to provide one.
8+
9+
Notes
10+
-----
11+
The butler "factory" follows a "global" pattern where it is assigned to
12+
a module-level variable at import-time. Other modules can import this factory
13+
and use it to produce on-demand butler clones. It is not necessary to use a
14+
butler factory as an injected dependency when using this pattern, but this
15+
module should be imported as early as possible in the application startup;
16+
it does not depend on a running event loop.
17+
"""
218

3-
from functools import partial
19+
from collections.abc import Callable
20+
from functools import cache, cached_property, partial
21+
from pathlib import Path
22+
from typing import TYPE_CHECKING
423

524
import yaml
6-
from anyio import Path, to_thread
25+
from anyio import to_thread
26+
from botocore.exceptions import ClientError
727
from sqlalchemy.engine import url
28+
from sqlalchemy.exc import OperationalError
829

930
from lsst.daf.butler import Butler, ButlerConfig, ButlerRepoIndex
1031
from lsst.daf.butler._exceptions import MissingCollectionError
32+
from lsst.daf.butler.direct_butler import DirectButler
33+
from lsst.daf.butler.registry import CollectionArgType
1134
from lsst.resources import ResourcePathExpression
12-
from lsst.utils.db_auth import DbAuth
35+
from lsst.utils.db_auth import DbAuth, DbAuthError
1336

1437
from ..config import config
1538
from . import errors
@@ -24,51 +47,170 @@
2447
"""
2548

2649

27-
async def get_butler_config(repo: str, *, without_datastore: bool = False) -> ButlerConfig:
28-
"""Create a butler config object for a repo known to the service's
29-
environment.
50+
class ButlerFactory:
51+
"""When created, the LabeledButlerFactory will create an instance of each
52+
Butler known to the application configuration. This occurs synchronously
53+
so it is best performed at application startup. After initializing, the
54+
factory can hand out ``clone()`` copies of available Butlers.
3055
"""
3156

32-
try:
33-
repo_uri: ResourcePathExpression = BUTLER_REPO_INDEX.get_repo_uri(label=repo)
34-
except KeyError:
35-
# No such repo known to the service
36-
logger.warning("Butler repo %s not known to environment.", repo)
37-
repo_uri = repo
38-
39-
try:
40-
bc_f = partial(
41-
ButlerConfig,
42-
other=repo_uri,
43-
without_datastore=without_datastore,
44-
)
45-
bc = await to_thread.run_sync(bc_f)
46-
except FileNotFoundError:
47-
# No such repo known to Butler
48-
logger.error("Butler repo %s not known.", repo)
49-
raise RuntimeError("Unknown Butler Repo %s", repo)
50-
51-
try:
52-
db_auth_info = yaml.safe_load(await Path(config.butler.authentication_file).read_text())
53-
except FileNotFoundError:
54-
logger.error("No Butler Registry authentication secrets found.")
55-
# delegate db auth info discovery to normal toolchain
57+
def __init__(self) -> None:
58+
"""Initialize a butler factory by creating butler pool instances for
59+
each known repository.
60+
"""
61+
# forced property lookup; cache the registry auth file contents
62+
_ = self.butler_auth_config
63+
64+
# create and cache any butler factories known to the service
65+
for label in BUTLER_REPO_INDEX.get_known_repos():
66+
_ = self.get_butler_factory(label)
67+
return None
68+
69+
@cached_property
70+
def butler_auth_config(self) -> list[dict[str, str]] | None:
71+
"""Read a butler authentication file for secrets.
72+
73+
Notes
74+
-----
75+
This is a `functools.cached_property` that should be first accessed
76+
during instance initialization so any IO is performed synchronously
77+
at application startup and cached for the lifetime of the object and/or
78+
the application.
79+
"""
80+
try:
81+
# FIXME ought to validate the loaded value against expectations
82+
return yaml.safe_load(Path(config.butler.authentication_file).read_text())
83+
except FileNotFoundError:
84+
logger.warning("No Butler Registry authentication secrets file found.")
85+
# delegate db auth info discovery to normal toolchain
86+
return None
87+
88+
def get_butler(self, label: str, collections: list[str] | None = None) -> Butler | None:
89+
"""Get a butler clone from the factory.
90+
91+
Notes
92+
-----
93+
This is the primary public interface to the factory object.
94+
"""
95+
factory = self.get_butler_factory(label)
96+
if factory is None:
97+
return None
98+
return factory(collections=collections)
99+
100+
@cache
101+
def get_butler_factory(self, label: str) -> Callable[..., Butler] | None:
102+
"""Return a factory function that creates a butler clone.
103+
104+
Notes
105+
-----
106+
This method is backed by a `functools.cache`, a threadsafe cache.
107+
108+
If the return value is None, the service could not create a Butler for
109+
the desired label or no such Butler is configured. In the former case,
110+
the service log should include exception information related to the
111+
failed Butler creation.
112+
113+
Returns
114+
-------
115+
`lsst.daf.butler` or `None`
116+
A cloned instance of a ``Butler`` or None if the labelled Butler
117+
could not be created from the configuration inputs.
118+
"""
119+
try:
120+
_butler_config = self.get_butler_config(label=label)
121+
_butler = Butler.from_config(_butler_config)
122+
if TYPE_CHECKING:
123+
assert isinstance(_butler, DirectButler)
124+
_butler._preload_cache(load_dimension_record_cache=False)
125+
except (ClientError, OperationalError):
126+
# Case that configured butler was unable to be created
127+
logger.exception()
128+
return None
129+
except KeyError:
130+
# Case that no such butler was configured
131+
logger.warning(f"No such butler configured: {label}")
132+
return None
133+
134+
def factory(collections: CollectionArgType) -> Butler:
135+
return _butler.clone(collections=collections)
136+
137+
return factory
138+
139+
def update_butler_url(self, bc: ButlerConfig) -> ButlerConfig:
140+
"""Update a butler config with registry secrets.
141+
142+
Returns
143+
-------
144+
``ButlerConfig``
145+
The same configuration object passed as input is returned, with
146+
secrets applied or not, depending on whether secrets were available
147+
for the input config's registry URL.
148+
149+
Notes
150+
-----
151+
This method makes use of the cached ``butler_auth_config`` property
152+
which contains the set of butler registry secrets known to the service.
153+
Missing secrets may produce non-working Butlers, but not all butlers
154+
have associated secrets, so failure to locate a secret is not an error.
155+
"""
156+
if self.butler_auth_config is None:
157+
return bc
158+
159+
db_url = url.make_url(bc["registry"]["db"])
160+
try:
161+
db_auth = DbAuth(authList=self.butler_auth_config).getAuth(
162+
dialectname=db_url.drivername,
163+
username=db_url.username,
164+
host=db_url.host,
165+
port=db_url.port,
166+
database=db_url.database,
167+
)
168+
bc[".registry.username"] = db_auth[0]
169+
bc[".registry.password"] = db_auth[1]
170+
except DbAuthError as e:
171+
logger.warning(f"Could not parse db auth from provided url[{db_url}]: {e}")
56172
return bc
57173

58-
db_url = url.make_url(bc["registry"]["db"])
59-
db_auth = DbAuth(authList=db_auth_info).getAuth(
60-
dialectname=db_url.drivername,
61-
username=db_url.username,
62-
host=db_url.host,
63-
port=db_url.port,
64-
database=db_url.database,
65-
)
66-
67-
bc[".registry.username"] = db_auth[0]
68-
bc[".registry.password"] = db_auth[1]
69-
return bc
174+
def get_butler_config(self, label: str, *, without_datastore: bool = True) -> ButlerConfig:
175+
"""Create a butler config object for a repo known to the service's
176+
environment.
177+
178+
Returns
179+
-------
180+
lsst.daf.butler.ButlerConfig
181+
Unless otherwise specified, the ButlerConfig returned by this
182+
method is configured ``without_datastore``.
183+
"""
184+
185+
try:
186+
repo_uri: ResourcePathExpression = BUTLER_REPO_INDEX.get_repo_uri(label=label)
187+
except KeyError:
188+
logger.warning("Butler repo %s not known to environment.", label)
189+
repo_uri = label
190+
191+
try:
192+
bc_f = partial(
193+
ButlerConfig,
194+
other=repo_uri,
195+
without_datastore=without_datastore,
196+
)
197+
bc = bc_f()
198+
except FileNotFoundError:
199+
logger.error("Butler repo %s not known.", label)
200+
raise RuntimeError("Unknown Butler Repo %s", label)
201+
202+
# Any generated Butler config is hydrated with application secrets via
203+
# this tail call.
204+
return self.update_butler_url(bc)
205+
206+
207+
BUTLER_FACTORY = ButlerFactory()
208+
"""A module level butler factory created at module import, available for use
209+
in other modules.
210+
"""
70211

71212

213+
# TODO: deprecate these functions that attempt to "remove" data from Butlers.
72214
async def remove_run_collections(
73215
butler_repo: str,
74216
collection_name: str,

src/lsst/cmservice/daemon.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from safir.logging import configure_uvicorn_logging
1111

1212
from . import __version__
13+
from .common.butler import BUTLER_FACTORY # noqa: F401
1314
from .common.daemon import daemon_iteration
1415
from .common.logging import LOGGER
1516
from .common.panda import get_panda_token
@@ -30,19 +31,20 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
3031
os.environ |= config.panda.model_dump(by_alias=True, exclude_none=True)
3132
os.environ |= config.htcondor.model_dump(by_alias=True, exclude_none=True)
3233
app.state.tasks = set()
33-
daemon = create_task(main_loop(), name="daemon")
34+
daemon = create_task(main_loop(app=app), name="daemon")
3435
app.state.tasks.add(daemon)
3536
yield
3637
# stop
3738

3839

39-
async def main_loop() -> None:
40+
async def main_loop(app: FastAPI) -> None:
4041
"""Daemon execution loop.
4142
4243
With a database session, perform a single daemon interation and then sleep
4344
until the next daemon appointment.
4445
"""
4546
engine = create_database_engine(config.db.url, config.db.password)
47+
4648
sleep_time = config.daemon.processing_interval
4749

4850
async with engine.begin():

src/lsst/cmservice/handlers/elements.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
from anyio import to_thread
99
from sqlalchemy.ext.asyncio import async_scoped_session
1010

11-
from lsst.daf.butler import Butler
12-
13-
from ..common.butler import get_butler_config
11+
from ..common.butler import BUTLER_FACTORY
1412
from ..common.enums import StatusEnum
1513
from ..common.errors import CMMissingScriptInputError, test_type_and_raise
14+
from ..common.logging import LOGGER
1615
from ..config import config
1716
from ..db.campaign import Campaign
1817
from ..db.element import ElementMixin
@@ -21,6 +20,8 @@
2120
from ..db.script import Script
2221
from .script_handler import FunctionHandler
2322

23+
logger = LOGGER.bind(module=__name__)
24+
2425

2526
class RunElementScriptHandler(FunctionHandler):
2627
"""Shared base class to handling running and
@@ -221,14 +222,10 @@ async def split(
221222
if mock_butler:
222223
sorted_field_values = np.arange(10)
223224
else:
224-
butler_config = await get_butler_config(butler_repo, without_datastore=True)
225-
butler_f = partial(
226-
Butler.from_config,
227-
butler_config,
228-
collections=[input_coll, campaign_input_coll],
229-
without_datastore=True,
230-
)
231-
butler = await to_thread.run_sync(butler_f)
225+
butler = BUTLER_FACTORY.get_butler(butler_repo, collections=[input_coll, campaign_input_coll])
226+
if butler is None:
227+
logger.error(f"butler repo {butler_repo} is not known to the application.")
228+
raise RuntimeError("No such butler")
232229
itr_q_f = partial(
233230
butler.registry.queryDataIds,
234231
[split_field],

0 commit comments

Comments
 (0)