Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@
"legacy": false,
"lf_version": "1.4.3",
"metadata": {
"code_hash": "e179036a232d",
"code_hash": "8ed8edd10887",
"dependencies": {
"dependencies": [
{
Expand Down Expand Up @@ -1234,7 +1234,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from lfx.components.processing.converter import convert_to_data\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs.inputs import HandleInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data or Message objects, outputting one item at a time and \"\n \"aggregating results from loop inputs. Message objects are automatically converted to \"\n \"Data objects for consistent processing.\"\n )\n documentation: str = \"https://docs.langflow.org/loop\"\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Item\",\n name=\"item\",\n method=\"item_output\",\n allows_loop=True,\n loop_types=[\"Message\"],\n group_outputs=True,\n ),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _convert_message_to_data(self, message: Message) -> Data:\n \"\"\"Convert a Message object to a Data object using Type Convert logic.\"\"\"\n return convert_to_data(message, auto_parse=False)\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects. Message objects are auto-converted to Data.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, Message):\n # Auto-convert Message to Data\n converted_data = self._convert_message_to_data(data)\n return [converted_data]\n if isinstance(data, list) and all(isinstance(item, (Data, Message)) for item in data):\n # Convert any Message objects in the list to Data objects\n converted_list = []\n for item in data:\n if isinstance(item, Message):\n converted_list.append(self._convert_message_to_data(item))\n else:\n converted_list.append(item)\n return converted_list\n msg = \"The 'data' input must be a DataFrame, a list of Data/Message objects, or a single Data/Message object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n if item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:\n self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n # CRITICAL: Also update run_map so remove_from_predecessors() works correctly\n # run_map[predecessor] = list of vertices that depend on predecessor\n if self._id not in self.graph.run_manager.run_map[item_dependency_id]:\n self.graph.run_manager.run_map[item_dependency_id].append(self._id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\n\n Returns Data or Message objects depending on loop input types.\n \"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n\n # Append the current loop input to aggregated if it's not already included\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n # If the loop input is a Message, convert it to Data for consistency\n if isinstance(loop_input, Message):\n loop_input = self._convert_message_to_data(loop_input)\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
"value": "from lfx.base.flow_controls.loop_utils import (\n execute_loop_body,\n extract_loop_output,\n get_loop_body_start_edge,\n get_loop_body_start_vertex,\n get_loop_body_vertices,\n validate_data_input,\n)\nfrom lfx.components.processing.converter import convert_to_data\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs.inputs import HandleInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data or Message objects, outputting one item at a time and \"\n \"aggregating results from loop inputs. Message objects are automatically converted to \"\n \"Data objects for consistent processing.\"\n )\n documentation: str = \"https://docs.langflow.org/loop\"\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Item\",\n name=\"item\",\n method=\"item_output\",\n allows_loop=True,\n loop_types=[\"Message\"],\n group_outputs=True,\n ),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _convert_message_to_data(self, message: Message) -> Data:\n \"\"\"Convert a Message object to a Data object using Type Convert logic.\"\"\"\n return convert_to_data(message, auto_parse=False)\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n return validate_data_input(data)\n\n def get_loop_body_vertices(self) -> set[str]:\n \"\"\"Identify vertices in this loop's body via graph traversal.\n\n Traverses from the loop's \"item\" output to the vertex that feeds back\n to the loop's \"item\" input, collecting all vertices in between.\n This naturally handles nested loops by stopping at this loop's feedback edge.\n\n Returns:\n Set of vertex IDs that form this loop's body\n \"\"\"\n # Check if we have a proper graph context\n if not hasattr(self, \"_vertex\") or self._vertex is None:\n return set()\n\n return get_loop_body_vertices(\n vertex=self._vertex,\n graph=self.graph,\n get_incoming_edge_by_target_param_fn=self.get_incoming_edge_by_target_param,\n )\n\n def _get_loop_body_start_vertex(self) -> str | None:\n \"\"\"Get the first vertex in the loop body (connected to loop's item output).\n\n Returns:\n The vertex ID of the first vertex in the loop body, or None if not found\n \"\"\"\n # Check if we have a proper graph context\n if not hasattr(self, \"_vertex\") or self._vertex is None:\n return None\n\n return get_loop_body_start_vertex(vertex=self._vertex)\n\n def _extract_loop_output(self, results: list) -> Data:\n \"\"\"Extract the output from subgraph execution results.\n\n Args:\n results: List of VertexBuildResult objects from subgraph execution\n\n Returns:\n Data object containing the loop iteration output\n \"\"\"\n # Get the vertex ID that feeds back to the item input (end of loop body)\n end_vertex_id = self.get_incoming_edge_by_target_param(\"item\")\n return extract_loop_output(results=results, end_vertex_id=end_vertex_id)\n\n async def execute_loop_body(self, data_list: list[Data], event_manager=None) -> list[Data]:\n \"\"\"Execute loop body for each data item.\n\n Creates an isolated subgraph for the loop body and executes it\n for each item in the data list, collecting results.\n\n Args:\n data_list: List of Data objects to iterate over\n event_manager: Optional event manager to pass to subgraph execution for UI events\n\n Returns:\n List of Data objects containing results from each iteration\n \"\"\"\n # Get the loop body configuration once\n loop_body_vertex_ids = self.get_loop_body_vertices()\n start_vertex_id = self._get_loop_body_start_vertex()\n start_edge = get_loop_body_start_edge(self._vertex)\n end_vertex_id = self.get_incoming_edge_by_target_param(\"item\")\n\n return await execute_loop_body(\n graph=self.graph,\n data_list=data_list,\n loop_body_vertex_ids=loop_body_vertex_ids,\n start_vertex_id=start_vertex_id,\n start_edge=start_edge,\n end_vertex_id=end_vertex_id,\n event_manager=event_manager,\n )\n\n def item_output(self) -> Data:\n \"\"\"Output is no longer used - loop executes internally now.\n\n This method is kept for backward compatibility but does nothing.\n The actual loop execution happens in done_output().\n \"\"\"\n self.stop(\"item\")\n return Data(text=\"\")\n\n async def done_output(self, event_manager=None) -> DataFrame:\n \"\"\"Execute the loop body for all items and return aggregated results.\n\n This is now the main execution point for the loop. It:\n 1. Gets the data list to iterate over\n 2. Executes the loop body as an isolated subgraph for each item\n 3. Returns the aggregated results\n\n Args:\n event_manager: Optional event manager for UI event emission\n \"\"\"\n self.initialize_data()\n\n # Get data list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n\n if not data_list:\n return DataFrame([])\n\n # Execute loop body for all items\n try:\n aggregated_results = await self.execute_loop_body(data_list, event_manager=event_manager)\n return DataFrame(aggregated_results)\n except Exception as e:\n # Log error and return empty DataFrame\n from lfx.log.logger import logger\n\n await logger.aerror(f\"Error executing loop body: {e}\")\n raise\n"
},
"data": {
"_input_type": "HandleInput",
Expand Down
Loading
Loading