Skip to content

Commit 8921cd5

Browse files
committed
Merge cold-start/02-component-index
2 parents abe6b7a + d71394f commit 8921cd5

2 files changed

Lines changed: 1380 additions & 56 deletions

File tree

src/lfx/src/lfx/interface/components.py

Lines changed: 200 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
MIN_MODULE_PARTS_WITH_FILENAME = 4 # Minimum parts needed to have a module filename (lfx.components.type.filename)
2828
EXPECTED_RESULT_LENGTH = 2 # Expected length of the tuple returned by _process_single_module
2929

30+
# Cap concurrent module scans so asyncio's default thread pool isn't exhausted.
31+
# Without this bound, high-concurrency loads can silently drop components when
32+
# threads are starved.
33+
_MODULE_SCAN_CONCURRENCY = 16
34+
3035

3136
# Create a class to manage component cache instead of using globals
3237
class ComponentCache:
@@ -42,6 +47,22 @@ def __init__(self):
4247
# None means "not yet loaded" (fail-closed); {} means "loaded, no components found".
4348
self.type_to_current_hash: dict[str, set[str]] | None = None
4449
self.all_known_hashes: set[str] | None = None
50+
# Lazily created on first access from inside a running event loop.
51+
# Constructing asyncio.Lock() at import time raises RuntimeError on
52+
# Python 3.13+ because the singleton below runs at module import.
53+
self._lock: asyncio.Lock | None = None
54+
55+
@property
56+
def lock(self) -> asyncio.Lock:
57+
"""Return the asyncio.Lock, creating it lazily on first access.
58+
59+
Must be called from inside a running event loop: asyncio.Lock() requires
60+
a running loop on Python 3.13+, and the singleton is instantiated at
61+
module import time.
62+
"""
63+
if self._lock is None:
64+
self._lock = asyncio.Lock()
65+
return self._lock
4566

4667

4768
# Singleton instance
@@ -84,7 +105,7 @@ def _parse_dev_mode() -> tuple[bool, list[str] | None]:
84105
return (False, None)
85106

86107

87-
def _read_component_index(custom_path: str | None = None) -> dict | None:
108+
async def _read_component_index(custom_path: str | None = None) -> dict | None:
88109
"""Read and validate the prebuilt component index.
89110
90111
Args:
@@ -104,7 +125,10 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
104125
import httpx
105126

106127
try:
107-
response = httpx.get(custom_path, timeout=10.0)
128+
# Sync HTTP call: only reached when the user explicitly points
129+
# components_index_path at an http(s) URL. The common (built-in
130+
# and file-path) read paths are async.
131+
response = httpx.get(custom_path, timeout=10.0) # noqa: ASYNC210
108132
response.raise_for_status()
109133
blob = orjson.loads(response.content)
110134
except httpx.HTTPError as e:
@@ -120,7 +144,7 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
120144
logger.warning(f"Custom component index not found at {custom_path}")
121145
return None
122146
try:
123-
blob = orjson.loads(index_path.read_bytes())
147+
blob = orjson.loads(await asyncio.to_thread(index_path.read_bytes))
124148
except orjson.JSONDecodeError as e:
125149
logger.warning(f"Component index at {custom_path} is corrupted or invalid JSON: {e}")
126150
return None
@@ -133,7 +157,8 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
133157
return None
134158

135159
try:
136-
blob = orjson.loads(index_path.read_bytes())
160+
# Read the built-in index off-thread to avoid blocking the event loop.
161+
blob = orjson.loads(await asyncio.to_thread(index_path.read_bytes))
137162
except orjson.JSONDecodeError as e:
138163
logger.warning(f"Built-in component index is corrupted or invalid JSON: {e}")
139164
return None
@@ -170,7 +195,7 @@ def _read_component_index(custom_path: str | None = None) -> dict | None:
170195
)
171196
return None
172197
except Exception as e: # noqa: BLE001
173-
logger.warning(f"Unexpected error reading component index: {type(e).__name__}: {e}")
198+
logger.warning(f"Unexpected error reading component index: {type(e).__name__}: {e}", exc_info=True)
174199
return None
175200
return blob
176201

@@ -187,6 +212,11 @@ def _get_cache_path() -> Path:
187212
def _save_generated_index(modules_dict: dict) -> None:
188213
"""Save a dynamically generated component index to cache for future use.
189214
215+
The cache is stamped with the lfx package version (not langflow) so lfx-only
216+
deployments don't invalidate the cache on every restart. The write is atomic:
217+
    a temp file in the same directory is renamed via Path.replace (os.replace under the hood) so concurrent
218+
workers never see a torn file.
219+
190220
Args:
191221
modules_dict: Dictionary of components by category
192222
"""
@@ -200,14 +230,24 @@ def _save_generated_index(modules_dict: dict) -> None:
200230
num_modules = len(modules_dict)
201231
num_components = sum(len(components) for components in modules_dict.values())
202232

203-
# Get version
204-
from importlib.metadata import version
233+
# Stamp with the lfx version (not langflow) so lfx-only deployments
234+
# don't invalidate the cache on every restart. Mirrors the read-time
235+
# PackageNotFoundError fallback above so editable/workspace installs
236+
# without dist-info don't crash the save path.
237+
from importlib.metadata import PackageNotFoundError, version
205238

206-
langflow_version = version("langflow")
239+
try:
240+
lfx_version = version("lfx")
241+
except PackageNotFoundError:
242+
logger.debug(
243+
"Could not determine installed lfx version (no package metadata); "
244+
"stamping generated index with 'unknown'"
245+
)
246+
lfx_version = "unknown"
207247

208248
# Build index structure
209249
index = {
210-
"version": langflow_version,
250+
"version": lfx_version,
211251
"metadata": {
212252
"num_modules": num_modules,
213253
"num_components": num_components,
@@ -219,13 +259,24 @@ def _save_generated_index(modules_dict: dict) -> None:
219259
payload = orjson.dumps(index, option=orjson.OPT_SORT_KEYS)
220260
index["sha256"] = hashlib.sha256(payload).hexdigest()
221261

222-
# Write to cache
262+
# Atomic write: temp file in the SAME directory as the target, then
263+
# rename via Path.replace (atomic on POSIX and Windows since Python 3.3).
264+
# The temp file must share a filesystem with the target to avoid
265+
# cross-device-link errors in containers where $TMPDIR is tmpfs and the
266+
# cache dir is a persistent volume.
223267
json_bytes = orjson.dumps(index, option=orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2)
224-
cache_path.write_bytes(json_bytes)
268+
tmp_path = cache_path.with_suffix(cache_path.suffix + ".tmp")
269+
tmp_path.write_bytes(json_bytes)
270+
tmp_path.replace(cache_path)
225271

226272
logger.debug(f"Saved generated component index to cache: {cache_path}")
273+
except OSError as e:
274+
# Disk full, permission denied, read-only mount, cross-device link, etc.
275+
# Surface at warning so operators see it: a silent failure here defeats
276+
# the cache and forces a full rebuild on every cold start.
277+
logger.warning(f"Failed to save generated index to cache: {type(e).__name__}: {e}")
227278
except Exception as e: # noqa: BLE001
228-
logger.debug(f"Failed to save generated index to cache: {e}")
279+
logger.error(f"Unexpected error saving generated index to cache: {type(e).__name__}: {e}", exc_info=True)
229280

230281

231282
async def _send_telemetry(
@@ -293,7 +344,7 @@ async def _load_from_index_or_cache(
293344
custom_index_path = settings_service.settings.components_index_path
294345
await logger.adebug(f"Using custom component index: {custom_index_path}")
295346

296-
index = _read_component_index(custom_index_path)
347+
index = await _read_component_index(custom_index_path)
297348
if index and "entries" in index:
298349
source = custom_index_path or "built-in index"
299350
await logger.adebug(f"Loading components from {source}")
@@ -316,7 +367,7 @@ async def _load_from_index_or_cache(
316367
else:
317368
if cache_path.exists():
318369
await logger.adebug(f"Attempting to load from cache: {cache_path}")
319-
index = _read_component_index(str(cache_path))
370+
index = await _read_component_index(str(cache_path))
320371
if index and "entries" in index:
321372
await logger.adebug("Loading components from cached index")
322373
for top_level, components in index["entries"]:
@@ -380,8 +431,16 @@ async def _load_components_dynamically(
380431
if not module_names:
381432
return modules_dict
382433

383-
# Create tasks for parallel module processing
384-
tasks = [asyncio.to_thread(_process_single_module, modname) for modname in module_names]
434+
# Bound concurrent scans with a semaphore so the default thread pool isn't
435+
# exhausted under high module-count workloads. Each asyncio.to_thread call
436+
# is wrapped in a helper that acquires the semaphore before running.
437+
semaphore = asyncio.Semaphore(_MODULE_SCAN_CONCURRENCY)
438+
439+
async def _bounded(modname: str):
440+
async with semaphore:
441+
return await asyncio.to_thread(_process_single_module, modname)
442+
443+
tasks = [_bounded(modname) for modname in module_names]
385444

386445
# Wait for all modules to be processed
387446
try:
@@ -390,10 +449,15 @@ async def _load_components_dynamically(
390449
await logger.aerror(f"Error during parallel module processing: {e}", exc_info=True)
391450
return modules_dict
392451

393-
# Merge results from all modules
452+
# Merge results from all modules; track failures so we can emit one
453+
# aggregate signal — without this the user sees N individual warnings
454+
# interleaved with normal startup logs and no overall pass/fail.
455+
failure_types: dict[str, int] = {}
394456
for result in module_results:
395457
if isinstance(result, Exception):
396-
await logger.awarning(f"Module processing failed: {result}")
458+
type_name = type(result).__name__
459+
failure_types[type_name] = failure_types.get(type_name, 0) + 1
460+
await logger.awarning(f"Module processing failed ({type_name}): {result}")
397461
continue
398462

399463
if result and isinstance(result, tuple) and len(result) == EXPECTED_RESULT_LENGTH:
@@ -403,6 +467,13 @@ async def _load_components_dynamically(
403467
modules_dict[top_level] = {}
404468
modules_dict[top_level].update(components)
405469

470+
if failure_types:
471+
total_failed = sum(failure_types.values())
472+
await logger.aerror(
473+
f"Component module load: {total_failed} of {len(module_results)} modules failed. "
474+
f"Failure types: {failure_types}"
475+
)
476+
406477
return modules_dict
407478

408479

@@ -442,6 +513,15 @@ async def _load_selective_dev_mode(
442513
modules_dict[top_level] = {}
443514
modules_dict[top_level].update(components)
444515

516+
if not modules_dict:
517+
        # No index/cache was available and no targeted module loaded successfully. Without this
518+
# warning the caller sees a successful empty result, which is the worst
519+
        # debug-time UX — the component palette is empty with no signal why.
520+
await logger.awarning(
521+
f"LFX_DEV selective mode produced 0 components for targets {target_modules}. "
522+
"Check that the targeted module names exist under lfx.components and imported cleanly."
523+
)
524+
445525
await logger.adebug(f"Reloaded {len(target_modules)} module(s), kept others from index")
446526
return modules_dict, "dynamic"
447527

@@ -643,24 +723,111 @@ async def get_and_cache_all_types_dict(
643723
telemetry_service: Optional telemetry service for tracking component loading metrics
644724
"""
645725
if component_cache.all_types_dict is None:
646-
await logger.adebug("Building components cache")
647-
648-
langflow_components = await import_langflow_components(settings_service, telemetry_service)
649-
custom_components_dict = await _determine_loading_strategy(settings_service)
726+
# The peek runs OUTSIDE the lock so a multi-MB disk read doesn't widen
727+
# the lock-hold window. It is idempotent: when two callers cold-start at
728+
# once they may each emit the same warning, which is cosmetic.
729+
from importlib.metadata import PackageNotFoundError as _PackageNotFoundError
730+
from importlib.metadata import version as _version
650731

651-
# Flatten custom dict if it has a "components" wrapper
652-
custom_flat = custom_components_dict.get("components", custom_components_dict) or {}
732+
try:
733+
installed_version = _version("lfx")
734+
except _PackageNotFoundError:
735+
installed_version = None
653736

654-
# Merge built-in and custom components (no wrapper at cache level)
655-
component_cache.all_types_dict = {
656-
**langflow_components["components"],
657-
**custom_flat,
658-
}
659-
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
660-
await logger.adebug(f"Loaded {component_count} components")
737+
# Promoted to the SHA-validated cache blob when version matches and
738+
# integrity passes; used to short-circuit the rebuild inside the lock.
739+
_pending_cache_hit: dict | None = None
661740

662-
# Precompute code hash lookups for fast flow validation
663-
_build_code_hash_lookups(component_cache)
741+
if installed_version is not None:
742+
try:
743+
cache_path = _get_cache_path()
744+
except Exception as exc: # noqa: BLE001
745+
logger.debug(f"Could not resolve component cache path: {exc}")
746+
cache_path = None
747+
if cache_path is not None and cache_path.exists():
748+
cached_blob: Any = None
749+
try:
750+
cached_blob = orjson.loads(await asyncio.to_thread(cache_path.read_bytes))
751+
except (OSError, orjson.JSONDecodeError) as exc:
752+
logger.warning(
753+
f"Component cache peek failed at {cache_path} "
754+
f"({type(exc).__name__}); falling through to rebuild: {exc}"
755+
)
756+
757+
if isinstance(cached_blob, dict):
758+
cached_version = cached_blob.get("version")
759+
if isinstance(cached_version, str) and cached_version and cached_version != installed_version:
760+
logger.warning(
761+
f"Stale component cache: cached={cached_version}, "
762+
f"installed={installed_version}, path={cache_path}. "
763+
"Removing so it is regenerated on the next build."
764+
)
765+
try:
766+
cache_path.unlink(missing_ok=True)
767+
except OSError as exc:
768+
logger.warning(f"Could not remove stale component cache at {cache_path}: {exc}")
769+
elif (
770+
isinstance(cached_version, str)
771+
and cached_version == installed_version
772+
and isinstance(cached_blob.get("entries"), list)
773+
and cached_blob["entries"]
774+
):
775+
# Version + entries OK: verify SHA256 inline (mirrors
776+
# _read_component_index's integrity check) before promoting.
777+
unsigned = {k: v for k, v in cached_blob.items() if k != "sha256"}
778+
sha = cached_blob.get("sha256")
779+
calc = hashlib.sha256(orjson.dumps(unsigned, option=orjson.OPT_SORT_KEYS)).hexdigest()
780+
if not isinstance(sha, str) or not sha:
781+
logger.warning(f"Component cache at {cache_path} missing SHA256 hash; will rebuild.")
782+
elif sha != calc:
783+
logger.warning(
784+
f"Component cache at {cache_path} failed SHA256 integrity check; will rebuild."
785+
)
786+
else:
787+
_pending_cache_hit = cached_blob
788+
789+
async with component_cache.lock:
790+
# Double-check: another task may have populated the cache while we
791+
# awaited the lock.
792+
if component_cache.all_types_dict is None:
793+
if _pending_cache_hit is not None:
794+
# Cache-hit short-circuit: reconstruct the flat dict from
795+
# entries (same pattern as _load_from_index_or_cache).
796+
# No telemetry is emitted because no build work happened;
797+
# the rebuild branch below owns telemetry via
798+
# import_langflow_components.
799+
merged: dict[str, Any] = {}
800+
for top_level, components in _pending_cache_hit["entries"]:
801+
if top_level not in merged:
802+
merged[top_level] = {}
803+
merged[top_level].update(components)
804+
merged = filter_disabled_components_from_dict(merged)
805+
component_cache.all_types_dict = merged
806+
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
807+
await logger.adebug(
808+
f"Loaded {component_count} components from user cache (version={installed_version})"
809+
)
810+
# Populate hash lookups now so ensure_component_hash_lookups_loaded
811+
# doesn't trigger a second rebuild on the next call.
812+
_build_code_hash_lookups(component_cache)
813+
else:
814+
await logger.adebug("Building components cache")
815+
langflow_components = await import_langflow_components(settings_service, telemetry_service)
816+
custom_components_dict = await _determine_loading_strategy(settings_service)
817+
818+
# Flatten custom dict if it has a "components" wrapper
819+
custom_flat = custom_components_dict.get("components", custom_components_dict) or {}
820+
821+
# Merge built-in and custom components (no wrapper at cache level)
822+
component_cache.all_types_dict = {
823+
**langflow_components["components"],
824+
**custom_flat,
825+
}
826+
component_count = sum(len(comps) for comps in component_cache.all_types_dict.values())
827+
await logger.adebug(f"Loaded {component_count} components")
828+
829+
# Precompute code hash lookups for fast flow validation
830+
_build_code_hash_lookups(component_cache)
664831

665832
return component_cache.all_types_dict
666833

0 commit comments

Comments
 (0)