Skip to content

feat(llama_index): add support for llama-index workflow #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ setup: set_python_env

# ============================

setup_test:
setup_test: setup
@poetry install --with=test --all-extras

test: setup_test
Expand All @@ -57,5 +57,5 @@ test_langgraph_agent: setup_test

test_langgraph_software: setup_test
poetry run pytest tests/test_langgraph_graph_with_io_mapper.py
unittest:
unittest: setup_test
poetry run pytest tests/unittests -vvrx
6 changes: 5 additions & 1 deletion agntcy_iomapper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 Cisco and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
# ruff: noqa: F401
from agntcy_iomapper.agent import FieldMetadata, IOMappingAgent, IOMappingAgentMetadata
from agntcy_iomapper.agent import IOMappingAgent
from agntcy_iomapper.base import FieldMetadata, IOMappingAgentMetadata
from agntcy_iomapper.imperative import (
ImperativeIOMapper,
ImperativeIOMapperInput,
ImperativeIOMapperOutput,
)
from agntcy_iomapper.llamaindex import IOMappingInputEvent, IOMappingOutputEvent

__all__ = [
"IOMappingAgent",
"IOMappingAgentMetadata",
"IOMappingOutputEvent",
"IOMappingInputEvent",
"ImperativeIOMapper",
"ImperativeIOMapperInput",
"ImperativeIOMapperOutput",
Expand Down
12 changes: 3 additions & 9 deletions agntcy_iomapper/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@
# SPDX-License-Identifier: Apache-2.0

from agntcy_iomapper.agent.agent_io_mapper import IOMappingAgent
from agntcy_iomapper.agent.base import AgentIOMapper
from agntcy_iomapper.agent.models import (
AgentIOMapperConfig,
AgentIOMapperInput,
AgentIOMapperOutput,
from agntcy_iomapper.base.models import (
BaseIOMapperConfig,
FieldMetadata,
IOMappingAgentMetadata,
)

__all__ = [
"AgentIOMapper",
"AgentIOMapperOutput",
"AgentIOMapperConfig",
"BaseIOMapperConfig",
"IOMappingAgent",
"IOMappingAgentMetadata",
"AgentIOMapperInput",
"FieldMetadata",
]
135 changes: 94 additions & 41 deletions agntcy_iomapper/agent/agent_io_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,58 @@
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Any, Optional, Tuple, Union
from typing import Any, Callable, List, Optional, Union

from langchain_core.language_models import BaseChatModel
from langchain_core.runnables import Runnable
from openapi_pydantic import Schema
from llama_index.core.base.llms.base import BaseLLM
from llama_index.core.tools import BaseTool
from llama_index.core.workflow import (
Workflow,
)
from pydantic import BaseModel, Field, model_validator
from typing_extensions import Self

from agntcy_iomapper.agent.models import (
from agntcy_iomapper.base.models import (
AgentIOMapperInput,
ArgumentsDescription,
FieldMetadata,
IOMappingAgentMetadata,
)
from agntcy_iomapper.base import ArgumentsDescription
from agntcy_iomapper.base.utils import create_type_from_schema, extract_nested_fields
from agntcy_iomapper.base.utils import extract_nested_fields, get_io_types
from agntcy_iomapper.imperative import (
ImperativeIOMapper,
ImperativeIOMapperInput,
)
from agntcy_iomapper.langgraph import LangGraphIOMapper, LangGraphIOMapperConfig
from agntcy_iomapper.llamaindex.llamaindex import (
LLamaIndexIOMapper,
)

logger = logging.getLogger(__name__)


class IOMappingAgent(BaseModel):
metadata: IOMappingAgentMetadata = Field(
"""This class exposes all
The IOMappingAgent class is designed for developers building sophisticated multi-agent software that require seamless integration and interaction between
the different agents and workflow steps.
By utilizing the methods provided, developers can construct complex workflows and softwares.
The IOMappingAgent class is intended to serve as a foundational component in applications requiring advanced IO mapping agents in multi-agent systems.
"""

metadata: Optional[IOMappingAgentMetadata] = Field(
...,
description="Details about the fields to be used in the translation and about the output",
)
llm: Optional[Union[BaseChatModel, str]] = (
Field(
None,
description="Model to use for translation as LangChain description or model class.",
),
llm: Optional[Union[BaseChatModel, str]] = Field(
None,
description="Model to use for translation as LangChain description or model class.",
)

@model_validator(mode="after")
def _validate_obj(self) -> Self:
if not self.metadata:
return self

if not self.metadata.input_fields or len(self.metadata.input_fields) == 0:
raise ValueError("input_fields not found in the metadata")
Expand Down Expand Up @@ -70,38 +88,12 @@ def _validate_obj(self) -> Self:

return self

def _get_io_types(self, data: Any) -> Tuple[Schema, Schema]:
data_schema = None
if isinstance(data, BaseModel):
data_schema = data.model_json_schema()
# If input schema is provided it overwrites the data schema
input_schema = (
self.metadata.input_schema if self.metadata.input_schema else data_schema
)
# If output schema is provided it overwrites the data schema
output_schema = (
self.metadata.output_schema if self.metadata.output_schema else data_schema
)

if not input_schema or not output_schema:
raise ValueError(
"input_schema, and or output_schema are missing from the metadata, for a better accuracy you are required to provide them in this scenario, or we could not infer the type from the state"
)

input_type = Schema.model_validate(
create_type_from_schema(input_schema, self.metadata.input_fields)
)

output_type = Schema.model_validate(
create_type_from_schema(output_schema, self.metadata.output_fields)
)

return (input_type, output_type)

def langgraph_node(self, data: Any, config: Optional[dict] = None) -> Runnable:
"""This method is used to add a language graph node to a langgraph multi-agent software.
It leverages language models for IO mapping, ensuring efficient communication between agents.
"""

# If there is a template for the output the output_schema is going to be ignored in the translation
input_type, output_type = self._get_io_types(data)
input_type, output_type = get_io_types(data, self.metadata)

data_to_be_mapped = extract_nested_fields(
data, fields=self.metadata.input_fields
Expand Down Expand Up @@ -132,3 +124,64 @@ def langgraph_node(self, data: Any, config: Optional[dict] = None) -> Runnable:

iomapper_config = LangGraphIOMapperConfig(llm=self.llm)
return LangGraphIOMapper(iomapper_config, input).as_runnable()

def langgraph_imperative(
self, data: Any, config: Optional[dict] = None
) -> Runnable:
"""
Description: Similar to langgraph_node, this method adds a language graph node to a multi-agent software.
However, it does not utilize a language model for IO mapping, offering an imperative approach to agent integration.
"""

input_type, output_type = self._get_io_types(data, self.metadata)

data_to_be_mapped = extract_nested_fields(
data, fields=self.metadata.input_fields
)

input = ImperativeIOMapperInput(
input=ArgumentsDescription(
json_schema=input_type,
),
output=ArgumentsDescription(json_schema=output_type),
data=data_to_be_mapped,
)

if not self.metadata.field_mapping:
raise ValueError(
"In order to use imperative mapping field_mapping must be provided in the metadata"
)

imperative_io_mapper = ImperativeIOMapper(
input=input, field_mapping=self.metadata.field_mapping
)
return imperative_io_mapper.as_runnable()

@staticmethod
def as_worfklow_step(workflow: Workflow) -> Callable:
"""This static method allows for the addition of a step to a LlamaIndex workflow.
It integrates seamlessly into workflows, enabling structured progression and task execution.
"""
io_mapper_step = LLamaIndexIOMapper.llamaindex_mapper(workflow)
return io_mapper_step

@staticmethod
def as_workflow_agent(
mapping_metadata: IOMappingAgentMetadata,
llm: BaseLLM,
name: str,
description: str,
can_handoff_to: Optional[List[str]] = None,
tools: Optional[List[Union[BaseTool, Callable]]] = [],
):
"""This static method returns an instance of an agent that can be integrated into a Multi AgentWorkflow.
It provides robust IO mapping capabilities essential for complex multi agent workflow interactions.
"""
return LLamaIndexIOMapper(
mapping_metadata=mapping_metadata,
llm=llm,
tools=tools,
name=name,
description=description,
can_handoff_to=can_handoff_to,
)
Loading