|
| 1 | +"""Gunicorn master preload hook for Langflow. |
| 2 | +
|
| 3 | +When ``LANGFLOW_GUNICORN_PRELOAD=true`` (which flips gunicorn's |
| 4 | +``preload_app`` option), this module runs the fork-safe parts of |
| 5 | +Langflow's startup in the master process so that worker processes |
| 6 | +inherit the result via copy-on-write. The dominant memory wins are: |
| 7 | +
|
| 8 | +- Python modules imported from custom-component bundles. |
| 9 | +- The component types dict (``lfx.interface.components.component_cache``), |
| 10 | + which is typically tens of MB per worker. |
| 11 | +- Starter-project graphs and related in-process state. |
| 12 | +
|
| 13 | +Fork-unsafe resources must not survive across ``fork`` (live DB connection |
| 14 | +pools, cache-service sockets, prometheus HTTP servers, telemetry threads, MCP |
| 15 | +composer asyncio tasks, queue service, per-worker background tasks, etc.). |
| 16 | +Preload may open the SQLAlchemy engine transiently to run migrations and |
| 17 | +seeding, then ``dispose()`` it before workers are forked; it does not leave a |
| 18 | +pool open for request serving in the master. Other services are torn down or |
| 19 | +never started here. Each worker continues to set up its own pools and services |
| 20 | +in its own FastAPI ``lifespan`` after fork, so workers remain fully independent |
| 21 | +and can each serve any request on their own. |
| 22 | +
|
| 23 | +Failure contract: |
| 24 | + Fork-safety-critical steps (DB engine disposal, cache service teardown) |
| 25 | + that fail will propagate their exception and abort preload. Best-effort |
| 26 | + steps (profile pictures, starter projects, agentic globals, agentic MCP, |
| 27 | + flows) that fail will log the exception with traceback, clear their |
| 28 | + completion flag, and allow preload to continue so workers inherit partial |
| 29 | + progress. Workers re-run any incomplete step during their lifespan. |
| 30 | +
|
| 31 | +Notes on CPython and copy-on-write: |
| 32 | + CPython mutates ``ob_refcnt`` on every attribute access, which |
| 33 | + triggers 4 KB page copies even for "shared" objects. Preloading |
| 34 | + therefore does not eliminate per-worker memory entirely, but it |
| 35 | + significantly reduces the cold-start working set, and calling |
| 36 | + ``gc.freeze()`` after preload prevents the cyclic GC from touching |
| 37 | + long-lived objects and unsharing their pages. |
| 38 | +""" |
| 39 | + |
| 40 | +from __future__ import annotations |
| 41 | + |
| 42 | +import asyncio |
| 43 | +import gc |
| 44 | +import os |
| 45 | +from dataclasses import dataclass, field |
| 46 | +from enum import Enum |
| 47 | +from typing import TYPE_CHECKING, Final |
| 48 | + |
| 49 | +from lfx.log.logger import logger |
| 50 | + |
| 51 | +if TYPE_CHECKING: |
| 52 | + from collections.abc import Awaitable |
| 53 | + from tempfile import TemporaryDirectory |
| 54 | + |
| 55 | + |
| 56 | +class PreloadStep(Enum): |
| 57 | + """Ordered preload phases; completion flags must advance via ``mark_step_complete`` only.""" |
| 58 | + |
| 59 | + PROFILE_PICTURES = "profile_pictures" |
| 60 | + BUNDLES = "bundles" |
| 61 | + TYPES_CACHED = "types_cached" |
| 62 | + STARTER_PROJECTS = "starter_projects" |
| 63 | + AGENTIC_GLOBALS = "agentic_globals" |
| 64 | + AGENTIC_MCP = "agentic_mcp" |
| 65 | + FLOWS = "flows" |
| 66 | + |
| 67 | + |
| 68 | +_STEP_ATTR: Final[dict[PreloadStep, str]] = { |
| 69 | + PreloadStep.PROFILE_PICTURES: "profile_pictures_copied", |
| 70 | + PreloadStep.BUNDLES: "bundles_loaded", |
| 71 | + PreloadStep.TYPES_CACHED: "types_cached", |
| 72 | + PreloadStep.STARTER_PROJECTS: "starter_projects_created", |
| 73 | + PreloadStep.AGENTIC_GLOBALS: "agentic_globals_initialized", |
| 74 | + PreloadStep.AGENTIC_MCP: "agentic_mcp_configured", |
| 75 | + PreloadStep.FLOWS: "flows_loaded", |
| 76 | +} |
| 77 | + |
| 78 | +# Explicit prerequisite DAG matching ``_run_master_preload`` ordering (comments + pipeline). |
| 79 | +_STEP_PREREQUISITES: Final[dict[PreloadStep, tuple[PreloadStep, ...]]] = { |
| 80 | + PreloadStep.PROFILE_PICTURES: (), |
| 81 | + PreloadStep.BUNDLES: (), |
| 82 | + PreloadStep.TYPES_CACHED: (PreloadStep.BUNDLES,), |
| 83 | + PreloadStep.STARTER_PROJECTS: (PreloadStep.TYPES_CACHED,), |
| 84 | + PreloadStep.AGENTIC_GLOBALS: (PreloadStep.TYPES_CACHED,), |
| 85 | + # MCP config may succeed even when globals failed (separate try/except in preload). |
| 86 | + PreloadStep.AGENTIC_MCP: (PreloadStep.TYPES_CACHED,), |
| 87 | + PreloadStep.FLOWS: (PreloadStep.TYPES_CACHED,), |
| 88 | +} |
| 89 | + |
| 90 | + |
| 91 | +def is_step_complete(step: PreloadStep) -> bool: |
| 92 | + """Return True if *step* finished successfully during preload (workers inherit via fork).""" |
| 93 | + attr = _STEP_ATTR[step] |
| 94 | + return bool(getattr(_STATE, attr)) |
| 95 | + |
| 96 | + |
| 97 | +def mark_step_complete(step: PreloadStep) -> None: |
| 98 | + """Record successful completion of *step*, enforcing declared prerequisite ordering.""" |
| 99 | + missing = [p for p in _STEP_PREREQUISITES[step] if not is_step_complete(p)] |
| 100 | + if missing: |
| 101 | + msg = f"Cannot complete preload step {step.value!r}: incomplete prerequisites {[m.value for m in missing]}" |
| 102 | + raise RuntimeError(msg) |
| 103 | + setattr(_STATE, _STEP_ATTR[step], True) |
| 104 | + |
| 105 | + |
| 106 | +@dataclass |
| 107 | +class _PreloadState: |
| 108 | + preloaded: bool = False |
| 109 | + master_pid: int | None = None |
| 110 | + temp_dirs: list[TemporaryDirectory] = field(default_factory=list) |
| 111 | + bundles_components_paths: list[str] = field(default_factory=list) |
| 112 | + # Per-step completion flags to prevent silent data loss |
| 113 | + profile_pictures_copied: bool = False |
| 114 | + bundles_loaded: bool = False |
| 115 | + types_cached: bool = False |
| 116 | + starter_projects_created: bool = False |
| 117 | + agentic_globals_initialized: bool = False |
| 118 | + agentic_mcp_configured: bool = False |
| 119 | + flows_loaded: bool = False |
| 120 | + |
| 121 | + def reset(self) -> None: |
| 122 | + """Restore all fields to their default values. |
| 123 | +
|
| 124 | + Called from the outer failure handler in ``preload_master()`` so that |
| 125 | + a partially-completed preload never leaves inconsistent state behind |
| 126 | + (e.g. ``master_pid`` set while ``preloaded`` is False, or best-effort |
| 127 | + completion flags set for steps that ran before the failure point). |
| 128 | + Cleans up bundle ``TemporaryDirectory`` instances before clearing |
| 129 | + ``temp_dirs`` so failed preloads do not leak on-disk directories. |
| 130 | + After reset, workers take the full non-preload code path. |
| 131 | + """ |
| 132 | + self.preloaded = False |
| 133 | + self.master_pid = None |
| 134 | + for tmp_dir in self.temp_dirs: |
| 135 | + try: |
| 136 | + tmp_dir.cleanup() |
| 137 | + except Exception: # noqa: BLE001 |
| 138 | + logger.exception("[preload] failed to cleanup preload temporary directory") |
| 139 | + self.temp_dirs = [] |
| 140 | + self.bundles_components_paths = [] |
| 141 | + self.profile_pictures_copied = False |
| 142 | + self.bundles_loaded = False |
| 143 | + self.types_cached = False |
| 144 | + self.starter_projects_created = False |
| 145 | + self.agentic_globals_initialized = False |
| 146 | + self.agentic_mcp_configured = False |
| 147 | + self.flows_loaded = False |
| 148 | + |
| 149 | + |
| 150 | +_STATE = _PreloadState() |
| 151 | + |
| 152 | + |
| 153 | +async def _best_effort(step: PreloadStep, log_suffix: str, awaitable: Awaitable[None]) -> None: |
| 154 | + """Run *awaitable* and mark *step* complete on success; log and continue on failure.""" |
| 155 | + try: |
| 156 | + await awaitable |
| 157 | + mark_step_complete(step) |
| 158 | + except Exception: # noqa: BLE001 |
| 159 | + await logger.aexception(f"[preload] {log_suffix}") |
| 160 | + |
| 161 | + |
| 162 | +def is_preloaded() -> bool: |
| 163 | + """Return True iff the master ran the preload hook. |
| 164 | +
|
| 165 | + Workers inherit ``_STATE`` via fork, so this returns True in any |
| 166 | + process forked from a master that completed ``preload_master()``. |
| 167 | + """ |
| 168 | + return _STATE.preloaded |
| 169 | + |
| 170 | + |
| 171 | +def is_master() -> bool: |
| 172 | + """Return True if the current process is the gunicorn master that ran preload.""" |
| 173 | + return _STATE.master_pid is not None and os.getpid() == _STATE.master_pid |
| 174 | + |
| 175 | + |
| 176 | +def get_owned_temp_dirs() -> list[TemporaryDirectory]: |
| 177 | + """Return temp_dirs that the current process owns and should clean up. |
| 178 | +
|
| 179 | + When preloaded: |
| 180 | + - Master returns the preloaded temp_dirs (it owns them) |
| 181 | + - Workers return an empty list (they must NOT clean up master's temp_dirs) |
| 182 | + When not preloaded: |
| 183 | + - Returns an empty list (will be populated by load_bundles later) |
| 184 | +
|
| 185 | + This encodes the master-only ownership rule so callers don't need |
| 186 | + to check is_master() themselves. |
| 187 | + """ |
| 188 | + if _STATE.preloaded and is_master(): |
| 189 | + return _STATE.temp_dirs |
| 190 | + return [] |
| 191 | + |
| 192 | + |
| 193 | +async def _run_master_preload() -> None: |
| 194 | + """Run fork-safe one-time initialization inside an asyncio event loop. |
| 195 | +
|
| 196 | + This function opens the DB engine (for migrations and seeding) and |
| 197 | + always disposes it before returning — including on failure paths — so |
| 198 | + no connections / file descriptors leak into forked workers. |
| 199 | + """ |
| 200 | + from lfx.interface.components import component_cache, get_and_cache_all_types_dict |
| 201 | + |
| 202 | + from langflow.initial_setup.setup import ( |
| 203 | + copy_profile_pictures, |
| 204 | + create_or_update_starter_projects, |
| 205 | + load_flows_from_directory, |
| 206 | + ) |
| 207 | + from langflow.main import load_bundles_with_error_handling |
| 208 | + from langflow.services.deps import ( |
| 209 | + get_db_service, |
| 210 | + get_settings_service, |
| 211 | + get_telemetry_service, |
| 212 | + session_scope, |
| 213 | + ) |
| 214 | + from langflow.services.utils import initialize_services |
| 215 | + |
| 216 | + settings_service = get_settings_service() |
| 217 | + |
| 218 | + await logger.ainfo("[preload] initializing services in master") |
| 219 | + await initialize_services(fix_migration=False) |
| 220 | + |
| 221 | + # Wrap all post-initialization work in try/finally so the DB engine and |
| 222 | + # cache service are always torn down before returning, even on failure. |
| 223 | + # Without this, any exception raised between here and the dispose() calls |
| 224 | + # would leave an open connection pool in the master process, and that pool |
| 225 | + # would be inherited (fork-unsafe) by every worker. |
| 226 | + try: |
| 227 | + await logger.adebug("[preload] copying profile pictures") |
| 228 | + await _best_effort( |
| 229 | + PreloadStep.PROFILE_PICTURES, |
| 230 | + "copy_profile_pictures failed", |
| 231 | + copy_profile_pictures(), |
| 232 | + ) |
| 233 | + |
| 234 | + await logger.ainfo("[preload] loading bundles") |
| 235 | + temp_dirs, bundles_components_paths = await load_bundles_with_error_handling() |
| 236 | + _STATE.temp_dirs = list(temp_dirs) |
| 237 | + _STATE.bundles_components_paths = list(bundles_components_paths) |
| 238 | + settings_service.settings.components_path.extend(bundles_components_paths) |
| 239 | + mark_step_complete(PreloadStep.BUNDLES) |
| 240 | + |
| 241 | + await logger.ainfo("[preload] building component types cache") |
| 242 | + await get_and_cache_all_types_dict(settings_service, get_telemetry_service()) |
| 243 | + mark_step_complete(PreloadStep.TYPES_CACHED) |
| 244 | + |
| 245 | + all_types_dict = component_cache.all_types_dict |
| 246 | + if all_types_dict is not None: |
| 247 | + await logger.adebug("[preload] creating/updating starter projects") |
| 248 | + await _best_effort( |
| 249 | + PreloadStep.STARTER_PROJECTS, |
| 250 | + "starter projects init failed", |
| 251 | + create_or_update_starter_projects(all_types_dict), |
| 252 | + ) |
| 253 | + |
| 254 | + if settings_service.settings.agentic_experience: |
| 255 | + from langflow.api.utils.mcp.agentic_mcp import ( |
| 256 | + auto_configure_agentic_mcp_server, |
| 257 | + initialize_agentic_global_variables, |
| 258 | + ) |
| 259 | + |
| 260 | + await logger.adebug("[preload] initializing agentic global variables") |
| 261 | + |
| 262 | + async def _run_agentic_globals() -> None: |
| 263 | + async with session_scope() as session: |
| 264 | + await initialize_agentic_global_variables(session) |
| 265 | + |
| 266 | + await _best_effort( |
| 267 | + PreloadStep.AGENTIC_GLOBALS, |
| 268 | + "initialize agentic global variables failed", |
| 269 | + _run_agentic_globals(), |
| 270 | + ) |
| 271 | + |
| 272 | + await logger.adebug("[preload] auto-configuring agentic MCP server") |
| 273 | + |
| 274 | + async def _run_agentic_mcp() -> None: |
| 275 | + async with session_scope() as session: |
| 276 | + await auto_configure_agentic_mcp_server(session) |
| 277 | + |
| 278 | + await _best_effort( |
| 279 | + PreloadStep.AGENTIC_MCP, |
| 280 | + "auto-configure agentic MCP server failed", |
| 281 | + _run_agentic_mcp(), |
| 282 | + ) |
| 283 | + |
| 284 | + await logger.adebug("[preload] loading flows from directory") |
| 285 | + await _best_effort( |
| 286 | + PreloadStep.FLOWS, |
| 287 | + "load_flows_from_directory failed", |
| 288 | + load_flows_from_directory(), |
| 289 | + ) |
| 290 | + |
| 291 | + finally: |
| 292 | + # CRITICAL: dispose the DB engine before the master returns control to |
| 293 | + # gunicorn regardless of whether the steps above succeeded or failed. |
| 294 | + # Forking with an open connection pool causes workers to inherit the |
| 295 | + # same TCP / file descriptors, making SQLAlchemy / the DB driver |
| 296 | + # behave unpredictably. After dispose(), the engine object is still |
| 297 | + # usable: on first access in a worker it opens a fresh pool for that |
| 298 | + # process. |
| 299 | + await logger.adebug("[preload] disposing master DB engine before fork") |
| 300 | + await get_db_service().engine.dispose() |
| 301 | + |
| 302 | + # Close cache service socket (e.g. Redis) to prevent sharing across fork. |
| 303 | + # ExternalAsyncBaseCacheService declares teardown() abstract, so any |
| 304 | + # concrete implementation is guaranteed to have it. A failure here is |
| 305 | + # fork-safety-critical and must propagate (no try/except). |
| 306 | + from langflow.services.cache.base import ExternalAsyncBaseCacheService |
| 307 | + from langflow.services.deps import get_service |
| 308 | + from langflow.services.schema import ServiceType |
| 309 | + |
| 310 | + cache_service = get_service(ServiceType.CACHE_SERVICE) |
| 311 | + if isinstance(cache_service, ExternalAsyncBaseCacheService): |
| 312 | + await cache_service.teardown() |
| 313 | + |
| 314 | + |
| 315 | +def preload_master() -> None: |
| 316 | + """Run one-time Langflow initialization in the gunicorn master before workers are forked. |
| 317 | +
|
| 318 | + Safe to call more than once: subsequent calls are no-ops. |
| 319 | + If preload fails for any reason, this function logs the failure and |
| 320 | + returns without setting the preloaded flag, so workers fall back to |
| 321 | + running the full lifespan as before (no behavior regression). |
| 322 | + """ |
| 323 | + if _STATE.preloaded: |
| 324 | + return |
| 325 | + |
| 326 | + _STATE.master_pid = os.getpid() |
| 327 | + |
| 328 | + try: |
| 329 | + asyncio.run(_run_master_preload()) |
| 330 | + except Exception: # noqa: BLE001 |
| 331 | + logger.exception("[preload] master preload failed; falling back to per-worker init") |
| 332 | + # Clear any partial state so workers take the full non-preload path |
| 333 | + # and is_master() / is_preloaded() stay mutually consistent. |
| 334 | + _STATE.reset() |
| 335 | + return |
| 336 | + |
| 337 | + # Help COW: move preload-allocated objects into the permanent generation |
| 338 | + # so the cyclic GC won't touch (and unshare) their pages in workers. |
| 339 | + try: |
| 340 | + gc.collect() |
| 341 | + gc.freeze() |
| 342 | + except Exception: |
| 343 | + logger.exception( |
| 344 | + "[preload] gc.collect()/gc.freeze() failed after async preload; resetting state and re-raising" |
| 345 | + ) |
| 346 | + _STATE.reset() |
| 347 | + raise |
| 348 | + |
| 349 | + _STATE.preloaded = True |
| 350 | + logger.info("[preload] master preload complete; workers will inherit shared state via COW") |
0 commit comments