diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 65192adf8314..8f597f62c92b 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -13,6 +13,7 @@ from fastapi_pagination import Page, Params from fastapi_pagination.ext.sqlmodel import apaginate from lfx.services.cache.utils import CACHE_MISS +from pydantic import ValidationError from sqlmodel import and_, col, select from langflow.api.utils import ( @@ -437,11 +438,19 @@ async def upload_file( # Normalise code fields: if exported with code-as-lines format, rejoin to # strings before creating the Pydantic models so the DB always stores strings. - if "flows" in data: - data = {**data, "flows": [normalize_code_for_import(f) for f in data["flows"]]} - flow_list = FlowListCreate(**data) - else: - flow_list = FlowListCreate(flows=[FlowCreate(**normalize_code_for_import(data))]) + if not isinstance(data, dict): + raise HTTPException( + status_code=422, + detail="Invalid JSON: expected an object with 'flows' or a single flow object", + ) + try: + if "flows" in data: + data = {**data, "flows": [normalize_code_for_import(f) for f in data["flows"]]} + flow_list = FlowListCreate(**data) + else: + flow_list = FlowListCreate(flows=[FlowCreate(**normalize_code_for_import(data))]) + except ValidationError as e: + raise HTTPException(status_code=422, detail=str(e)) from e # TODO: Full-version import is planned as a follow-up feature. 
# When implemented, extract raw flow dicts here to read embedded "version" diff --git a/src/backend/base/langflow/services/database/models/traces/model.py b/src/backend/base/langflow/services/database/models/traces/model.py index a6774d553e8e..2208bf70b74f 100644 --- a/src/backend/base/langflow/services/database/models/traces/model.py +++ b/src/backend/base/langflow/services/database/models/traces/model.py @@ -198,8 +198,8 @@ class TraceSummaryRead(BaseModel): total_tokens: int flow_id: UUID session_id: str - input: dict[str, Any] | None = None - output: dict[str, Any] | None = None + input: dict[str, Any] | str | None = None + output: dict[str, Any] | str | None = None class TraceListResponse(BaseModel): diff --git a/src/backend/tests/unit/api/v1/test_flows.py b/src/backend/tests/unit/api/v1/test_flows.py index c9713ce08aae..a983ffc90d06 100644 --- a/src/backend/tests/unit/api/v1/test_flows.py +++ b/src/backend/tests/unit/api/v1/test_flows.py @@ -648,6 +648,87 @@ async def test_create_flow_rejects_traversal_in_subpath(client: AsyncClient, log assert response.status_code == status.HTTP_400_BAD_REQUEST +async def test_upload_flow_rejects_list_payload(client: AsyncClient, logged_in_headers): + """Regression: uploading a JSON array (not an object) must return 422, not 500. + + orjson.loads() on a list payload returns a Python list. Before the isinstance + guard, 'flows' in data silently evaluates to False, routing to the else branch + where **normalize_code_for_import(list) raises TypeError — escaping as a 500. 
+ """ + import json + + file_content = json.dumps([{"name": "flow1", "data": {}}]) + + response = await client.post( + "api/v1/flows/upload/", + files={"file": ("flows.json", file_content, "application/json")}, + headers=logged_in_headers, + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + +async def test_upload_flow_rejects_scalar_payload(client: AsyncClient, logged_in_headers): + """Regression: uploading a JSON scalar (string/number) must return 422, not 500.""" + import json + + file_content = json.dumps("just a string") + + response = await client.post( + "api/v1/flows/upload/", + files={"file": ("flows.json", file_content, "application/json")}, + headers=logged_in_headers, + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + +async def test_upload_flow_rejects_endpoint_name_with_dots(client: AsyncClient, logged_in_headers): + """Regression: endpoint_name containing dots must return 422, not 500. + + Previously a ValidationError from the Pydantic model escaped the handler + and hit the global exception_handler, producing a 500 and a Scarf telemetry + event. The import path now wraps FlowCreate construction in a try/except + and re-raises as HTTPException(422). 
+ """ + import json + + flow_data = { + "name": "neuro-vision", + "data": {}, + "endpoint_name": "neuro-vision-planning.phase1.contract", + } + file_content = json.dumps({"folder_name": "proj", "flows": [flow_data]}) + + response = await client.post( + "api/v1/flows/upload/", + files={"file": ("flows.json", file_content, "application/json")}, + headers=logged_in_headers, + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + +async def test_upload_flow_accepts_valid_endpoint_name(client: AsyncClient, logged_in_headers): + """Endpoint names with only letters, numbers, hyphens, and underscores are accepted.""" + import json + + flow_data = { + "name": "neuro-vision", + "data": {}, + "endpoint_name": "neuro-vision-planning_phase1", + } + file_content = json.dumps({"folder_name": "proj", "flows": [flow_data]}) + + response = await client.post( + "api/v1/flows/upload/", + files={"file": ("flows.json", file_content, "application/json")}, + headers=logged_in_headers, + ) + assert response.status_code == status.HTTP_201_CREATED + body = response.json() + assert isinstance(body, list) + assert len(body) == 1 + assert body[0]["name"] == "neuro-vision" + + async def test_upload_flow_rejects_absolute_path(client: AsyncClient, logged_in_headers): """Test that uploading flows with absolute paths is rejected.""" import json diff --git a/src/backend/tests/unit/services/database/models/traces/test_enum_serialization.py b/src/backend/tests/unit/services/database/models/traces/test_enum_serialization.py index 7bfcf896fab1..591bb2f617d7 100644 --- a/src/backend/tests/unit/services/database/models/traces/test_enum_serialization.py +++ b/src/backend/tests/unit/services/database/models/traces/test_enum_serialization.py @@ -18,6 +18,7 @@ SpanStatus, SpanTable, SpanType, + TraceSummaryRead, TraceTable, ) from sqlalchemy import Enum as SQLEnum @@ -114,3 +115,58 @@ def test_bind_processor_emits_enum_value(self, member): bind_processor = 
enum_type.bind_processor(dialect) bound = bind_processor(member) if bind_processor else member assert bound == member.value + + +_TRACE_SUMMARY_DEFAULTS: dict = { + "id": "00000000-0000-0000-0000-000000000001", + "name": "t", + "status": SpanStatus.OK, + "start_time": None, + "total_latency_ms": 0, + "total_tokens": 0, + "flow_id": "00000000-0000-0000-0000-000000000002", + "session_id": "s", +} + + +class TestTraceSummaryReadIoFields: + """Regression: input/output accept the '[Unserializable Object]' sentinel string. + + When a trace's input or output contains a non-JSON-serializable object the + serialization layer stores the sentinel string ``'[Unserializable Object]'`` + instead of a dict. ``TraceSummaryRead`` must accept that value without + raising a ``ValidationError`` so the list endpoint never returns a 500. + """ + + def test_should_accept_dict_input(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "input": {"key": "value"}}) + assert summary.input == {"key": "value"} + + def test_should_accept_dict_output(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "output": {"result": 1}}) + assert summary.output == {"result": 1} + + def test_should_accept_none_input(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "input": None}) + assert summary.input is None + + def test_should_accept_none_output(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "output": None}) + assert summary.output is None + + def test_should_accept_unserializable_sentinel_as_input(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "input": "[Unserializable Object]"}) + assert summary.input == "[Unserializable Object]" + + def test_should_accept_unserializable_sentinel_as_output(self): + summary = TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "output": "[Unserializable Object]"}) + assert summary.output == "[Unserializable Object]" + + def test_should_accept_arbitrary_string_as_input(self): + summary = 
TraceSummaryRead(**{**_TRACE_SUMMARY_DEFAULTS, "input": "plain string"}) + assert summary.input == "plain string" + + def test_should_default_input_and_output_to_none(self): + summary = TraceSummaryRead(**_TRACE_SUMMARY_DEFAULTS) + assert summary.input is None + assert summary.output is None