
Conversation


@dkaushik94 dkaushik94 commented Jan 16, 2026

Changes:

  • Added background queueing of workflows for async execution options
  • Modified the queueing service to choose between asyncio and Celery (the two backends supported for now)
  • Added a get-status endpoint for job status polling (see the sketch below)
  • TODO: implement the stream-consuming logic and verify that Celery supports the full cycle the way asyncio queues already do
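
A rough sketch of how a client might drive the background path end to end (the paths follow the plural /workflows prefix flagged in the review below; the mode-selection field, the shape of the status query, and the enum serialization are assumptions, not the final contract):

import time

import httpx

BASE_URL = "http://localhost:7860/api/v2/workflows"  # plural prefix, per the review below
HEADERS = {"x-api-key": "sk-..."}  # workflow endpoints require API key auth

# Submit the workflow for background execution. The response is a
# WorkflowJobResponse carrying job_id and status=QUEUED. The field that
# selects background mode ("execution_mode" here) is an assumption.
job = httpx.post(
    BASE_URL,
    headers=HEADERS,
    json={
        "flow_id": "my-flow-id",
        "execution_mode": "background",  # assumed field name
        "inputs": {"ChatInput-abc.input_value": "hi"},
    },
).json()

# Poll the status endpoint until the job reaches a terminal state.
# Whether job_id travels in the path or the query string is an assumption.
while True:
    job = httpx.get(BASE_URL, params={"job_id": job["job_id"]}, headers=HEADERS).json()
    if job["status"] in ("COMPLETED", "FAILED"):  # enum serialization assumed
        break
    time.sleep(1)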

Summary by CodeRabbit

Release Notes

  • New Features

    • Workflow execution now supports synchronous and background execution modes
    • Added timeout protection for workflow execution
    • Introduced workflow status tracking endpoint to monitor running and completed jobs
    • Added ability to stop running workflows
    • Enhanced error handling with structured error responses
  • Documentation

    • Workflow input format updated to use flat, component-prefixed keys (e.g., "ComponentID.field"; see the example below)
    • Job response now includes auto-generated timestamp information
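
Concretely, the flat format groups per-component parameters under dotted keys (component IDs here are illustrative; the behavior matches the parse_flat_inputs docstring quoted later in this review):

inputs = {
    "ChatInput-abc.input_value": "hello",
    "ChatInput-abc.session_id": "session-123",
    "LLM-xyz.temperature": 0.7,
}

# parse_flat_inputs(inputs) splits each "component_id.param" key and returns:
#   tweaks     == {"ChatInput-abc": {"input_value": "hello"},
#                  "LLM-xyz": {"temperature": 0.7}}
#   session_id == "session-123"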


Janardan S Kavia and others added 27 commits January 2, 2026 12:29
- Add workflow API endpoints (POST /workflow, GET /workflow, POST /workflow/stop)
- Implement developer API protection with settings check
- Add comprehensive workflow schema models with proper validation
- Create extensive unit test suite covering all scenarios
- Apply Ruff linting standards and fix all code quality issues
- Support API key authentication for all workflow endpoints
Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
…s, Async execution status updates endpoint.

coderabbitai bot commented Jan 16, 2026

Walkthrough

This PR implements a complete V2 Workflow API with converters for translating V2 request/response schemas to V1 execution results, adds synchronous and background execution handlers with timeout protection, introduces workflow-specific exceptions, updates job queue and task services to support status tracking with dictionary-based records, and includes comprehensive test coverage.

Changes

Cohort / File(s) Summary
V2 API Converters
src/backend/base/langflow/api/v2/converters.py
New module with four primary conversion functions: parse_flat_inputs transforms flat component.param keys into nested tweaks structure; run_response_to_workflow_response converts V1 RunResponse to V2 WorkflowExecutionResponse with terminal node resolution, duplicate display name handling, and per-vertex ComponentOutput assembly; create_job_response and create_error_response build standardized job and error responses. Includes private helpers for nested extraction, text/model/file path extraction, and metadata building.
V2 Workflow Endpoints
src/backend/base/langflow/api/v2/workflow.py
Major refactor replacing placeholder with full implementation: introduced developer API guard via router-level dependency; added three execution paths: sync with configurable timeout wrapping, background task submission, and streaming placeholder (501); implemented execute_sync_workflow_with_timeout, execute_sync_workflow, execute_workflow_background, main orchestrator execute_workflow, plus get_workflow_status and stop_workflow endpoints; integrates graph construction, execution via run_graph_internal, response conversion, and structured error handling.
Exception Types
src/backend/base/langflow/exceptions/api.py
Added workflow-specific exception hierarchy: WorkflowExecutionError (base), WorkflowTimeoutError (timeout signaling), and WorkflowValidationError (validation issues).
Job Queue Service
src/backend/base/langflow/services/job_queue/service.py
Refactored per-job data structure from tuple-based to dictionary-based records tracking status, result, error, cleanup_time, and task reference; updated start_job with task_wrapper handling status transitions and result storage; reworked get_queue_data signature to return dict; updated cleanup logic to work with dict-based entries; integrated JobStatus enum.
Task Service & Backends
src/backend/base/langflow/services/task/backends/celery.py, src/backend/base/langflow/services/task/service.py, src/backend/base/langflow/services/task/factory.py
Moved AsyncResult import behind TYPE_CHECKING with runtime imports in methods; added get_task_status method mapping Celery states to JobStatus; introduced fire_and_forget_task and async get_task_status in TaskService for Celery or JobQueueService routing; updated factory to inject SettingsService and conditionally instantiate CeleryBackend based on settings.
Schema Changes
src/lfx/src/lfx/schema/workflow.py, src/backend/base/langflow/processing/process.py, src/lfx/src/lfx/processing/process.py
Removed GlobalInputs model, reworked WorkflowExecutionRequest to use flat component-prefixed keys; updated WorkflowJobResponse.created_timestamp to use default_factory with datetime.now().isoformat(); added debug print statements for effective_session_id and run_outputs.
Converter Tests
src/backend/tests/unit/api/v2/test_converters.py
Comprehensive test module covering parse_flat_inputs, nested value extraction, text/model/file extraction, metadata building, and end-to-end RunResponse-to-WorkflowExecutionResponse conversion across various graph topologies, duplicate names, missing fields, and edge cases.
Workflow API Tests
src/backend/tests/unit/api/v2/test_workflow.py
Extensive test suite replacing minimal tests: added TestWorkflowDeveloperAPIProtection, TestWorkflowErrorHandling, TestWorkflowSyncExecution classes; validates developer API guard (403 on disabled), error propagation (validation, database, timeout), sync/background/streaming behavior, response structure consistency, and multiple terminal node scenarios.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as Sync Executor
    participant Parser as Input Parser
    participant Graph as Graph Engine
    participant Converter as Response Converter

    Client->>API: WorkflowExecutionRequest
    API->>Parser: parse_flat_inputs()
    Parser-->>API: tweaks + session_id
    API->>Graph: run_graph_internal(graph, tweaks, session_id)
    Graph-->>API: RunResponse
    API->>Converter: run_response_to_workflow_response()
    Converter->>Converter: identify terminals
    Converter->>Converter: resolve outputs per node type
    Converter->>Converter: handle duplicate display names
    Converter->>Converter: build ComponentOutput metadata
    Converter-->>API: WorkflowExecutionResponse
    API-->>Client: response (200 or 500 with error)
sequenceDiagram
    participant Client
    participant API as Background Executor
    participant TaskService
    participant JobQueue
    participant BackgroundTask

    Client->>API: WorkflowExecutionRequest
    API->>TaskService: fire_and_forget_task(execute_task_func)
    TaskService->>JobQueue: start_job(task_id, execute_task_func)
    TaskService-->>API: task_id
    API-->>Client: WorkflowJobResponse (job_id, status=QUEUED)
    JobQueue->>BackgroundTask: execute_task_func()
    BackgroundTask->>BackgroundTask: parse inputs, build graph, execute
    BackgroundTask-->>JobQueue: result/error
    JobQueue->>JobQueue: store result, update status (COMPLETED/FAILED)
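
In code, the handoff in this diagram amounts to roughly the following (a simplified sketch assembled from the walkthrough and the review comments; the signatures and the sync-callable wrapping via asyncio.to_thread are assumptions, the latter being the fix the review suggests):

import asyncio
import inspect
import uuid
from collections.abc import Callable
from typing import Any


class TaskServiceSketch:
    """Stand-in for TaskService; not the real implementation."""

    def __init__(self, celery_backend: Any | None, queue_service: Any) -> None:
        self.celery_backend = celery_backend
        self.queue_service = queue_service

    async def fire_and_forget_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
        task_id = str(uuid.uuid4())
        if self.celery_backend is not None:
            # Celery path: hand the callable to the broker and return at once.
            self.celery_backend.launch_task(task_func, *args, **kwargs)
        else:
            # JobQueueService path: start_job needs an awaitable, so wrap
            # sync callables in a thread.
            if inspect.iscoroutinefunction(task_func):
                awaitable = task_func(*args, **kwargs)
            else:
                awaitable = asyncio.to_thread(task_func, *args, **kwargs)
            await self.queue_service.start_job(task_id, awaitable)
        return task_id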

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Important

Pre-merge checks failed

Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error, 2 warnings)
  • Test Coverage For New Implementations: ❌ Error. The test suite has integrity issues: test_run_response_preserves_inputs lacks a function call and assertions, orphaned code exists in test_workflow.py, and the refactored service modules lack unit test coverage. Resolution: add the missing call and assertions, move the orphaned code into a proper test method, and create unit tests covering the new service functionality.
  • Test Quality And Coverage: ⚠️ Warning. Test coverage has structural failures: an incomplete test at line 998, orphaned code at lines 662-666, and missing imports that cause runtime NameErrors. Resolution: complete test_run_response_preserves_inputs with an actual function call and assertion, move the orphaned code into a proper test method, add the missing imports to the main code files, and run the tests locally.
  • Test File Naming And Structure: ⚠️ Warning. Two structural violations: an incomplete test in test_converters.py (lines 998-1009) missing a function call and assertions, and orphaned test code in test_workflow.py (lines 662-666) outside any test method. Resolution: complete test_run_response_preserves_inputs and move the orphaned code into the test_all_endpoints_require_api_key_authentication method.
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title "Feat: Background Queuing of Workflows" accurately and clearly summarizes the main change in the PR, which adds background queueing and async execution for workflows.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 91.03%, above the required threshold of 80.00%.
  • Excessive Mock Usage Warning: ✅ Passed. Test files demonstrate appropriate mock usage: unit tests keep reasonable mock-per-test ratios, and integration tests use real clients alongside mocks for external dependencies only.

@dkaushik94 dkaushik94 marked this pull request as draft January 16, 2026 19:00

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 18

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/backend/tests/unit/api/v2/test_workflow.py (1)

1097-1103: Critical: Second orphaned test code block.

Same issue as lines 662-666. This code block is at class indentation level inside TestWorkflowSyncExecution but outside any method. It will not execute as a test and will cause import-time errors.

This should be consolidated with the other API key authentication tests mentioned above.

src/backend/base/langflow/services/job_queue/service.py (2)

71-71: Type annotation inconsistent with actual usage.

The type annotation declares tuples but the code uses dict-style access throughout. Update to match the intended dict structure.

-    self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]] = {}
+    self._queues: dict[str, dict[str, Any]] = {}

Don't forget to add from typing import Any if not already imported.


146-152: create_queue must be updated to create dict entries.

This method creates tuple entries but all other methods expect dict entries. This is the source of the type mismatch.

🐛 Proposed fix
     main_queue: asyncio.Queue = asyncio.Queue()
     event_manager: EventManager = self._create_default_event_manager(main_queue)

-    # Register the queue without an active task.
-    self._queues[job_id] = (main_queue, event_manager, None, None)
+    # Register the queue with initial state.
+    self._queues[job_id] = {
+        "queue": main_queue,
+        "event_manager": event_manager,
+        "task": None,
+        "cleanup_time": None,
+        "status": JobStatus.QUEUED,
+        "result": None,
+        "error": None,
+    }
     logger.debug(f"Queue and event manager successfully created for job_id {job_id}")
     return main_queue, event_manager
src/backend/base/langflow/api/v2/workflow.py (1)

31-40: Duplicate imports from lfx.schema.workflow.

The same module is imported twice with overlapping symbols at lines 31-40 and 56-66. This should be consolidated.

♻️ Proposed fix: Consolidate imports
-from lfx.schema.workflow import (
-    WORKFLOW_EXECUTION_RESPONSES,
-    WORKFLOW_STATUS_RESPONSES,
-    WorkflowExecutionRequest,
-    WorkflowExecutionResponse,
-    WorkflowJobResponse,
-    WorkflowStopRequest,
-    WorkflowStopResponse,
-    JobStatus,
-)
-from lfx.services.deps import get_settings_service
-
-from langflow.api.v1.schemas import RunResponse
-from langflow.api.v2.converters import (
-    create_error_response,
-    parse_flat_inputs,
-    run_response_to_workflow_response,
-)
-from lfx.log.logger import logger
-from langflow.services.deps import get_task_service, get_queue_service
-from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
-from langflow.processing.process import process_tweaks, run_graph_internal
-from langflow.services.auth.utils import api_key_security
-from langflow.services.database.models.flow.model import FlowRead
-from langflow.services.database.models.user.model import UserRead
-from lfx.schema.workflow import (
-    WORKFLOW_EXECUTION_RESPONSES,
-    WORKFLOW_STATUS_RESPONSES,
-    WorkflowExecutionRequest,
-    WorkflowExecutionResponse,
-    WorkflowJobResponse,
-    WorkflowStopRequest,
-    WorkflowStopResponse,
-    JobStatus,
-    ErrorDetail
-)
+from lfx.schema.workflow import (
+    WORKFLOW_EXECUTION_RESPONSES,
+    WORKFLOW_STATUS_RESPONSES,
+    WorkflowExecutionRequest,
+    WorkflowExecutionResponse,
+    WorkflowJobResponse,
+    WorkflowStopRequest,
+    WorkflowStopResponse,
+    JobStatus,
+    ErrorDetail,
+)
+from lfx.services.deps import get_settings_service
+from lfx.log.logger import logger
+
+from langflow.api.v1.schemas import RunResponse
+from langflow.api.v2.converters import (
+    create_error_response,
+    parse_flat_inputs,
+    run_response_to_workflow_response,
+)
+from langflow.exceptions.api import WorkflowTimeoutError, WorkflowValidationError
+from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
+from langflow.processing.process import process_tweaks, run_graph_internal
+from langflow.services.auth.utils import api_key_security
+from langflow.services.database.models.flow.model import Flow, FlowRead
+from langflow.services.database.models.user.model import UserRead
+from langflow.services.deps import get_task_service, get_queue_service

Also applies to: 56-66

🤖 Fix all issues with AI agents
In `@src/backend/base/langflow/api/v2/converters.py`:
- Around line 215-239: _get_raw_content returns None if a dict contains the
"results" key with a None value and never checks "content"; update the dict
branch in _get_raw_content to only return vertex_output_data["results"] when the
key exists and its value is not None (e.g., check "results" in
vertex_output_data and vertex_output_data["results"] is not None) and otherwise
fall back to return vertex_output_data["content"] if that key exists and is not
None; keep the same behavior for the attribute checks above.
- Around line 43-99: parse_flat_inputs currently leaves session_id inside tweaks
for the backward-compat branch (the elif isinstance(value, dict) case), so
session_id is never returned; update that branch to extract "session_id" from
the value dict (set session_id if not already set) and then store the remaining
params into tweaks[key] (or omit session_id from the stored dict) so that
parse_flat_inputs returns the session_id consistently; reference
parse_flat_inputs, tweaks, session_id, and the elif isinstance(value, dict)
branch to locate the fix.
- Around line 496-523: The error response builder accepts job_id: str | None but
passes it into WorkflowExecutionResponse.job_id which is required str, causing
Pydantic validation failures; modify create_error_response so it ensures a
non-None job_id before constructing WorkflowExecutionResponse (e.g., if job_id
is None, set job_id = workflow_request.request_id if available or fallback to
str(uuid.uuid4())), update the function signature/comment accordingly, and
import uuid if you use uuid4; ensure callers (e.g., the exception handler in
workflow.py) can still pass None safely.

In `@src/backend/base/langflow/api/v2/workflow.py`:
- Around line 131-134: The file raises WorkflowTimeoutError in the except block
around async execution but doesn't import it; add an import statement for
WorkflowTimeoutError (and also WorkflowValidationError as suggested) from
langflow.exceptions.api so the symbols WorkflowTimeoutError and
WorkflowValidationError are defined for use in this module (update the
top-of-file imports to include these names).
- Around line 233-237: The function execute_workflow_background uses the Flow
type but Flow is not imported; update the signature/imports to be consistent:
either import Flow from langflow.services.database.models.flow.model and keep
the parameter typed as Flow, or change the parameter type to FlowRead to match
execute_sync_workflow; adjust the import list at the top of the file accordingly
so Flow (or FlowRead) is properly imported and the type annotation on
execute_workflow_background matches the chosen import.
- Around line 330-332: Replace the stray print(e) in the except block with the
project's logger: remove print(e) and call logger.exception(...) or
logger.aerror("Failed to queue workflow task in background mode", exc_info=True)
so the error and stack trace are recorded consistently; update the same except
block that currently has the commented logger.aerror to use the logger API
instead of print.
- Around line 304-308: The flow-not-found branch currently raises HTTPException
with a plain string detail; change it to raise HTTPException with a structured
dict matching the other errors (keys: "error", "code", "message") so tests
expecting detail["code"] == "FLOW_NOT_FOUND" pass—update the block that checks
the result of get_flow_by_id_or_endpoint_name (referenced as flow and
workflow_request.flow_id) to raise
HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail={"error": "Flow not
found", "code": "FLOW_NOT_FOUND", "message": f"Flow identifier
{workflow_request.flow_id} not found"}) to mirror check_developer_api_enabled's
response shape.
- Around line 177-180: The code raises WorkflowValidationError in the flow
validation block (e.g., the checks that raise WorkflowValidationError when
flow.data is None and later at line ~190), but that exception is not imported;
add an import for WorkflowValidationError at the top of this module from the
module where that exception class is defined (locate its definition in your
exceptions/errors module) so the raises in this file (references:
WorkflowValidationError, flow) resolve without a NameError.
- Around line 361-365: The code calls .upper() on the JobStatus enum member
(status_val) which will raise AttributeError; in the block handling
JobStatus.FAILED/JobStatus.ERROR (inside the if status_val in [...] branch where
result = await task_service.get_task_result(job_id) and ErrorDetail is
constructed), replace status_val.upper() with a string form such as
status_val.name.upper() (or status_val.value.upper() if you prefer the enum
value) so the ErrorDetail.code receives a proper uppercase string.
- Line 95: Tests are using the singular path but the router is created as router
= APIRouter(prefix="/workflows", ...) so update all test request paths from
api/v2/workflow... to api/v2/workflows... (e.g., change api/v2/workflow ->
api/v2/workflows and api/v2/workflow/stop -> api/v2/workflows/stop) so they
match the APIRouter prefix and the endpoints defined by the Workflow router.

In `@src/backend/base/langflow/processing/process.py`:
- Line 52: Remove the leftover debug print of the session id: delete the
print("Effective session ID: ", effective_session_id) or replace it with a
structured log call using the module logger (e.g., logger.debug or logger.info)
so it respects log levels and aggregation; locate the print referencing
effective_session_id in process.py (near where logger is imported and where
logger.awarning is used) and update accordingly.

In `@src/backend/base/langflow/services/job_queue/service.py`:
- Around line 177-205: The runtime TypeError occurs because _queues stores
tuples but the code (in functions like the block creating task_wrapper and using
job_entry["task"], job_entry["status"], etc.) expects dict access; update
_queues and create_queue to use dict-based entries instead of tuples: change the
data structure returned by create_queue (and any type annotations) so each entry
is a dict with keys like "main_queue", "event_manager", "task", "status",
"result", "cleanup_time", and "error", update the docstring to describe the new
dict schema, and ensure all places that read/write _queues entries (e.g., where
job_entry is accessed in task_wrapper and elsewhere) use these string keys
consistently so no tuple indexing occurs.

In `@src/backend/base/langflow/services/task/backends/celery.py`:
- Around line 37-47: The get_task_status function uses JobStatus but the symbol
isn't imported and the annotated return type is wrong; add an import for
JobStatus from its defining module (the same module other status uses come from)
and change the function signature of get_task_status(task_id: str) -> str to
return the JobStatus type (e.g., -> JobStatus); ensure state_map values remain
JobStatus members and keep get_task(task_id) usage as-is.

In `@src/backend/base/langflow/services/task/service.py`:
- Around line 80-91: get_task_status references JobStatus.ERROR in its exception
fallback but JobStatus is not imported; add the missing import for the JobStatus
enum at the top of the module so the fallback returns a valid JobStatus instead
of raising NameError. Locate the get_task_status function and ensure an import
like "from <appropriate_module> import JobStatus" is present (matching the
project's JobStatus definition) so JobStatus.ERROR resolves correctly.
- Around line 34-64: The fire_and_forget_task currently accepts Callable[...,
Any] but calls task_func(*args, **kwargs) directly for the non-Celery path,
which will break if task_func is synchronous because JobQueueService.start_job
expects an awaitable; update fire_and_forget_task to detect async vs sync
callables (use inspect.iscoroutinefunction(task_func) or inspect.isawaitable)
and for sync functions wrap execution via asyncio.to_thread(...) so start_job
always receives an awaitable; keep the Celery branch using
backend.launch_task(task_func, ...) untouched and reference the
get_queue_service(), JobQueueService.start_job, and the fire_and_forget_task
function when making the change.

In `@src/backend/tests/unit/api/v2/test_converters.py`:
- Around line 998-1009: The test test_run_response_preserves_inputs currently
constructs graph, run_response, inputs and a WorkflowExecutionRequest but never
exercises the converter; call run_response_to_workflow_response(request,
run_response, graph) (using the same WorkflowExecutionRequest instance) and
assert that the returned WorkflowResponse (or equivalent) preserves the inputs
(e.g., response.inputs == inputs). Ensure you reference the existing symbols
run_response_to_workflow_response, WorkflowExecutionRequest, run_response and
graph when adding the call and the assertion.

In `@src/backend/tests/unit/api/v2/test_workflow.py`:
- Around line 662-666: Orphaned test code in TestWorkflowErrorHandling is
executed at import because it's outside any method; move the GET /workflow
no-API-key assertions (the client.get call and the two asserts) into the
test_all_endpoints_require_api_key_authentication method of the
TestWorkflowDeveloperAPIProtection class (after the POST /workflow check) so
they run as part of that test; reference
test_all_endpoints_require_api_key_authentication,
TestWorkflowDeveloperAPIProtection, and the client fixture when relocating the
code.

In `@src/lfx/src/lfx/processing/process.py`:
- Line 77: Remove the leftover debug print statements that output sensitive data
(e.g., the line printing effective_session_id and the other at line 88); replace
them with the module logger (imported as logger) at an appropriate level such as
logger.debug() if the information should be retained for debugging, or simply
delete the calls if not needed—locate the prints referencing
effective_session_id and any run_outputs in process.py (e.g., the
print("Effective session ID: ", effective_session_id) and the one at line 88)
and update accordingly.
🧹 Nitpick comments (5)
src/backend/base/langflow/services/task/factory.py (1)

12-14: Add return type annotation for consistency with base class.

The base class ServiceFactory.create() specifies -> "Service" as return type. Adding the annotation improves type checking and documentation.

Suggested fix
     @override
-    def create(self, settings_service: SettingsService):
+    def create(self, settings_service: SettingsService) -> TaskService:
         return TaskService(settings_service)
src/lfx/src/lfx/schema/workflow.py (1)

103-105: Prefer timezone-aware UTC timestamps.
datetime.now().isoformat() yields a naive local timestamp, which can be ambiguous for clients. Consider UTC (or whatever format is the API contract).

♻️ Suggested update (UTC timestamps)
-from datetime import datetime
+from datetime import datetime, timezone
@@
-created_timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
+created_timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
src/backend/base/langflow/api/v2/converters.py (1)

479-493: Standardize created_timestamp format across responses.
Here it’s an epoch string, while schema defaults to ISO; mixed formats will surprise clients. Pick a single format and use it consistently (including run_response_to_workflow_response and create_error_response).

src/backend/tests/unit/api/v2/test_workflow.py (1)

309-318: Consider extracting shared fixture to conftest.py.

The mock_settings_dev_api_enabled fixture is duplicated in three test classes (lines 101-109, 309-318, and 671-680). Consider moving this to conftest.py or a shared base class to reduce duplication.

♻️ Suggested refactor

Create a shared fixture in conftest.py:

# In src/backend/tests/unit/api/v2/conftest.py
@pytest.fixture
def mock_settings_dev_api_enabled():
    """Mock settings with developer API enabled."""
    with patch("langflow.api.v2.workflow.get_settings_service") as mock_get_settings_service:
        mock_service = MagicMock()
        mock_settings = MagicMock()
        mock_settings.developer_api_enabled = True
        mock_service.settings = mock_settings
        mock_get_settings_service.return_value = mock_service
        yield mock_settings

@pytest.fixture
def mock_settings_dev_api_disabled():
    """Mock settings with developer API disabled."""
    with patch("langflow.api.v2.workflow.get_settings_service") as mock_get_settings_service:
        mock_service = MagicMock()
        mock_settings = MagicMock()
        mock_settings.developer_api_enabled = False
        mock_service.settings = mock_settings
        mock_get_settings_service.return_value = mock_service
        yield mock_settings
src/backend/tests/unit/api/v2/test_converters.py (1)

248-253: Test calls function but doesn't assert on result.

The function is called but the result is not captured or asserted. This test only verifies no exception is raised, not that the correct value is returned.

 def test_extract_text_from_output_value(self):
     """Test extracting from OutputValue structure."""
     # OutputValue structure from lfx.schema.schema
     data = {"message": {"text": "Hello World"}, "type": "message"}
-    _extract_nested_value(data, "message", "text")
+    result = _extract_nested_value(data, "message", "text")
+    assert result == "Hello World"
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0689810 and 84a9378.

⛔ Files ignored due to path filters (1)
  • src/frontend/package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (13)
  • src/backend/base/langflow/api/v2/converters.py
  • src/backend/base/langflow/api/v2/workflow.py
  • src/backend/base/langflow/exceptions/api.py
  • src/backend/base/langflow/processing/process.py
  • src/backend/base/langflow/services/job_queue/service.py
  • src/backend/base/langflow/services/task/backends/celery.py
  • src/backend/base/langflow/services/task/factory.py
  • src/backend/base/langflow/services/task/service.py
  • src/backend/tests/unit/api/v2/test_converters.py
  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/lfx/src/lfx/_assets/component_index.json
  • src/lfx/src/lfx/processing/process.py
  • src/lfx/src/lfx/schema/workflow.py
🧰 Additional context used
📓 Path-based instructions (4)
src/backend/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)

src/backend/**/*.py: Use FastAPI async patterns with await for async operations in component execution methods
Use asyncio.create_task() for background tasks and implement proper cleanup with try/except for asyncio.CancelledError
Use queue.put_nowait() for non-blocking queue operations and asyncio.wait_for() with timeouts for controlled get operations

Files:

  • src/backend/base/langflow/processing/process.py
  • src/backend/base/langflow/services/task/factory.py
  • src/backend/base/langflow/services/task/service.py
  • src/backend/base/langflow/services/task/backends/celery.py
  • src/backend/base/langflow/api/v2/converters.py
  • src/backend/base/langflow/api/v2/workflow.py
  • src/backend/base/langflow/exceptions/api.py
  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
  • src/backend/base/langflow/services/job_queue/service.py
src/backend/base/langflow/api/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)

Backend API endpoints should be organized by version (v1/, v2/) under src/backend/base/langflow/api/ with specific modules for features (chat.py, flows.py, users.py, etc.)

Files:

  • src/backend/base/langflow/api/v2/converters.py
  • src/backend/base/langflow/api/v2/workflow.py
src/backend/tests/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/testing.mdc)

src/backend/tests/**/*.py: Place backend unit tests in src/backend/tests/ directory, component tests in src/backend/tests/unit/components/ organized by component subdirectory, and integration tests accessible via make integration_tests
Use same filename as component with appropriate test prefix/suffix (e.g., my_component.py → test_my_component.py)
Use the client fixture (FastAPI Test Client) defined in src/backend/tests/conftest.py for API tests; it provides an async httpx.AsyncClient with automatic in-memory SQLite database and mocked environment variables. Skip client creation by marking test with @pytest.mark.noclient
Inherit from the correct ComponentTestBase family class located in src/backend/tests/base.py based on API access needs: ComponentTestBase (no API), ComponentTestBaseWithClient (needs API), or ComponentTestBaseWithoutClient (pure logic). Provide three required fixtures: component_class, default_kwargs, and file_names_mapping
Create comprehensive unit tests for all new backend components. If unit tests are incomplete, create a corresponding Markdown file documenting manual testing steps and expected outcomes
Test both sync and async code paths, mock external dependencies appropriately, test error handling and edge cases, validate input/output behavior, and test component initialization and configuration
Use @pytest.mark.asyncio decorator for async component tests and ensure async methods are properly awaited
Test background tasks using asyncio.create_task() and verify completion with asyncio.wait_for() with appropriate timeout constraints
Test queue operations using non-blocking queue.put_nowait() and asyncio.wait_for(queue.get(), timeout=...) to verify queue processing without blocking
Use @pytest.mark.no_blockbuster marker to skip the blockbuster plugin in specific tests
For database tests that may fail in batch runs, run them sequentially using uv run pytest src/backend/tests/unit/test_database.py r...

Files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
**/test_*.py

📄 CodeRabbit inference engine (Custom checks)

**/test_*.py: Review test files for excessive use of mocks that may indicate poor test design - check if tests have too many mock objects that obscure what's actually being tested
Warn when mocks are used instead of testing real behavior and interactions, and suggest using real objects or test doubles when mocks become excessive
Ensure mocks are used appropriately for external dependencies only, not for core logic
Backend test files should follow the naming convention test_*.py with proper pytest structure
Test files should have descriptive test function names that explain what is being tested
Tests should be organized logically with proper setup and teardown
Consider including edge cases and error conditions for comprehensive test coverage
Verify tests cover both positive and negative scenarios where appropriate
For async functions in backend tests, ensure proper async testing patterns are used with pytest
For API endpoints, verify both success and error response testing

Files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
🧠 Learnings (16)
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/**/*.py : Use `asyncio.create_task()` for background tasks and implement proper cleanup with try/except for `asyncio.CancelledError`

Applied to files:

  • src/backend/base/langflow/services/task/service.py
  • src/backend/base/langflow/services/task/backends/celery.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test background tasks using `asyncio.create_task()` and verify completion with `asyncio.wait_for()` with appropriate timeout constraints

Applied to files:

  • src/backend/base/langflow/services/task/service.py
  • src/backend/base/langflow/services/task/backends/celery.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/base/langflow/components/**/*.py : For component `run()` methods, use async/await pattern and return appropriate message types

Applied to files:

  • src/backend/base/langflow/services/task/backends/celery.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/base/langflow/api/**/*.py : Backend API endpoints should be organized by version (v1/, v2/) under `src/backend/base/langflow/api/` with specific modules for features (chat.py, flows.py, users.py, etc.)

Applied to files:

  • src/backend/base/langflow/api/v2/workflow.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test both sync and async code paths, mock external dependencies appropriately, test error handling and edge cases, validate input/output behavior, and test component initialization and configuration

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Use predefined JSON flows and utility functions from `tests.unit.build_utils` (create_flow, build_flow, get_build_events, consume_and_assert_stream) for flow execution testing

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test Langflow REST API endpoints using the `client` fixture with appropriate HTTP methods (GET, POST, etc.), headers (logged_in_headers), and payload validation

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Create comprehensive unit tests for all new backend components. If unit tests are incomplete, create a corresponding Markdown file documenting manual testing steps and expected outcomes

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test component versioning and backward compatibility using `file_names_mapping` fixture with `VersionComponentMapping` objects mapping component files across Langflow versions

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
📚 Learning: 2025-08-05T22:51:27.961Z
Learnt from: edwinjosechittilappilly
Repo: langflow-ai/langflow PR: 0
File: :0-0
Timestamp: 2025-08-05T22:51:27.961Z
Learning: The TestComposioComponentAuth test in src/backend/tests/unit/components/bundles/composio/test_base_composio.py demonstrates proper integration testing patterns for external API components, including real API calls with mocking for OAuth completion, comprehensive resource cleanup, and proper environment variable handling with pytest.skip() fallbacks.

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Use `monkeypatch` fixture to mock internal functions for testing error handling scenarios; validate error status codes and error message content in responses

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-12-03T18:17:26.561Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-12-03T18:17:26.561Z
Learning: Applies to **/test_*.py : For async functions in backend tests, ensure proper async testing patterns are used with pytest

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-12-03T18:17:26.561Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-12-03T18:17:26.561Z
Learning: Applies to **/test_*.py : For API endpoints, verify both success and error response testing

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test webhook endpoints by posting to `api/v1/webhook/{endpoint_name}` with appropriate payloads and validating response status codes

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to tests/unit/**/*.py : Database and flow integration tests should use helper functions from `tests.unit.build_utils` such as `create_flow`, `build_flow`, and `get_build_events`

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
📚 Learning: 2025-12-19T18:04:08.938Z
Learnt from: Jkavia
Repo: langflow-ai/langflow PR: 11111
File: src/backend/tests/unit/api/v2/test_workflow.py:10-11
Timestamp: 2025-12-19T18:04:08.938Z
Learning: In the langflow-ai/langflow repository, pytest-asyncio is configured with asyncio_mode = 'auto' in pyproject.toml. This means you do not need to decorate test functions or classes with pytest.mark.asyncio; async tests are auto-detected and run by pytest-asyncio. When reviewing tests, ensure they rely on this configuration (i.e., avoid unnecessary pytest.mark.asyncio decorators) and that tests living under any tests/ path (e.g., src/.../tests/**/*.py) follow this convention. If a test explicitly requires a different asyncio policy, document it and adjust the config accordingly.

Applied to files:

  • src/backend/tests/unit/api/v2/test_workflow.py
  • src/backend/tests/unit/api/v2/test_converters.py
🧬 Code graph analysis (5)
src/backend/base/langflow/services/task/factory.py (4)
src/lfx/src/lfx/services/interfaces.py (1)
  • settings (55-57)
src/backend/base/langflow/services/factory.py (1)
  • ServiceFactory (12-24)
src/backend/base/langflow/services/task/service.py (1)
  • TaskService (17-91)
src/backend/tests/unit/api/v2/test_mcp_servers_file.py (1)
  • settings_service (100-101)
src/backend/base/langflow/services/task/service.py (5)
src/backend/base/langflow/services/task/backends/celery.py (3)
  • CeleryBackend (14-47)
  • launch_task (22-30)
  • get_task_status (37-47)
src/backend/base/langflow/services/deps.py (1)
  • get_queue_service (242-246)
src/backend/base/langflow/services/task/backends/anyio.py (2)
  • AnyIOBackend (54-117)
  • launch_task (64-95)
src/backend/base/langflow/services/job_queue/service.py (3)
  • create_queue (126-152)
  • start_job (154-205)
  • get_queue_data (207-227)
src/lfx/src/lfx/schema/workflow.py (1)
  • JobStatus (11-18)
src/backend/tests/unit/api/v2/test_workflow.py (2)
src/backend/base/langflow/exceptions/api.py (1)
  • WorkflowValidationError (21-22)
src/backend/base/langflow/services/database/models/flow/model.py (1)
  • Flow (186-213)
src/backend/tests/unit/api/v2/test_converters.py (1)
src/backend/base/langflow/api/v2/converters.py (9)
  • _build_metadata_for_non_output (285-319)
  • _extract_file_path (195-212)
  • _extract_model_source (178-192)
  • _extract_nested_value (104-128)
  • _extract_text_from_message (131-175)
  • _get_raw_content (215-239)
  • _simplify_output_content (242-282)
  • parse_flat_inputs (43-101)
  • run_response_to_workflow_response (322-476)
src/backend/base/langflow/services/job_queue/service.py (2)
src/lfx/src/lfx/schema/workflow.py (1)
  • JobStatus (11-18)
src/backend/base/langflow/services/task/backends/anyio.py (1)
  • result (36-37)
🔇 Additional comments (15)
src/backend/base/langflow/exceptions/api.py (1)

13-22: Workflow exception hierarchy looks good.
Clear base and specialized error types will help standardize V2 error handling.
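
For reference, the hierarchy as described (class names and the two subclass bases come from the summary and code-graph sections; the root's own base and the docstrings are assumptions):

class WorkflowExecutionError(Exception):
    """Base error for V2 workflow execution failures."""


class WorkflowTimeoutError(WorkflowExecutionError):
    """Raised when a sync workflow exceeds its configured timeout."""


class WorkflowValidationError(WorkflowExecutionError):
    """Raised when the workflow request or flow data fails validation."""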

src/lfx/src/lfx/schema/workflow.py (1)

45-78: Flat input schema examples are clear.
The component-prefixed keys make the new input format unambiguous for clients.

src/backend/base/langflow/services/task/backends/celery.py (1)

17-35: Lazy Celery imports and launch guard look good.
This reduces import-time side effects and keeps Celery types isolated.

src/backend/base/langflow/services/task/service.py (1)

20-32: Settings-driven backend selection looks good.
The Celery vs AnyIO switching is clean and straightforward.
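
i.e., roughly (a sketch; the settings flag name and the no-arg constructors are assumptions):

from langflow.services.task.backends.anyio import AnyIOBackend
from langflow.services.task.backends.celery import CeleryBackend


def pick_backend(settings_service):
    # Celery when configured, otherwise the in-process AnyIO backend.
    if settings_service.settings.use_celery:  # flag name assumed
        return CeleryBackend()
    return AnyIOBackend()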

src/backend/base/langflow/api/v2/converters.py (7)

104-129: Nested extraction helper looks solid.
The dict/attribute traversal logic is robust and safe.


131-175: Message text extraction covers common shapes.
The layered fallbacks should handle most message payloads.


178-193: Model source extraction looks good.
Clear and minimal metadata capture.


195-213: SaveToFile path extraction is clean.
Nice, focused detection for the “saved successfully” message.


242-282: Output content simplification logic is solid.
The type-specific extraction is clean and readable.


285-319: Metadata builder for non-output nodes looks good.
Nice separation of model source and file-path metadata.


322-464: Terminal-node conversion logic looks solid.
Output key collision handling and metadata propagation are well thought out.

src/backend/tests/unit/api/v2/test_workflow.py (1)

1-26: Comprehensive test suite with good coverage.

The test module provides excellent coverage of the V2 Workflow API, including:

  • Developer API protection scenarios
  • Error handling for various failure modes
  • Synchronous execution flows
  • Response structure validation

The test organization is clear and follows the coding guidelines for API endpoint testing.

src/backend/base/langflow/services/job_queue/service.py (1)

253-258: Good improvement: Proper error handling during task cancellation.

The try/except block around await task properly handles CancelledError and logs other exceptions, preventing cleanup failures from propagating.
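
Distilled, the pattern looks like this (sketch only; the real cleanup also drops the queue entry and uses the service logger rather than print):

import asyncio


async def cancel_job_task(task: asyncio.Task) -> None:
    """Cancel a job's task and swallow the expected CancelledError."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: we cancelled the task ourselves
    except Exception as exc:  # noqa: BLE001 - cleanup must not fail
        print(f"job task raised during cleanup: {exc}")  # real code logs instead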

src/backend/tests/unit/api/v2/test_converters.py (1)

1-49: Excellent test coverage for converter functions.

The test suite comprehensively covers all converter functions with:

  • Realistic payload structures from actual components
  • Edge cases (empty inputs, None values, malformed data)
  • Multiple message format variations
  • Duplicate name handling
  • Error conditions and fallbacks

Based on learnings, this follows best practices for testing both sync and async code paths with appropriate mocking.

src/backend/base/langflow/api/v2/workflow.py (1)

218-221: Good practice: Re-raising CancelledError for timeout mechanism.

Properly re-raising CancelledError ensures the asyncio.wait_for() timeout mechanism works correctly. This is essential for the timeout protection feature.
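
A hypothetical illustration of why the re-raise matters: asyncio.wait_for cancels the inner coroutine when the deadline passes, so swallowing CancelledError inside the workflow coroutine would defeat the timeout.

import asyncio


async def run_flow() -> None:
    try:
        await asyncio.sleep(10)  # stand-in for graph execution
    except asyncio.CancelledError:
        # cleanup may happen here, but the cancellation must propagate
        # for wait_for's timeout to take effect
        raise


async def main() -> None:
    try:
        await asyncio.wait_for(run_flow(), timeout=0.1)
    except asyncio.TimeoutError:
        print("timed out as expected")


asyncio.run(main())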


Comment on lines +43 to +99
def parse_flat_inputs(inputs: dict[str, Any]) -> tuple[dict[str, dict[str, Any]], str | None]:
    """Parse flat inputs structure into tweaks and session_id.

    Format: {"component_id.param": value}
    Example: {"ChatInput-abc.input_value": "hi", "LLM-xyz.temperature": 0.7}

    All parameters (including input_value) are treated as tweaks.
    The graph's topological sort handles execution order automatically.

    Args:
        inputs: The inputs dictionary from WorkflowExecutionRequest

    Returns:
        Tuple of (tweaks_dict, session_id)
        - tweaks_dict: {component_id: {param: value}}
        - session_id: Session ID if provided

    Example:
        >>> inputs = {
        ...     "ChatInput-abc.input_value": "hello",
        ...     "ChatInput-abc.session_id": "session-123",
        ...     "LLM-xyz.temperature": 0.7
        ... }
        >>> tweaks, session_id = parse_flat_inputs(inputs)
        >>> tweaks
        {'ChatInput-abc': {'input_value': 'hello'}, 'LLM-xyz': {'temperature': 0.7}}
        >>> session_id
        'session-123'
    """
    tweaks: dict[str, dict[str, Any]] = {}
    session_id: str | None = None

    # Group inputs by component_id
    component_inputs: dict[str, dict[str, Any]] = {}

    for key, value in inputs.items():
        if "." in key:
            # Split component_id.param
            component_id, param_name = key.split(".", 1)

            if component_id not in component_inputs:
                component_inputs[component_id] = {}
            component_inputs[component_id][param_name] = value
        # No dot - treat as component-level dict (for backward compatibility)
        elif isinstance(value, dict):
            tweaks[key] = value

    # Process component inputs
    for component_id, params in component_inputs.items():
        # Extract session_id if present (use first one found)
        if "session_id" in params and session_id is None:
            session_id = params["session_id"]

        # Build tweaks for all parameters except session_id
        tweak_params = {k: v for k, v in params.items() if k != "session_id"}
        if tweak_params:
            tweaks[component_id] = tweak_params

⚠️ Potential issue | 🟠 Major

Back-compat path drops session_id.
For inputs without ".", session_id remains inside tweaks and is never returned, which can break legacy session handling. Extract it before storing tweaks.

🐛 Proposed fix
-        elif isinstance(value, dict):
-            tweaks[key] = value
+        elif isinstance(value, dict):
+            if "session_id" in value and session_id is None:
+                session_id = value.get("session_id")
+                value = {k: v for k, v in value.items() if k != "session_id"}
+            if value:
+                tweaks[key] = value
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/api/v2/converters.py` around lines 43 - 99,
parse_flat_inputs currently leaves session_id inside tweaks for the
backward-compat branch (the elif isinstance(value, dict) case), so session_id is
never returned; update that branch to extract "session_id" from the value dict
(set session_id if not already set) and then store the remaining params into
tweaks[key] (or omit session_id from the stored dict) so that parse_flat_inputs
returns the session_id consistently; reference parse_flat_inputs, tweaks,
session_id, and the elif isinstance(value, dict) branch to locate the fix.

Comment on lines +215 to +239
def _get_raw_content(vertex_output_data: Any) -> Any:
    """Extract raw content from vertex output data.

    Tries multiple fields in order: outputs, results, messages.

    Note: Uses 'is not None' checks to avoid treating empty collections as missing.

    Args:
        vertex_output_data: The output data from RunResponse

    Returns:
        Raw content or None
    """
    if hasattr(vertex_output_data, "outputs") and vertex_output_data.outputs is not None:
        return vertex_output_data.outputs
    if hasattr(vertex_output_data, "results") and vertex_output_data.results is not None:
        return vertex_output_data.results
    if hasattr(vertex_output_data, "messages") and vertex_output_data.messages is not None:
        return vertex_output_data.messages
    if isinstance(vertex_output_data, dict):
        # Check for 'results' first, then 'content' if results is None
        if "results" in vertex_output_data:
            return vertex_output_data["results"]
        if "content" in vertex_output_data:
            return vertex_output_data["content"]
    return vertex_output_data

⚠️ Potential issue | 🟡 Minor

Handle results=None before falling back to content.
If "results" exists but is None, the current code returns None and skips "content".

🐛 Proposed fix
-        if "results" in vertex_output_data:
-            return vertex_output_data["results"]
+        if "results" in vertex_output_data and vertex_output_data["results"] is not None:
+            return vertex_output_data["results"]
         if "content" in vertex_output_data:
             return vertex_output_data["content"]
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/api/v2/converters.py` around lines 215 - 239,
_get_raw_content returns None if a dict contains the "results" key with a None
value and never checks "content"; update the dict branch in _get_raw_content to
only return vertex_output_data["results"] when the key exists and its value is
not None (e.g., check "results" in vertex_output_data and
vertex_output_data["results"] is not None) and otherwise fall back to return
vertex_output_data["content"] if that key exists and is not None; keep the same
behavior for the attribute checks above.

Comment on lines +496 to +523
def create_error_response(
    flow_id: str,
    job_id: str | None,
    workflow_request: WorkflowExecutionRequest,
    error: Exception,
) -> WorkflowExecutionResponse:
    """Create an error response in workflow format.

    Args:
        flow_id: The flow ID
        job_id: The job ID
        workflow_request: Original request
        error: The exception that occurred

    Returns:
        WorkflowExecutionResponse with error details
    """
    error_detail = ErrorDetail(
        error=str(error), code="EXECUTION_ERROR", details={"flow_id": flow_id, "error_type": type(error).__name__}
    )

    return WorkflowExecutionResponse(
        flow_id=flow_id,
        job_id=job_id,
        object="response",
        created_timestamp=str(int(time.time())),
        status=JobStatus.FAILED,
        errors=[error_detail],

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find the WorkflowExecutionResponse definition
cd /tmp && find . -type f -name "*.py" -path "*/backend/*" | head -20
rg "class WorkflowExecutionResponse" --type py



🏁 Script executed:

# Find the converters.py file and examine it
fd "converters.py" | grep -E "src/backend"



🏁 Script executed:

# Check for WorkflowExecutionResponse definition
rg "WorkflowExecutionResponse" --type py -l | head -10



🏁 Script executed:

# Look at the schema files for WorkflowExecutionResponse definition
cat -n src/lfx/src/lfx/schema/workflow.py | head -100



🏁 Script executed:

# Also check the converters.py file around lines 496-523
wc -l src/backend/base/langflow/api/v2/converters.py



🏁 Script executed:

# Read the converters.py file to find create_error_response
sed -n '480,530p' src/backend/base/langflow/api/v2/converters.py



🏁 Script executed:

# Find all call sites of create_error_response
rg "create_error_response" --type py -B 2 -A 2



🏁 Script executed:

# Check test files to understand usage patterns
cat -n src/backend/tests/unit/api/v2/test_converters.py | head -150



🏁 Script executed:

# Find the exact call to create_error_response with None job_id
rg "create_error_response" -B 5 -A 5 src/backend/base/langflow/api/v2/workflow.py | grep -A 10 "job_id=None"



🏁 Script executed:

# Read the test file to see if it tests None case
sed -n '200,280p' src/backend/tests/unit/api/v2/test_converters.py



🏁 Script executed:

# Search for test_create_error_response tests
sed -n '380,480p' src/backend/tests/unit/api/v2/test_converters.py



🏁 Script executed:

# Get more context around the create_error_response tests
sed -n '380,500p' src/backend/tests/unit/api/v2/test_converters.py



🏁 Script executed:

# Find the create_error_response test class
rg "class TestCreateErrorResponse" -A 100 src/backend/tests/unit/api/v2/test_converters.py | head -120



Fix job_id parameter inconsistency in error response handler.

WorkflowExecutionResponse.job_id is a required str field, but create_error_response() accepts job_id: str | None and passes it directly to the model. This causes Pydantic validation errors when None is passed (which happens in workflow.py exception handler), masking the original error. Either make WorkflowExecutionResponse.job_id optional or ensure a valid ID is always provided before calling create_error_response().

🤖 Prompt for AI Agents
In `@src/backend/base/langflow/api/v2/converters.py` around lines 496 - 523, The
error response builder accepts job_id: str | None but passes it into
WorkflowExecutionResponse.job_id which is required str, causing Pydantic
validation failures; modify create_error_response so it ensures a non-None
job_id before constructing WorkflowExecutionResponse (e.g., if job_id is None,
set job_id = workflow_request.request_id if available or fallback to
str(uuid.uuid4())), update the function signature/comment accordingly, and
import uuid if you use uuid4; ensure callers (e.g., the exception handler in
workflow.py) can still pass None safely.

},
)

router = APIRouter(prefix="/workflows", tags=["Workflow"], dependencies=[Depends(check_developer_api_enabled)])

⚠️ Potential issue | 🔴 Critical


Update test paths to match the plural router prefix /workflows.

The router is configured with prefix /workflows (plural), but the tests make requests to api/v2/workflow (singular). When the router handles requests, the complete path is /api/v2/workflows, so all test paths must be updated from api/v2/workflow to api/v2/workflows (including api/v2/workflow/stop → api/v2/workflows/stop).
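For illustration, a self-contained sketch of how FastAPI composes router prefixes into the final path (a toy app, not the PR's actual wiring):

from fastapi import APIRouter, FastAPI
from fastapi.testclient import TestClient

router = APIRouter(prefix="/workflows")


@router.get("/")
def list_workflows():
    return []


app = FastAPI()
app.include_router(router, prefix="/api/v2")

client = TestClient(app)
assert client.get("/api/v2/workflows/").status_code == 200  # plural: matches
assert client.get("/api/v2/workflow/").status_code == 404   # singular: no route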

🤖 Prompt for AI Agents
In `@src/backend/base/langflow/api/v2/workflow.py` at line 95, the tests use the
singular path but the router is created as router =
APIRouter(prefix="/workflows", ...) so update all test request paths from
api/v2/workflow... to api/v2/workflows... (e.g., change api/v2/workflow ->
api/v2/workflows and api/v2/workflow/stop -> api/v2/workflows/stop) so they
match the APIRouter prefix and the endpoints defined by the Workflow router.

Comment on lines +131 to +134
except asyncio.TimeoutError as e:
msg = f"Execution exceeded {EXECUTION_TIMEOUT} seconds"
raise WorkflowTimeoutError(msg) from e


⚠️ Potential issue | 🔴 Critical

Critical: WorkflowTimeoutError is not imported.

WorkflowTimeoutError is raised at line 133 but is not imported. This will cause a NameError at runtime when a timeout occurs.

Add the import:

from langflow.exceptions.api import WorkflowTimeoutError, WorkflowValidationError
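To make the failure mode concrete, here is a standalone sketch of the wait_for pattern under review; WorkflowTimeoutError is defined locally only because the point of the comment is that the real module must import it:

import asyncio

EXECUTION_TIMEOUT = 0.01


class WorkflowTimeoutError(RuntimeError):
    """Local stand-in for the class the module needs to import."""


async def slow_workflow() -> str:
    await asyncio.sleep(1)
    return "done"


async def run_with_timeout() -> str:
    try:
        return await asyncio.wait_for(slow_workflow(), timeout=EXECUTION_TIMEOUT)
    except asyncio.TimeoutError as e:
        msg = f"Execution exceeded {EXECUTION_TIMEOUT} seconds"
        raise WorkflowTimeoutError(msg) from e


try:
    asyncio.run(run_with_timeout())
except WorkflowTimeoutError as err:
    print(err)  # Execution exceeded 0.01 seconds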
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/api/v2/workflow.py` around lines 131 - 134, the
file raises WorkflowTimeoutError in the except block around async execution but
doesn't import it; add an import statement for WorkflowTimeoutError (and also
WorkflowValidationError as suggested) from langflow.exceptions.api so the
symbols WorkflowTimeoutError and WorkflowValidationError are defined for use in
this module (update the top-of-file imports to include these names).

Comment on lines +34 to +64
async def fire_and_forget_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> str:
"""Launch a task in the background and forget about it.
Note: This is required since the local AnyIOBackend does not support background tasks
natively in a non-blocking way for the API.
This method abstracts the background execution. If Celery is enabled,
it uses the distributed queue. Otherwise, it uses the JobQueueService
to manage and track the asynchronous task locally.
Args:
task_func: The task function to launch.
*args: Positional arguments for the task function.
**kwargs: Keyword arguments for the task function.
Returns:
str: A task_id that can be used to track or cancel the task via JobQueueService.
"""
if self.use_celery:
task_id, _ = self.backend.launch_task(task_func, *args, **kwargs)
return task_id

task_id = str(uuid4())
# Create a job queue for the task and track the job execution using the
# JobQueueService
job_queue_service = get_queue_service()
job_queue_service.create_queue(task_id)
job_queue_service.start_job(task_id, task_func(*args, **kwargs))
return task_id

⚠️ Potential issue | 🟠 Major



Add runtime validation for async callables in fire_and_forget_task.

The method's type hint Callable[..., Any] permits sync callables, but line 63 calls task_func(*args, **kwargs) immediately on the event loop. If task_func is synchronous, that call returns a non-awaitable; start_job then attempts to await it (JobQueueService line 187), causing a runtime error. Either verify that all callers pass async functions, or add a guard using inspect.iscoroutinefunction() and asyncio.to_thread() for sync functions, as suggested below.

🛠️ Possible guard for sync callables
+import asyncio
+import inspect
@@
-        job_queue_service.start_job(task_id, task_func(*args, **kwargs))
+        if inspect.iscoroutinefunction(task_func):
+            task_coro = task_func(*args, **kwargs)
+        else:
+            task_coro = asyncio.to_thread(task_func, *args, **kwargs)
+        job_queue_service.start_job(task_id, task_coro)
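A runnable, self-contained sketch of the suggested dispatch; the worker functions are illustrative, not from the codebase:

import asyncio
import inspect


def sync_work(x: int) -> int:
    return x * 2


async def async_work(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


async def launch(task_func, *args):
    # Wrap sync callables in a thread so the caller always awaits something.
    if inspect.iscoroutinefunction(task_func):
        coro = task_func(*args)
    else:
        coro = asyncio.to_thread(task_func, *args)
    return await coro


print(asyncio.run(launch(sync_work, 21)))   # 42, ran in a worker thread
print(asyncio.run(launch(async_work, 21)))  # 42, ran on the event loop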
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/services/task/service.py` around lines 34 - 64, the
fire_and_forget_task method currently accepts Callable[..., Any] but calls
task_func(*args, **kwargs) directly for the non-Celery path, which will break if
task_func is synchronous because JobQueueService.start_job expects an awaitable;
update fire_and_forget_task to detect async vs sync callables (use
inspect.iscoroutinefunction(task_func) or inspect.isawaitable) and for sync
functions wrap execution via asyncio.to_thread(...) so start_job always receives
an awaitable; keep the Celery branch using backend.launch_task(task_func, ...)
untouched and reference the get_queue_service(), JobQueueService.start_job, and
the fire_and_forget_task function when making the change.

Comment on lines +80 to +91
async def get_task_status(self, job_id: str) -> JobStatus:
"""Retrieves the status of a background task."""
if self.use_celery:
return self.backend.get_task_status(job_id)

# Local JobQueueService
job_queue_service = get_queue_service()
try:
job_data = job_queue_service.get_queue_data(job_id)
return job_data["status"]
except Exception: # noqa: BLE001
return JobStatus.ERROR

⚠️ Potential issue | 🟠 Major

Import JobStatus for the error fallback.
JobStatus.ERROR is referenced but never imported, so failures will raise a NameError instead of returning a status.

🐛 Proposed fix
 from langflow.services.task.backends.celery import CeleryBackend
 from uuid import uuid4
 from langflow.services.deps import get_queue_service
+from lfx.schema.workflow import JobStatus
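For context, a self-contained sketch of the lookup-with-fallback pattern; this JobStatus enum is a local stand-in, not the project's schema class:

from enum import Enum


class JobStatus(str, Enum):
    RUNNING = "running"
    ERROR = "error"


def get_status(records: dict[str, dict], job_id: str) -> JobStatus:
    try:
        return records[job_id]["status"]
    except KeyError:
        # Unknown job ids degrade to ERROR instead of raising.
        return JobStatus.ERROR


print(get_status({"a": {"status": JobStatus.RUNNING}}, "a"))  # JobStatus.RUNNING
print(get_status({}, "missing"))                              # JobStatus.ERROR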
🤖 Prompt for AI Agents
In `@src/backend/base/langflow/services/task/service.py` around lines 80 - 91,
get_task_status references JobStatus.ERROR in its exception fallback but
JobStatus is not imported; add the missing import for the JobStatus enum at the
top of the module so the fallback returns a valid JobStatus instead of raising
NameError. Locate the get_task_status function and ensure an import like "from
<appropriate_module> import JobStatus" is present (matching the project's
JobStatus definition) so JobStatus.ERROR resolves correctly.

Comment on lines +998 to +1009
def test_run_response_preserves_inputs(self):
"""Test that inputs are preserved in response."""
graph = Mock()
graph.vertices = []
graph.get_terminal_nodes = Mock(return_value=[])

run_response = Mock()
run_response.outputs = []

inputs = {"component.param": "value"}
WorkflowExecutionRequest(flow_id="flow-1", inputs=inputs)


⚠️ Potential issue | 🟡 Minor

Incomplete test: the function under test is never called.

The test creates mocks and a request object but never calls run_response_to_workflow_response. The test name suggests verifying inputs are preserved in the response, but no assertion validates this.

🐛 Proposed fix
 def test_run_response_preserves_inputs(self):
     """Test that inputs are preserved in response."""
     graph = Mock()
     graph.vertices = []
     graph.get_terminal_nodes = Mock(return_value=[])

     run_response = Mock()
     run_response.outputs = []

     inputs = {"component.param": "value"}
-    WorkflowExecutionRequest(flow_id="flow-1", inputs=inputs)
+    request = WorkflowExecutionRequest(flow_id="flow-1", inputs=inputs)
+
+    response = run_response_to_workflow_response(run_response, "flow-1", "job-1", request, graph)
+
+    assert response.inputs == inputs
🤖 Prompt for AI Agents
In `@src/backend/tests/unit/api/v2/test_converters.py` around lines 998 - 1009,
the test test_run_response_preserves_inputs currently constructs graph,
run_response, inputs and a WorkflowExecutionRequest but never exercises the
converter; call run_response_to_workflow_response(run_response, "flow-1",
"job-1", request, graph) (using the same WorkflowExecutionRequest instance) and
assert that the returned response preserves the inputs (e.g., response.inputs ==
inputs). Ensure you reference the existing symbols
run_response_to_workflow_response, WorkflowExecutionRequest, run_response and
graph when adding the call and the assertion.

Comment on lines 662 to 666
# Test GET /workflow without API key
response = await client.get("api/v2/workflow?job_id=550e8400-e29b-41d4-a716-446655440001")
assert response.status_code == 403
assert "API key must be passed" in response.json()["detail"]


⚠️ Potential issue | 🔴 Critical

Critical: Orphaned test code will not execute.

These lines are positioned at class indentation level (inside TestWorkflowErrorHandling but outside any method). This code will execute during module import rather than as a test, and will fail immediately because client is not defined in that scope.

This test logic should be part of test_all_endpoints_require_api_key_authentication in TestWorkflowDeveloperAPIProtection class (around line 282).

🐛 Suggested fix: Move to the correct test method

Move lines 662-666 into the test_all_endpoints_require_api_key_authentication method of the TestWorkflowDeveloperAPIProtection class, after the POST /workflow test (around line 303):

async def test_all_endpoints_require_api_key_authentication(
    self,
    client: AsyncClient,
    mock_settings_dev_api_enabled,  # noqa: ARG002
):
    """Test that all workflow endpoints require API key authentication."""
    # Test POST /workflow without API key
    request_data = {
        "flow_id": "550e8400-e29b-41d4-a716-446655440000",
        "background": False,
        "stream": False,
        "inputs": None,
    }

    response = await client.post(
        "api/v2/workflow",
        json=request_data,
    )
    assert response.status_code == 403
    assert "API key must be passed" in response.json()["detail"]

    # Test GET /workflow without API key
    response = await client.get("api/v2/workflow?job_id=550e8400-e29b-41d4-a716-446655440001")
    assert response.status_code == 403
    assert "API key must be passed" in response.json()["detail"]

    # Test POST /workflow/stop without API key
    response = await client.post(
        "api/v2/workflow/stop",
        json={"job_id": "550e8400-e29b-41d4-a716-446655440001"},
    )
    assert response.status_code == 403
    assert "API key must be passed" in response.json()["detail"]
🤖 Prompt for AI Agents
In `@src/backend/tests/unit/api/v2/test_workflow.py` around lines 662 - 666,
orphaned test code in TestWorkflowErrorHandling is executed at import because
it's outside any method; move the GET /workflow no-API-key assertions (the
client.get call and the two asserts) into the
test_all_endpoints_require_api_key_authentication method of the
TestWorkflowDeveloperAPIProtection class (after the POST /workflow check) so
they run as part of that test; reference
test_all_endpoints_require_api_key_authentication,
TestWorkflowDeveloperAPIProtection, and the client fixture when relocating the
code.

fallback_to_env_vars = False

graph.session_id = effective_session_id
print("Effective session ID: ", effective_session_id)

⚠️ Potential issue | 🟠 Major

Remove debug print statements before merging.

These print() statements appear to be leftover debug code. They should be removed or converted to use the existing logger (already imported at line 10) with an appropriate log level like logger.debug().

Issues with print statements in production:

  • No log level control — always outputs regardless of environment
  • Inconsistent with the rest of the codebase which uses the logger
  • run_outputs may contain sensitive data leaked to stdout
🔧 Suggested fix
     graph.session_id = effective_session_id
-    print("Effective session ID: ", effective_session_id)
+    logger.debug("Effective session ID: %s", effective_session_id)
     run_outputs = await graph.arun(
         inputs=inputs_list,
         inputs_components=components,
         types=types,
         outputs=outputs or [],
         stream=stream,
         session_id=effective_session_id or "",
         fallback_to_env_vars=fallback_to_env_vars,
         event_manager=event_manager,
     )
-    print("Run outputs: ", run_outputs)
+    logger.debug("Run outputs: %s", run_outputs)
     return run_outputs, effective_session_id

Alternatively, if these were only needed during development, simply remove them entirely.

Also applies to: 88-88
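One more reason to prefer the lazy %s style shown in the diff: the argument is only stringified when the record is actually emitted, so large run_outputs are never formatted at disabled levels. A standalone illustration:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Expensive:
    def __str__(self) -> str:
        print("stringified!")
        return "expensive value"


logger.debug("debug: %s", Expensive())  # dropped at INFO: __str__ never runs
logger.info("info: %s", Expensive())    # emitted: __str__ runs exactly once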

🤖 Prompt for AI Agents
In `@src/lfx/src/lfx/processing/process.py` at line 77, remove the leftover debug
print statements that output sensitive data (e.g., the line printing
effective_session_id and the other at line 88); replace them with the module
logger (imported as logger) at an appropriate level such as logger.debug() if
the information should be retained for debugging, or simply delete the calls if
not needed—locate the prints referencing effective_session_id and any
run_outputs in process.py (e.g., the print("Effective session ID: ",
effective_session_id) and the one at line 88) and update accordingly.
