Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dataframe to Loop component, fixes loop input error #6996

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b8e5aba
add dataframe support for the loop component
rodrigosnader Mar 10, 2025
fb4228b
[autofix.ci] apply automated fixes
autofix-ci[bot] Mar 10, 2025
48daea6
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Mar 10, 2025
c026756
Merge branch 'main' into loop_component_dataframe_support
edwinjosechittilappilly Mar 11, 2025
59105f1
Merge branch 'main' into loop_component_dataframe_support
italojohnny Mar 18, 2025
d333703
fix: starter project
italojohnny Mar 18, 2025
5224c33
Merge branch 'main' into loop_component_dataframe_support
edwinjosechittilappilly Mar 24, 2025
4852752
update loop component and tests
edwinjosechittilappilly Mar 25, 2025
463257b
Merge branch 'main' into loop_component_dataframe_support
edwinjosechittilappilly Mar 25, 2025
70f9524
[autofix.ci] apply automated fixes
autofix-ci[bot] Mar 25, 2025
235b128
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Mar 25, 2025
7fa0b0b
Merge branch 'main' into loop_component_dataframe_support
edwinjosechittilappilly Mar 25, 2025
3f4db93
update logic
edwinjosechittilappilly Mar 25, 2025
a1d0904
Update loop_basic.py
edwinjosechittilappilly Mar 25, 2025
9678729
Update LoopTest.json
edwinjosechittilappilly Mar 25, 2025
ebf6e61
Update Research Translation Loop.json
edwinjosechittilappilly Mar 25, 2025
bfe6e48
fix lint
edwinjosechittilappilly Mar 25, 2025
0b324d2
format fix
edwinjosechittilappilly Mar 25, 2025
5a357b6
[autofix.ci] apply automated fixes
autofix-ci[bot] Mar 25, 2025
fc9cbef
Merge branch 'main' into loop_component_dataframe_support
edwinjosechittilappilly Mar 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/backend/base/langflow/components/logic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from .flow_tool import FlowToolComponent
from .listen import ListenComponent
from .loop import LoopComponent
from .loop_basic import BasicLoopComponent
from .notify import NotifyComponent
from .pass_message import PassMessageComponent
from .run_flow import RunFlowComponent
from .sub_flow import SubFlowComponent

__all__ = [
"BasicLoopComponent",
"ConditionalRouterComponent",
"DataConditionalRouterComponent",
"FlowToolComponent",
Expand Down
1 change: 1 addition & 0 deletions src/backend/base/langflow/components/logic/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class LoopComponent(Component):
"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs."
)
icon = "infinity"
legacy = True

inputs = [
DataInput(
Expand Down
116 changes: 116 additions & 0 deletions src/backend/base/langflow/components/logic/loop_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from langflow.custom import Component
from langflow.io import HandleInput, Output
from langflow.schema import Data
from langflow.schema.dataframe import DataFrame


class BasicLoopComponent(Component):
display_name = "Loop"
description = (
"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs."
)
icon = "infinity"

inputs = [
HandleInput(
name="data",
display_name="Data or DataFrame",
info="The initial list of Data objects or DataFrame to iterate over.",
input_types=["Data", "DataFrame"],
),
]

outputs = [
Output(display_name="Item", name="item", method="item_output", allows_loop=True),
Output(display_name="Done", name="done", method="done_output"),
]

def initialize_data(self) -> None:
"""Initialize the data list, context index, and aggregated list."""
if self.ctx.get(f"{self._id}_initialized", False):
return

# Ensure data is a list of Data objects
data_list = self._validate_data(self.data)

# Store the initial data and context variables
self.update_ctx(
{
f"{self._id}_data": data_list,
f"{self._id}_index": 0,
f"{self._id}_aggregated": [],
f"{self._id}_initialized": True,
}
)

def _validate_data(self, data):
"""Validate and return a list of Data objects."""
if isinstance(data, DataFrame):
return data.to_data_list()
if isinstance(data, Data):
return [data]
if isinstance(data, list) and all(isinstance(item, Data) for item in data):
return data
msg = "The 'data' input must be a DataFrame, a list of Data objects, or a single Data object."
raise TypeError(msg)

def evaluate_stop_loop(self) -> bool:
"""Evaluate whether to stop item or done output."""
current_index = self.ctx.get(f"{self._id}_index", 0)
data_length = len(self.ctx.get(f"{self._id}_data", []))
return current_index > data_length

Comment on lines +61 to +62
Copy link
Preview

Copilot AI Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using '>= data_length' instead of '> data_length' so that the loop terminates correctly when the current index equals the length of the data list.

Suggested change
return current_index > data_length
return current_index >= data_length

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

def item_output(self) -> Data:
"""Output the next item in the list or stop if done."""
self.initialize_data()
current_item = Data(text="")

if self.evaluate_stop_loop():
self.stop("item")
return Data(text="")

# Get data list and current index
data_list, current_index = self.loop_variables()
if current_index < len(data_list):
# Output current item and increment index
try:
current_item = data_list[current_index]
except IndexError:
current_item = Data(text="")
self.aggregated_output()
self.update_ctx({f"{self._id}_index": current_index + 1})
return current_item

def done_output(self) -> DataFrame:
"""Trigger the done output when iteration is complete."""
self.initialize_data()

if self.evaluate_stop_loop():
self.stop("item")
self.start("done")

aggregated = self.ctx.get(f"{self._id}_aggregated", [])

return DataFrame(aggregated)
self.stop("done")
return DataFrame([])

def loop_variables(self):
"""Retrieve loop variables from context."""
return (
self.ctx.get(f"{self._id}_data", []),
self.ctx.get(f"{self._id}_index", 0),
)

def aggregated_output(self) -> list[Data]:
"""Return the aggregated list once all items are processed."""
self.initialize_data()

# Get data list and aggregated list
data_list = self.ctx.get(f"{self._id}_data", [])
aggregated = self.ctx.get(f"{self._id}_aggregated", [])
loop_input = self.get_loop_output_value("item")
if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):
aggregated.append(loop_input)
self.update_ctx({f"{self._id}_aggregated": aggregated})
return aggregated
21 changes: 21 additions & 0 deletions src/backend/base/langflow/custom/custom_component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,27 @@ async def _run(self):

return await self.build_results()

def get_loop_output_value(self, name: str) -> Any:
"""Get the value of an output that allows looping.

Args:
name (str): The name of the output to check.

Returns:
Any: The value from the vertex if the output allows looping,
otherwise returns None.
"""
if not hasattr(self, "_vertex") or self._vertex is None:
return None

if any(
getattr(output, "allows_loop", False) and output.name == name for output in getattr(self, "outputs", [])
):
return self._vertex.get_value_from_output_names(name)
return None

def __getattr__(self, name: str) -> Any:
# First check if it's a loop output
if "_attributes" in self.__dict__ and name in self.__dict__["_attributes"]:
return self.__dict__["_attributes"][name]
if "_inputs" in self.__dict__ and name in self.__dict__["_inputs"]:
Expand All @@ -737,6 +757,7 @@ def __getattr__(self, name: str) -> Any:
return PlaceholderGraph(
flow_id=flow_id, user_id=str(user_id), session_id=session_id, context={}, flow_name=flow_name
)

msg = f"Attribute {name} not found in {self.__class__.__name__}"
raise AttributeError(msg)

Expand Down
34 changes: 30 additions & 4 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,33 @@ def parse_data(self) -> None:
self.base_type = base_type
break

def get_value_from_output_names(self, key: str):
if key in self.output_names:
return self.graph.get_vertex(key)
return None
def get_value_from_output_names(self, key: str) -> Any:
"""Get value from output names.

Args:
key (str): Name of output.

Returns:
Any: Value of output.
"""
# Find edges where this vertex is the target and the target handle's field name matches the key
edges = [edge for edge in self.edges if edge.target_id == self.id and edge.target_handle.field_name == key]

if not edges:
return None

# Get the source vertex and its output value
edge = edges[0] # Take the first matching edge
source_vertex = self.graph.get_vertex(edge.source_id)
if not source_vertex:
return None

# Get the output value from the source vertex's results
source_handle = edge.source_handle
if not source_handle or not source_handle.name:
return None

return source_vertex.results.get(source_handle.name)

def get_value_from_template_dict(self, key: str):
template_dict = self.data.get("node", {}).get("template", {})
Expand Down Expand Up @@ -307,6 +330,9 @@ def _set_params_from_normal_edge(self, params: dict, edge: Edge, template_dict:
else:
params[param_key] = self.graph.get_vertex(edge.source_id)
elif param_key in self.output_names:
# If this is a loop output being used as an input
params[param_key] = self.get_value_from_output_names(param_key)
else:
params[param_key] = self.graph.get_vertex(edge.source_id)
return params

Expand Down
Loading
Loading