- "value": "from langflow.custom import Component\nfrom langflow.io import DataInput, Output\nfrom langflow.schema import Data\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n DataInput(\n name=\"data\",\n display_name=\"Data\",\n info=\"The initial list of Data objects to iterate over.\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\"),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a list of Data objects or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n return Data(text=\"\")\n\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n return current_item\n\n def done_output(self) -> Data:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n return self.ctx.get(f\"{self._id}_aggregated\", [])\n self.stop(\"done\")\n return Data(text=\"\")\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> Data:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n # Check if loop input is provided and append to aggregated list\n if self.data is not None and not isinstance(self.data, str) and len(aggregated) <= len(data_list):\n aggregated.append(self.data)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
0 commit comments