
Commit 77973c7

feat(butler): Implement butler pool factory
1 parent cfb1617 commit 77973c7

6 files changed

+268
-89
lines changed

docs/BUTLER.md

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
1+
# CM Service and Butler
2+
The CM Service uses a Butler at two levels: for its own use and for the use of batch jobs it submits.
3+
4+
## First-Party Butlers
5+
CM Service uses Butlers to solve data queries, such as those used to determine group composition and splitting characteristics. CM Service also uses Butlers to assemble input collections and to chain output collections.
6+
7+
To provide Butler services for these first-party operations, CM Service uses a Butler Factory, which provides the following functionality:
8+
9+
- Instantiated and pre-cached Butler instances created on application startup.
10+
- A caching mechanism to provide Butler clone instances to factory callers.
11+
- Cloned Butler instances may be collection-constrained.
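The factory behaviour listed above can be sketched with stand-in classes. This is a minimal illustration of the cache-then-clone pattern, not the CM Service implementation: `StubButler` and `StubFactory` are hypothetical names, and the only Butler behaviour modelled is a `clone()` that accepts a collections constraint.

```python
from functools import cache


class StubButler:
    """Hypothetical stand-in for a Butler; only ``clone()`` is modelled."""

    def __init__(self, label: str, collections: tuple[str, ...] = ()) -> None:
        self.label = label
        self.collections = collections

    def clone(self, collections: tuple[str, ...] = ()) -> "StubButler":
        # A clone keeps the label but carries its own collections constraint.
        return StubButler(self.label, collections)


class StubFactory:
    """Creates one base instance per label and hands out clones of it."""

    def __init__(self, labels: list[str]) -> None:
        # Front-load the (potentially slow) instantiation at startup.
        for label in labels:
            self._base(label)

    @cache
    def _base(self, label: str) -> StubButler:
        # Cached per (factory, label): built once, then reused.
        return StubButler(label)

    def get(self, label: str, collections: tuple[str, ...] = ()) -> StubButler:
        # Callers always receive a fresh clone, never the cached base instance.
        return self._base(label).clone(collections)


factory = StubFactory(["/repo/main"])
first = factory.get("/repo/main", ("u/someone/inputs",))
second = factory.get("/repo/main")
```

Each call to `get` returns a distinct object, so a caller can discard its clone without affecting the cached base instance.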
12+
13+
From this factory, CM Service will obtain a short-lived Butler clone for performing first-party butler operations.
14+
Where these operations are synchronous and the CM event loop should not be blocked, Butler methods are executed on a threadpool or delegated to a FastAPI/starlette BackgroundTask.
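As a rough sketch of the threadpool approach, the example below offloads a blocking call so that other coroutines keep running. It uses the standard library's `asyncio.to_thread` so that it stays self-contained; the service code in this commit uses `anyio.to_thread.run_sync` instead. `blocking_query` is a hypothetical stand-in for a synchronous Butler method.

```python
import asyncio
import time
from functools import partial


def blocking_query(label: str, *, delay: float = 0.05) -> str:
    """Hypothetical stand-in for a synchronous Butler call."""
    time.sleep(delay)
    return f"result from {label}"


async def heartbeat(ticks: list[float]) -> None:
    # Records timestamps while the blocking call runs on a worker thread.
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    ticks: list[float] = []
    # Bind the arguments up front, then hand the callable to a worker thread.
    query = partial(blocking_query, "/repo/main", delay=0.05)
    result, _ = await asyncio.gather(asyncio.to_thread(query), heartbeat(ticks))
    return result, len(ticks)


result, tick_count = asyncio.run(main())
```

Binding arguments with `functools.partial` mirrors how the handlers in this commit prepare Butler calls before passing them to a worker thread.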
15+
16+
The CM Service establishes a Butler factory at startup so that any blocking IO involved with the instantiation of Butlers is front-loaded.
17+
CM Service uses some configuration details to determine the set of Butlers that the factory should support:
18+
19+
- The environment variable `DAF_BUTLER_REPOSITORIES` is used to construct an instance of `lsst.daf.butler.ButlerRepoIndex` for the service instance.
20+
- The Butler registry authentication information must be available as a JSON string in the `LSST_DB_AUTH_CREDENTIALS` environment variable, which `lsst.utils.db_auth.DbAuth` consumes as its `authList`. CM Service does not support reading a DbAuth credential set from a file, because this requires file permissions that are incompatible with Kubernetes secrets mounted as volumes when the container runs as a non-root user.
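A credential set could be serialized into the environment variable roughly as follows. The entry layout (`url`, `username`, `password`) is assumed here to mirror the `db-auth.yaml` format and should be checked against the `DbAuth` documentation; the host, database, and account values are purely illustrative.

```python
import json
import os

# Hypothetical credential entry; every value here is a placeholder.
auth_list = [
    {
        "url": "postgresql://db.example.org:5432/butler_registry",
        "username": "rubin",
        "password": "not-a-real-password",
    }
]

# Serialize the whole list to a single JSON string in the environment.
os.environ["LSST_DB_AUTH_CREDENTIALS"] = json.dumps(auth_list)

# A consumer decodes the same string back into a list of entries.
decoded = json.loads(os.environ["LSST_DB_AUTH_CREDENTIALS"])
```

In a Kubernetes deployment the variable would more likely be populated from a secret than set in code; the snippet only shows the expected shape of the value.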
21+
22+
The value of the `DAF_BUTLER_REPOSITORIES` environment variable is a JSON string that represents a mapping of Butler repo names to their associated configuration files.
23+
24+
<details>
25+
<summary>Example Repository Index JSON object</summary>
26+
27+
```json
28+
{
29+
"/repo/main": "/sdf/group/rubin/repo/main/butler.yaml",
30+
"/repo/main+sasquatch_dev": "/sdf/group/rubin/repo/main/butler+sasquatch_dev.yaml"
31+
}
32+
```
33+
34+
</details>
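To make the mapping concrete, the sketch below decodes an index string like the one above and resolves a label to its configuration file. `resolve_repo` is a hypothetical helper, not part of `lsst.daf.butler`. The fallback of treating an unknown label as a URI mirrors `get_butler_config` in this commit.

```python
import json

# Same shape as the example repository index above.
raw_index = json.dumps(
    {
        "/repo/main": "/sdf/group/rubin/repo/main/butler.yaml",
        "/repo/main+sasquatch_dev": "/sdf/group/rubin/repo/main/butler+sasquatch_dev.yaml",
    }
)


def resolve_repo(raw: str, label: str) -> str:
    """Return the config URI for a label, or the label itself if unknown."""
    index: dict[str, str] = json.loads(raw)
    # Unknown labels fall through unchanged and are treated as a URI.
    return index.get(label, label)


known = resolve_repo(raw_index, "/repo/main")
unknown = resolve_repo(raw_index, "/some/other/butler.yaml")
```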
35+
36+
## Second-Party Butlers
37+
When CM Service submits a batch job to run on a WMS, chances are high that the operations within that batch job will have to access a Butler for file IO.
38+
39+
The configuration parameter `config.butler.repository_index` is used to identify a Butler repository index file *relative to the WMS batch environment* that will be implicitly used by batch operations. This repository index file is expected to provide and resolve the correct Butler repository detail for a batch job that has been configured to use a Butler repository by its label.
40+
41+
The value of this configuration parameter is assigned to the environment variable `DAF_BUTLER_REPOSITORY_INDEX` in an environment specific to the batch submission operation, from which it is expected to be consumed by the appropriate `lsst.daf.butler` mechanisms when needed.
42+
43+
Authentication secrets for second-party Butlers are provided to batch jobs via the `PGPASSFILE` and `PGUSER` environment variables, which the `libpq` PostgreSQL client library uses as standard sources when no superseding sources are available.
44+
45+
The `PGUSER` variable is populated by the CM Service configuration parameter `config.butler.default_username`.
46+
The `PGPASSFILE` value is constructed using the `config.htcondor.remote_user_home` configuration parameter and the fixed value `.lsst/postgres-credentials.txt`. This file is expected to be in a standard PostgreSQL password file format and have appropriate file mode permissions (i.e., not group- or world-accessible) as required by `libpq`.
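The construction of these two variables can be sketched as follows. `batch_auth_environment` and `is_private` are hypothetical helpers, and the home directory is a placeholder; only the fixed suffix `.lsst/postgres-credentials.txt` and the two variable names come from the description above. The permission check reflects the `libpq` requirement that the password file grant no access to group or world.

```python
from pathlib import PurePosixPath


def batch_auth_environment(remote_user_home: str, default_username: str) -> dict[str, str]:
    """Build the libpq-related variables for a batch submission environment."""
    pgpassfile = PurePosixPath(remote_user_home) / ".lsst" / "postgres-credentials.txt"
    return {"PGUSER": default_username, "PGPASSFILE": str(pgpassfile)}


def is_private(mode: int) -> bool:
    """Return True if a file mode grants nothing to group or world."""
    return mode & 0o077 == 0


# Placeholder values standing in for config.htcondor.remote_user_home and
# config.butler.default_username.
env = batch_auth_environment("/home/example", "rubin")
```

Each line of a PostgreSQL password file has the form `hostname:port:database:username:password`, and a mode such as `0600` satisfies the check above.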
47+
48+
> [!NOTE]
49+
> This approach is suboptimal and delegates too many assumptions to the eventual batch submission context, especially the presence of specific files in specific locations. The batch submission context should be more or less completely controlled by the submitting service, including the creation of a specific short-lived `PGPASSFILE` with contents provided by the service's own understanding of the Butler repository. This assumes that the service will not submit a job that depends on a Butler which the service does not itself understand.
50+
51+
> [!NOTE]
52+
> Secrets for second-party Butlers may also be provided via an environment variable. Setting `LSST_DB_AUTH_CREDENTIALS` to the JSON string representation of a `db-auth.yaml` file removes all dependencies on presumed filesystem objects in the submission environment.

src/lsst/cmservice/common/butler.py

Lines changed: 136 additions & 44 deletions
@@ -1,15 +1,34 @@
1-
"""Utility functions for working with butler commands"""
1+
"""Module in support of working with Butlers.
2+
3+
A CM Service that has a configured ``BUTLER_REPO_INDEX`` provides a
4+
factory function that manages a pool of Butler instances for each of these
5+
repos. This factory function is inspired by the
6+
`lsst.daf.butler.LabeledButlerFactory` and provides ``clone()`` instances of
7+
available Butlers when asked to provide one.
8+
9+
Notes
10+
-----
11+
The butler "factory" follows a "global" pattern where it is assigned to
12+
a module-level variable at import-time. Other modules can import this factory
13+
and use it to produce on-demand butler clones. It is not necessary to use a
14+
butler factory as an injected dependency when using this pattern, but this
15+
module should be imported as early as possible in the application startup;
16+
it does not depend on a running event loop.
17+
"""
218

3-
from functools import partial
19+
from collections.abc import Callable
20+
from functools import cache, partial
21+
from typing import TYPE_CHECKING
422

5-
import yaml
6-
from anyio import Path, to_thread
7-
from sqlalchemy.engine import url
23+
from anyio import to_thread
24+
from botocore.exceptions import ClientError
25+
from sqlalchemy.exc import OperationalError
826

927
from lsst.daf.butler import Butler, ButlerConfig, ButlerRepoIndex
1028
from lsst.daf.butler._exceptions import MissingCollectionError
29+
from lsst.daf.butler.direct_butler import DirectButler
30+
from lsst.daf.butler.registry import CollectionArgType, RegistryConfig
1131
from lsst.resources import ResourcePathExpression
12-
from lsst.utils.db_auth import DbAuth
1332

1433
from ..config import config
1534
from . import errors
@@ -20,55 +39,128 @@
2039

2140
BUTLER_REPO_INDEX = ButlerRepoIndex()
2241
"""An index of all known butler repositories, as populated by the
23-
DAF_BUTLER_REPOSITORIES environment variable.
42+
``DAF_BUTLER_REPOSITORIES`` environment variable.
2443
"""
2544

2645

27-
async def get_butler_config(repo: str, *, without_datastore: bool = False) -> ButlerConfig:
28-
"""Create a butler config object for a repo known to the service's
29-
environment.
46+
class ButlerFactory:
47+
"""The ButlerFactory will create an instance of each Butler known to the
48+
application during initialization. This occurs synchronously so it is best
49+
performed at application startup. After initializing, the factory can hand
50+
out ``clone()`` copies of available Butlers.
3051
"""
3152

32-
try:
33-
repo_uri: ResourcePathExpression = BUTLER_REPO_INDEX.get_repo_uri(label=repo)
34-
except KeyError:
35-
# No such repo known to the service
36-
logger.warning("Butler repo %s not known to environment.", repo)
37-
repo_uri = repo
38-
39-
try:
40-
bc_f = partial(
41-
ButlerConfig,
42-
other=repo_uri,
43-
without_datastore=without_datastore,
44-
)
45-
bc = await to_thread.run_sync(bc_f)
46-
except FileNotFoundError:
47-
# No such repo known to Butler
48-
logger.error("Butler repo %s not known.", repo)
49-
raise RuntimeError("Unknown Butler Repo %s", repo)
53+
def __init__(self) -> None:
54+
"""Initialize a ButlerFactory by creating butler pool instances for
55+
each known repository.
56+
"""
57+
# create and cache any butler factories known to the service
58+
for label in BUTLER_REPO_INDEX.get_known_repos():
59+
if config.butler.eager:
60+
_ = self.get_butler_factory(label)
61+
62+
def get_butler(self, label: str, collections: list[str] | None = None) -> Butler | None:
63+
"""Get a butler clone from the factory with the specified collections
64+
constraint applied.
65+
66+
Notes
67+
-----
68+
This is the primary public interface to the factory object.
69+
"""
70+
factory = self.get_butler_factory(label)
71+
if factory is None:
72+
return None
73+
return factory(collections=collections)
74+
75+
@cache
76+
def get_butler_factory(
77+
self, label: str, *, without_datastore: bool = True
78+
) -> Callable[..., Butler] | None:
79+
"""Return a factory function that creates a butler clone.
80+
81+
Notes
82+
-----
83+
This method is backed by a `functools.cache`, a threadsafe cache.
84+
85+
If the return value is None, the service could not create a Butler for
86+
the desired label or no such Butler is configured. In the former case,
87+
the service log should include exception information related to the
88+
failed Butler creation.
89+
90+
If the application's Butler ``eager`` parameter is set, the Factory
91+
instance instantiates all known Butlers at initialization. If this
92+
parameter is false, then calling this method the first time will block
93+
until the requested Butler is ready.
94+
95+
Returns
96+
-------
97+
`~collections.abc.Callable` or `None`
98+
A factory function that returns a cloned ``Butler``, or None if the
99+
labelled Butler could not be created from the configuration inputs.
100+
"""
101+
try:
102+
_butler_config = self.get_butler_config(label=label)
103+
_butler = Butler.from_config(_butler_config, without_datastore=without_datastore)
104+
if TYPE_CHECKING:
105+
assert isinstance(_butler, DirectButler)
106+
_butler._preload_cache(load_dimension_record_cache=False)
107+
except (ClientError, OperationalError):
108+
# Case that configured butler could not be created because of an
109+
# S3 or database error
110+
logger.exception("Could not create butler for label %s", label)
111+
return None
112+
except KeyError:
113+
# Case that no such butler was configured
114+
logger.warning("No such butler configured: %s", label)
115+
return None
116+
117+
def factory(collections: CollectionArgType) -> Butler:
118+
return _butler.clone(collections=collections)
119+
120+
return factory
121+
122+
@cache
123+
def get_butler_config(self, label: str) -> ButlerConfig:
124+
"""Create a butler config object for a repo known to the service's
125+
environment.
126+
127+
Returns
128+
-------
129+
``lsst.daf.butler.ButlerConfig``
130+
"""
131+
132+
try:
133+
repo_uri: ResourcePathExpression = BUTLER_REPO_INDEX.get_repo_uri(label=label)
134+
except KeyError:
135+
logger.warning("Butler repo %s not known to environment.", label)
136+
repo_uri = label
137+
138+
try:
139+
bc = ButlerConfig(other=repo_uri)
140+
except FileNotFoundError:
141+
logger.error("Butler repo %s not known.", label)
142+
raise RuntimeError(f"Unknown Butler Repo {label}")
50143

51-
try:
52-
db_auth_info = yaml.safe_load(await Path(config.butler.authentication_file).read_text())
53-
except FileNotFoundError:
54-
logger.error("No Butler Registry authentication secrets found.")
55-
# delegate db auth info discovery to normal toolchain
56144
return bc
57145

58-
db_url = url.make_url(bc["registry"]["db"])
59-
db_auth = DbAuth(authList=db_auth_info).getAuth(
60-
dialectname=db_url.drivername,
61-
username=db_url.username,
62-
host=db_url.host,
63-
port=db_url.port,
64-
database=db_url.database,
65-
)
146+
@cache
147+
def get_butler_registry_config(self, label: str) -> RegistryConfig:
148+
"""Fetch the Registry Config for a Butler by label.
149+
150+
Returns
151+
-------
152+
``lsst.daf.butler.registry.RegistryConfig``
153+
"""
154+
return RegistryConfig(self.get_butler_config(label=label))
66155

67-
bc[".registry.username"] = db_auth[0]
68-
bc[".registry.password"] = db_auth[1]
69-
return bc
156+
157+
BUTLER_FACTORY = ButlerFactory()
158+
"""A module level butler factory created at module import, available for use
159+
in other modules.
160+
"""
70161

71162

163+
# TODO: deprecate these functions that attempt to "remove" data from Butlers.
72164
async def remove_run_collections(
73165
butler_repo: str,
74166
collection_name: str,

src/lsst/cmservice/config.py

Lines changed: 8 additions & 0 deletions
@@ -64,6 +64,9 @@ class ButlerConfiguration(BaseModel):
6464
default="butler",
6565
)
6666

67+
# FIXME this index is used to hydrate a WMS submission environment, so it
68+
# is not used by CM Service to construct Butlers, and may be variable
69+
# between sites, so should be relocated to a facility-specific config
6770
repository_index: str = Field(
6871
description="Fully qualified path to a butler repository index.",
6972
default="/sdf/group/rubin/shared/data-repos.yaml",
@@ -79,6 +82,11 @@ class ButlerConfiguration(BaseModel):
7982
default="rubin",
8083
)
8184

85+
eager: bool = Field(
86+
description="Whether to eagerly instantiate known Butlers at Factory startup",
87+
default=True,
88+
)
89+
8290
mock: bool = Field(
8391
description="Whether to mock out Butler calls.",
8492
default=False,

src/lsst/cmservice/daemon.py

Lines changed: 4 additions & 2 deletions
@@ -10,6 +10,7 @@
1010
from safir.logging import configure_uvicorn_logging
1111

1212
from . import __version__
13+
from .common.butler import BUTLER_FACTORY # noqa: F401
1314
from .common.daemon import daemon_iteration
1415
from .common.logging import LOGGER
1516
from .common.panda import get_panda_token
@@ -30,19 +31,20 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
3031
os.environ |= config.panda.model_dump(by_alias=True, exclude_none=True)
3132
os.environ |= config.htcondor.model_dump(by_alias=True, exclude_none=True)
3233
app.state.tasks = set()
33-
daemon = create_task(main_loop(), name="daemon")
34+
daemon = create_task(main_loop(app=app), name="daemon")
3435
app.state.tasks.add(daemon)
3536
yield
3637
# stop
3738

3839

39-
async def main_loop() -> None:
40+
async def main_loop(app: FastAPI) -> None:
4041
"""Daemon execution loop.
4142
4243
With a database session, perform a single daemon iteration and then sleep
4344
until the next daemon appointment.
4445
"""
4546
engine = create_database_engine(config.db.url, config.db.password)
47+
4648
sleep_time = config.daemon.processing_interval
4749

4850
async with engine.begin():

src/lsst/cmservice/handlers/elements.py

Lines changed: 8 additions & 11 deletions
@@ -8,11 +8,10 @@
88
from anyio import to_thread
99
from sqlalchemy.ext.asyncio import async_scoped_session
1010

11-
from lsst.daf.butler import Butler
12-
13-
from ..common.butler import get_butler_config
11+
from ..common.butler import BUTLER_FACTORY
1412
from ..common.enums import StatusEnum
1513
from ..common.errors import CMMissingScriptInputError, test_type_and_raise
14+
from ..common.logging import LOGGER
1615
from ..config import config
1716
from ..db.campaign import Campaign
1817
from ..db.element import ElementMixin
@@ -21,6 +20,8 @@
2120
from ..db.script import Script
2221
from .script_handler import FunctionHandler
2322

23+
logger = LOGGER.bind(module=__name__)
24+
2425

2526
class RunElementScriptHandler(FunctionHandler):
2627
"""Shared base class to handling running and
@@ -221,14 +222,10 @@ async def split(
221222
if mock_butler:
222223
sorted_field_values = np.arange(10)
223224
else:
224-
butler_config = await get_butler_config(butler_repo, without_datastore=True)
225-
butler_f = partial(
226-
Butler.from_config,
227-
butler_config,
228-
collections=[input_coll, campaign_input_coll],
229-
without_datastore=True,
230-
)
231-
butler = await to_thread.run_sync(butler_f)
225+
butler = BUTLER_FACTORY.get_butler(butler_repo, collections=[input_coll, campaign_input_coll])
226+
if butler is None:
227+
logger.error(f"butler repo {butler_repo} is not known to the application.")
228+
raise RuntimeError("No such butler")
232229
itr_q_f = partial(
233230
butler.registry.queryDataIds,
234231
[split_field],
