233 changes: 200 additions & 33 deletions src/lfx/src/lfx/interface/components.py
@@ -27,6 +27,11 @@
MIN_MODULE_PARTS_WITH_FILENAME = 4 # Minimum parts needed to have a module filename (lfx.components.type.filename)
EXPECTED_RESULT_LENGTH = 2 # Expected length of the tuple returned by _process_single_module

# Cap concurrent module scans so asyncio's default thread pool isn't exhausted.
# Without this bound, high-concurrency loads can silently drop components when
# threads are starved.
_MODULE_SCAN_CONCURRENCY = 16


# Create a class to manage component cache instead of using globals
class ComponentCache:
@@ -42,6 +47,22 @@ def __init__(self):
# None means "not yet loaded" (fail-closed); {} means "loaded, no components found".
self.type_to_current_hash: dict[str, set[str]] | None = None
self.all_known_hashes: set[str] | None = None
# Lazily created on first access from inside a running event loop.
# Constructing asyncio.Lock() at import time (the module-level singleton
# below runs at import) would create the lock before any event loop
# exists; older asyncio versions rejected that outright, and eager
# construction can leave the lock bound to the wrong loop when more than
# one loop is involved. Creating it lazily sidesteps both problems.
self._lock: asyncio.Lock | None = None

@property
def lock(self) -> asyncio.Lock:
"""Return the asyncio.Lock, creating it lazily on first access.

Must be called from inside a running event loop, so the lock is created
on (and stays bound to) the loop that guards the cache; the module-level
singleton is instantiated at import time, before any loop exists.
"""
if self._lock is None:
self._lock = asyncio.Lock()
return self._lock
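# Illustrative usage sketch (not part of this change): touch the property
# for the first time from inside a coroutine, so the Lock is constructed
# while a loop is running and stays bound to that loop:
#
#     async def load_components():
#         async with component_cache.lock:
#             ...
#
#     asyncio.run(load_components())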


# Singleton instance
@@ -84,7 +105,7 @@ def _parse_dev_mode() -> tuple[bool, list[str] | None]:
return (False, None)


def _read_component_index(custom_path: str | None = None) -> dict | None:
async def _read_component_index(custom_path: str | None = None) -> dict | None:
"""Read and validate the prebuilt component index.

Args:
@@ -104,7 +125,10 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
import httpx

try:
response = httpx.get(custom_path, timeout=10.0)
# Sync HTTP call: only reached when the user explicitly points
# components_index_path at an http(s) URL. The common (built-in
# and file-path) read paths are async.
response = httpx.get(custom_path, timeout=10.0) # noqa: ASYNC210
response.raise_for_status()
blob = orjson.loads(response.content)
except httpx.HTTPError as e:
@@ -120,7 +144,7 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
logger.warning(f"Custom component index not found at {custom_path}")
return None
try:
blob = orjson.loads(index_path.read_bytes())
blob = orjson.loads(await asyncio.to_thread(index_path.read_bytes))
except orjson.JSONDecodeError as e:
logger.warning(f"Component index at {custom_path} is corrupted or invalid JSON: {e}")
return None
@@ -133,7 +157,8 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
return None

try:
blob = orjson.loads(index_path.read_bytes())
# Read the built-in index off-thread to avoid blocking the event loop.
blob = orjson.loads(await asyncio.to_thread(index_path.read_bytes))
except orjson.JSONDecodeError as e:
logger.warning(f"Built-in component index is corrupted or invalid JSON: {e}")
return None
@@ -170,7 +195,7 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
)
return None
except Exception as e: # noqa: BLE001
logger.warning(f"Unexpected error reading component index: {type(e).__name__}: {e}")
logger.warning(f"Unexpected error reading component index: {type(e).__name__}: {e}", exc_info=True)
return None
return blob
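
# Illustrative sketch (assumption, not part of this change): the sync
# httpx.get above is reached only for user-supplied http(s) index URLs; if
# that blocking call ever matters, httpx.AsyncClient is the natural swap.
# _fetch_index_async is a hypothetical helper, not module API.
async def _fetch_index_async(url: str) -> dict | None:
    import httpx

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return orjson.loads(response.content)
    except (httpx.HTTPError, orjson.JSONDecodeError) as e:
        logger.warning(f"Async index fetch from {url} failed: {type(e).__name__}: {e}")
        return None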

@@ -187,6 +212,11 @@ def _get_cache_path() -> Path:
def _save_generated_index(modules_dict: dict) -> None:
"""Save a dynamically generated component index to cache for future use.

The cache is stamped with the lfx package version (not langflow) so lfx-only
deployments don't invalidate the cache on every restart. The write is atomic:
a temp file in the same directory is renamed over the target via
Path.replace (os.replace under the hood), so concurrent workers never
see a torn file.

Args:
modules_dict: Dictionary of components by category
"""
@@ -200,14 +230,24 @@ def _save_generated_index(modules_dict: dict) -> None:
num_modules = len(modules_dict)
num_components = sum(len(components) for components in modules_dict.values())

# Get version
from importlib.metadata import version
# Stamp with the lfx version (not langflow) so lfx-only deployments
# don't invalidate the cache on every restart. Mirrors the read-time
# PackageNotFoundError fallback so editable/workspace installs
# without dist-info don't crash the save path.
from importlib.metadata import PackageNotFoundError, version

langflow_version = version("langflow")
try:
lfx_version = version("lfx")
except PackageNotFoundError:
logger.debug(
"Could not determine installed lfx version (no package metadata); "
"stamping generated index with 'unknown'"
)
lfx_version = "unknown"

# Build index structure
index = {
"version": langflow_version,
"version": lfx_version,
"metadata": {
"num_modules": num_modules,
"num_components": num_components,
@@ -219,13 +259,24 @@ def _save_generated_index(modules_dict: dict) -> None:
payload = orjson.dumps(index, option=orjson.OPT_SORT_KEYS)
index["sha256"] = hashlib.sha256(payload).hexdigest()

# Write to cache
# Atomic write: temp file in the SAME directory as the target, then
# rename via Path.replace (atomic on POSIX and Windows since Python 3.3).
# The temp file must share a filesystem with the target to avoid
# cross-device-link errors in containers where $TMPDIR is tmpfs and the
# cache dir is a persistent volume.
json_bytes = orjson.dumps(index, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2)
cache_path.write_bytes(json_bytes)
tmp_path = cache_path.with_suffix(cache_path.suffix + ".tmp")
tmp_path.write_bytes(json_bytes)
tmp_path.replace(cache_path)

logger.debug(f"Saved generated component index to cache: {cache_path}")
except OSError as e:
# Disk full, permission denied, read-only mount, cross-device link, etc.
# Surface at warning so operators see it: a silent failure here defeats
# the cache and forces a full rebuild on every cold start.
logger.warning(f"Failed to save generated index to cache: {type(e).__name__}: {e}")
except Exception as e: # noqa: BLE001
logger.debug(f"Failed to save generated index to cache: {e}")
logger.error(f"Unexpected error saving generated index to cache: {type(e).__name__}: {e}", exc_info=True)


async def _send_telemetry(
@@ -293,7 +344,7 @@ async def _load_from_index_or_cache(
custom_index_path = settings_service.settings.components_index_path
await logger.adebug(f"Using custom component index: {custom_index_path}")

index = _read_component_index(custom_index_path)
index = await _read_component_index(custom_index_path)
if index and "entries" in index:
source = custom_index_path or "built-in index"
await logger.adebug(f"Loading components from {source}")
@@ -316,7 +367,7 @@
else:
if cache_path.exists():
await logger.adebug(f"Attempting to load from cache: {cache_path}")
index = _read_component_index(str(cache_path))
index = await _read_component_index(str(cache_path))
if index and "entries" in index:
await logger.adebug("Loading components from cached index")
for top_level, components in index["entries"]:
@@ -380,8 +431,16 @@ async def _load_components_dynamically(
if not module_names:
return modules_dict

# Create tasks for parallel module processing
tasks = [asyncio.to_thread(_process_single_module, modname) for modname in module_names]
# Bound concurrent scans with a semaphore so the default thread pool isn't
# exhausted under high module-count workloads. Each asyncio.to_thread call
# is wrapped in a helper that acquires the semaphore before running.
semaphore = asyncio.Semaphore(_MODULE_SCAN_CONCURRENCY)

async def _bounded(modname: str):
async with semaphore:
return await asyncio.to_thread(_process_single_module, modname)

tasks = [_bounded(modname) for modname in module_names]

# Wait for all modules to be processed
try:
@@ -390,10 +449,15 @@
await logger.aerror(f"Error during parallel module processing: {e}", exc_info=True)
return modules_dict

# Merge results from all modules
# Merge results from all modules; track failures so we can emit one
# aggregate signal — without this the user sees N individual warnings
# interleaved with normal startup logs and no overall pass/fail.
failure_types: dict[str, int] = {}
for result in module_results:
if isinstance(result, Exception):
await logger.awarning(f"Module processing failed: {result}")
type_name = type(result).__name__
failure_types[type_name] = failure_types.get(type_name, 0) + 1
await logger.awarning(f"Module processing failed ({type_name}): {result}")
continue

if result and isinstance(result, tuple) and len(result) == EXPECTED_RESULT_LENGTH:
@@ -403,6 +467,13 @@
modules_dict[top_level] = {}
modules_dict[top_level].update(components)

if failure_types:
total_failed = sum(failure_types.values())
await logger.aerror(
f"Component module load: {total_failed} of {len(module_results)} modules failed. "
f"Failure types: {failure_types}"
)

return modules_dict
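
# Illustrative sketch (assumption, not part of this change): a standalone
# demo of the bounded-scan pattern above. asyncio.to_thread runs on the
# loop's default ThreadPoolExecutor, sized roughly min(32, cpu_count + 4),
# so an unbounded gather over hundreds of modules just queues behind that
# pool; the semaphore caps how many scans occupy threads at once.
async def _demo_bounded_scan(module_names: list[str]) -> list:
    sem = asyncio.Semaphore(_MODULE_SCAN_CONCURRENCY)

    async def bounded(name: str):
        async with sem:  # at most _MODULE_SCAN_CONCURRENCY threads busy
            return await asyncio.to_thread(_process_single_module, name)

    return await asyncio.gather(*(bounded(n) for n in module_names), return_exceptions=True)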


@@ -442,6 +513,15 @@ async def _load_selective_dev_mode(
modules_dict[top_level] = {}
modules_dict[top_level].update(components)

if not modules_dict:
# No index/cache and no targeted module loaded successfully. Without this
# warning the caller sees a successful empty result, the most confusing
# debug-time failure: the component palette is empty with no signal why.
await logger.awarning(
f"LFX_DEV selective mode produced 0 components for targets {target_modules}. "
"Check that the targeted module names exist under lfx.components and imported cleanly."
)

await logger.adebug(f"Reloaded {len(target_modules)} module(s), kept others from index")
return modules_dict, "dynamic"

@@ -643,24 +723,111 @@ async def get_and_cache_all_types_dict(
telemetry_service: Optional telemetry service for tracking component loading metrics
"""
if component_cache.all_types_dict is None:
await logger.adebug("Building components cache")

langflow_components = await import_langflow_components(settings_service, telemetry_service)
custom_components_dict = await _determine_loading_strategy(settings_service)
# The peek runs OUTSIDE the lock so a multi-MB disk read doesn't widen
# the lock-hold window. It is idempotent: when two callers cold-start at
# once they may each emit the same warning, which is cosmetic.
from importlib.metadata import PackageNotFoundError as _PackageNotFoundError
from importlib.metadata import version as _version

# Flatten custom dict if it has a "components" wrapper
custom_flat = custom_components_dict.get("components", custom_components_dict) or {}
try:
installed_version = _version("lfx")
except _PackageNotFoundError:
installed_version = None

# Merge built-in and custom components (no wrapper at cache level)
component_cache.all_types_dict = {
**langflow_components["components"],
**custom_flat,
}
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
await logger.adebug(f"Loaded {component_count} components")
# Promoted to the SHA-validated cache blob when version matches and
# integrity passes; used to short-circuit the rebuild inside the lock.
_pending_cache_hit: dict | None = None

# Precompute code hash lookups for fast flow validation
_build_code_hash_lookups(component_cache)
if installed_version is not None:
try:
cache_path = _get_cache_path()
except Exception as exc: # noqa: BLE001
logger.debug(f"Could not resolve component cache path: {exc}")
cache_path = None
if cache_path is not None and cache_path.exists():
cached_blob: Any = None
try:
cached_blob = orjson.loads(await asyncio.to_thread(cache_path.read_bytes))
except (OSError, orjson.JSONDecodeError) as exc:
logger.warning(
f"Component cache peek failed at {cache_path} "
f"({type(exc).__name__}); falling through to rebuild: {exc}"
)

if isinstance(cached_blob, dict):
cached_version = cached_blob.get("version")
if isinstance(cached_version, str) and cached_version and cached_version != installed_version:
logger.warning(
f"Stale component cache: cached={cached_version}, "
f"installed={installed_version}, path={cache_path}. "
"Removing so it is regenerated on the next build."
)
try:
cache_path.unlink(missing_ok=True)
except OSError as exc:
logger.warning(f"Could not remove stale component cache at {cache_path}: {exc}")
elif (
isinstance(cached_version, str)
and cached_version == installed_version
and isinstance(cached_blob.get("entries"), list)
and cached_blob["entries"]
):
# Version + entries OK: verify SHA256 inline (mirrors
# _read_component_index's integrity check) before promoting.
unsigned = {k: v for k, v in cached_blob.items() if k != "sha256"}
sha = cached_blob.get("sha256")
calc = hashlib.sha256(orjson.dumps(unsigned, option=orjson.OPT_SORT_KEYS)).hexdigest()
if not isinstance(sha, str) or not sha:
logger.warning(f"Component cache at {cache_path} missing SHA256 hash; will rebuild.")
elif sha != calc:
logger.warning(
f"Component cache at {cache_path} failed SHA256 integrity check; will rebuild."
)
else:
_pending_cache_hit = cached_blob

async with component_cache.lock:
# Double-check: another task may have populated the cache while we
# awaited the lock.
if component_cache.all_types_dict is None:
if _pending_cache_hit is not None:
# Cache-hit short-circuit: reconstruct the flat dict from
# entries (same pattern as _load_from_index_or_cache).
# No telemetry is emitted because no build work happened;
# the rebuild branch below owns telemetry via
# import_langflow_components.
merged: dict[str, Any] = {}
for top_level, components in _pending_cache_hit["entries"]:
if top_level not in merged:
merged[top_level] = {}
merged[top_level].update(components)
merged = filter_disabled_components_from_dict(merged)
component_cache.all_types_dict = merged
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
await logger.adebug(
f"Loaded {component_count} components from user cache (version={installed_version})"
)
# Populate hash lookups now so ensure_component_hash_lookups_loaded
# doesn't trigger a second rebuild on the next call.
_build_code_hash_lookups(component_cache)
else:
await logger.adebug("Building components cache")
langflow_components = await import_langflow_components(settings_service, telemetry_service)
custom_components_dict = await _determine_loading_strategy(settings_service)

# Flatten custom dict if it has a "components" wrapper
custom_flat = custom_components_dict.get("components", custom_components_dict) or {}

# Merge built-in and custom components (no wrapper at cache level)
component_cache.all_types_dict = {
**langflow_components["components"],
**custom_flat,
}
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
await logger.adebug(f"Loaded {component_count} components")

# Precompute code hash lookups for fast flow validation
_build_code_hash_lookups(component_cache)

return component_cache.all_types_dict
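
# Illustrative sketch (assumption, not part of this change): the signing
# scheme the peek above verifies, end to end. The SHA256 covers the
# canonical (key-sorted) JSON of every field EXCEPT "sha256", so a verifier
# must strip that field before re-hashing. _sign_index and _verify_index
# are hypothetical helpers, not module API.
def _sign_index(index: dict) -> dict:
    payload = orjson.dumps(index, option=orjson.OPT_SORT_KEYS)
    return {**index, "sha256": hashlib.sha256(payload).hexdigest()}


def _verify_index(blob: dict) -> bool:
    unsigned = {k: v for k, v in blob.items() if k != "sha256"}
    calc = hashlib.sha256(orjson.dumps(unsigned, option=orjson.OPT_SORT_KEYS)).hexdigest()
    return blob.get("sha256") == calc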
