Skip to content

Feature workflow yaml #1346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ services:
networks:
- super_network
# uncomment to expose redis port to host
# ports:
# - "6379:6379"
ports:
- "6379:6379"
volumes:
- redis_data:/data

Expand All @@ -53,8 +53,8 @@ services:
networks:
- super_network
# uncomment to expose postgres port to host
# ports:
# - "5432:5432"
ports:
- "5432:5432"

proxy:
image: nginx:stable-alpine
Expand Down
9 changes: 7 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,18 @@ def register_toolkit_for_master_organisation():
AgentWorkflowSeed.build_task_based_agent(session)
AgentWorkflowSeed.build_fixed_task_based_agent(session)
AgentWorkflowSeed.build_sales_workflow(session)
AgentWorkflowSeed.build_recruitment_workflow(session)
# AgentWorkflowSeed.build_recruitment_workflow(session)
AgentWorkflowSeed.build_coding_workflow(session)
AgentWorkflowSeed.build_test_condition_workflow(session)
AgentWorkflowSeed.build_recruitment_workflow1(session)
AgentWorkflowSeed.build_recruitment_workflow5(session)


# NOTE: remove old workflows. Need to remove this changes later
workflows = ["Sales Engagement Workflow", "Recruitment Workflow", "SuperCoder", "Goal Based Workflow",
"Dynamic Task Workflow", "Fixed Task Workflow"]
"Dynamic Task Workflow", "Fixed Task Workflow","Test-Yaml-Workflow","Test-Yaml-Workflow-Sales", "Test Condition Workflow","Recruitment Workflow1","Recruitment Workflow5","Naman2"]
workflows = session.query(AgentWorkflow).filter(AgentWorkflow.name.not_in(workflows))
print("Workflows : ",workflows)
for workflow in workflows:
session.delete(workflow)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Added Columns in Agent Workflows

Revision ID: 80ea8fa19acb
Revises: 88585c347b4c
Create Date: 2023-10-06 11:46:44.881154

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '80ea8fa19acb'
down_revision = '88585c347b4c'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('agent_workflows', sa.Column('organisation_id', sa.Integer(), nullable=True))
op.add_column('agent_workflows', sa.Column('code_yaml', sa.Text(), nullable=True))
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('agent_workflows', 'code_yaml')
op.drop_column('agent_workflows', 'organisation_id')
# ### end Alembic commands ###
93 changes: 93 additions & 0 deletions migrations/versions/88585c347b4c_added_condition_workflow_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""added_condition_workflow_step

Revision ID: 88585c347b4c
Revises: c4f2f6ba602a
Create Date: 2023-09-20 05:24:14.759428

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '88585c347b4c'
down_revision = 'c4f2f6ba602a'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('agent_workflow_step_conditions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('instruction', sa.String(), nullable=True),
sa.Column('tool_output', sa.String(), nullable=True),
sa.Column('tool_name', sa.String(), nullable=True),
sa.Column('unique_id', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.drop_index('ix_agent_schedule_agent_id', table_name='agent_schedule')
op.drop_index('ix_agent_schedule_expiry_date', table_name='agent_schedule')
op.drop_index('ix_agent_schedule_status', table_name='agent_schedule')
op.alter_column('agent_workflow_steps', 'unique_id',
existing_type=sa.VARCHAR(),
nullable=True)
op.alter_column('agent_workflow_steps', 'step_type',
existing_type=sa.VARCHAR(),
nullable=True)
op.drop_column('agent_workflows', 'organisation_id')
op.drop_index('ix_events_agent_id', table_name='events')
op.drop_index('ix_events_event_property', table_name='events')
op.drop_index('ix_events_org_id', table_name='events')
op.alter_column('knowledge_configs', 'knowledge_id',
existing_type=sa.INTEGER(),
nullable=True)
op.alter_column('knowledges', 'name',
existing_type=sa.VARCHAR(),
nullable=True)
op.alter_column('vector_db_configs', 'vector_db_id',
existing_type=sa.INTEGER(),
nullable=True)
op.alter_column('vector_db_indices', 'name',
existing_type=sa.VARCHAR(),
nullable=True)
op.alter_column('vector_dbs', 'name',
existing_type=sa.VARCHAR(),
nullable=True)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('vector_dbs', 'name',
existing_type=sa.VARCHAR(),
nullable=False)
op.alter_column('vector_db_indices', 'name',
existing_type=sa.VARCHAR(),
nullable=False)
op.alter_column('vector_db_configs', 'vector_db_id',
existing_type=sa.INTEGER(),
nullable=False)
op.alter_column('knowledges', 'name',
existing_type=sa.VARCHAR(),
nullable=False)
op.alter_column('knowledge_configs', 'knowledge_id',
existing_type=sa.INTEGER(),
nullable=False)
op.create_index('ix_events_org_id', 'events', ['org_id'], unique=False)
op.create_index('ix_events_event_property', 'events', ['event_property'], unique=False)
op.create_index('ix_events_agent_id', 'events', ['agent_id'], unique=False)
op.add_column('agent_workflows', sa.Column('organisation_id', sa.INTEGER(), autoincrement=False, nullable=True))
op.alter_column('agent_workflow_steps', 'step_type',
existing_type=sa.VARCHAR(),
nullable=False)
op.alter_column('agent_workflow_steps', 'unique_id',
existing_type=sa.VARCHAR(),
nullable=False)
op.create_index('ix_agent_schedule_status', 'agent_schedule', ['status'], unique=False)
op.create_index('ix_agent_schedule_expiry_date', 'agent_schedule', ['expiry_date'], unique=False)
op.create_index('ix_agent_schedule_agent_id', 'agent_schedule', ['agent_id'], unique=False)
op.drop_table('agent_workflow_step_conditions')
# ### end Alembic commands ###
2 changes: 1 addition & 1 deletion superagi/agent/agent_prompt_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,4 @@ def replace_task_based_variables(cls, super_agi_prompt: str, current_task: str,
if token_count > min(600, pending_tokens):
break
super_agi_prompt = super_agi_prompt.replace("{task_history}", "\n" + final_output + "\n")
return super_agi_prompt
return super_agi_prompt
75 changes: 62 additions & 13 deletions superagi/agent/agent_tool_step_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from superagi.agent.output_parser import AgentSchemaToolOutputParser
from superagi.agent.queue_step_handler import QueueStepHandler
from superagi.agent.tool_builder import ToolBuilder
from superagi.agent.types.agent_workflow_step_action_types import AgentWorkflowStepAction
from superagi.agent.workflow.steps.condition_step import AgentConditionStepHandler
from superagi.helper.prompt_reader import PromptReader
from superagi.helper.token_counter import TokenCounter
from superagi.lib.logger import logger
Expand All @@ -19,13 +21,16 @@
from superagi.models.tool import Tool
from superagi.models.toolkit import Toolkit
from superagi.models.workflows.agent_workflow_step import AgentWorkflowStep
from superagi.models.workflows.agent_workflow_step_condition import AgentWorkflowStepCondition
from superagi.models.workflows.agent_workflow_step_tool import AgentWorkflowStepTool
from superagi.resource_manager.resource_summary import ResourceSummarizer
from superagi.tools.base_tool import BaseTool
from sqlalchemy import and_


class AgentToolStepHandler:
"""Handles the tools steps in the agent workflow"""

def __init__(self, session, llm, agent_id: int, agent_execution_id: int, memory=None):
self.session = session
self.llm = llm
Expand All @@ -45,28 +50,50 @@ def execute_step(self):

if not self._handle_wait_for_permission(execution, workflow_step):
return

# TODO : Handle task queue as LOOP step
if step_tool.tool_name == "TASK_QUEUE":
step_response = QueueStepHandler(self.session, self.llm, self.agent_id, self.agent_execution_id).execute_step()
print("______________LOOP STEP____________________")
step_response = QueueStepHandler(self.session, self.llm, self.agent_id,
self.agent_execution_id).execute_step()
next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, step_response)
self._handle_next_step(next_step)
return

# TODO : Handle WAIT_FOR_PERMISSION as an independent step
if step_tool.tool_name == "WAIT_FOR_PERMISSION":
print("________________WAIT FOR PERMISSION_____________")
self._create_permission_request(execution, step_tool)
return

print("____________AGENT_TOOL_STEP_HANDLER_____________")
print(step_tool.tool_name)
print(workflow_step)
assistant_reply = self._process_input_instruction(agent_config, agent_execution_config, step_tool,
workflow_step)
print("_____________ASSISTANT_REPLY_____________")
print(assistant_reply)
tool_obj = self._build_tool_obj(agent_config, agent_execution_config, step_tool.tool_name)
tool_output_handler = ToolOutputHandler(self.agent_execution_id, agent_config, [tool_obj],self.memory,
print("_____________TOOL_OBJ_____________")
print(tool_obj)
# INFO: Tool output handler is get the verdict or result from the tool
tool_output_handler = ToolOutputHandler(self.agent_execution_id, agent_config, [tool_obj], self.memory,
output_parser=AgentSchemaToolOutputParser())
print("_____________TOOL_OUTPUT_HANDLER_____________")
print(tool_output_handler)
final_response = tool_output_handler.handle(self.session, assistant_reply)
print("_____________FINAL_RESPONSE_____________")
print(final_response)
step_response = "default"
# TODO: Here output instruction is processed which needs to be handled as CONDITION step
# Following is kept as a backward support for the old workflows
if step_tool.output_instruction:
step_response = self._process_output_instruction(final_response.result, step_tool, workflow_step)

next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, step_response)
next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id,
step_response)
print("TOOL HANDLER - Next Step : ",next_step)
if next_step is not "COMPLETE":
(AgentConditionStepHandler(self.session, self.agent_id, self.agent_execution_id, self.llm)
.update_tool_output(tool_output=final_response.result, next_step=next_step,tool_name=step_tool.tool_name))
print("Calling handler ___________")
self._handle_next_step(next_step)
self.session.flush()

Expand All @@ -86,6 +113,8 @@ def _create_permission_request(self, execution, step_tool: AgentWorkflowStepTool
self.session.commit()

def _handle_next_step(self, next_step):
print("_____________HANDLE_NEXT_STEP_____________")
print("next_step : ", next_step)
if str(next_step) == "COMPLETE":
agent_execution = AgentExecution.get_agent_execution_from_id(self.session, self.agent_execution_id)
agent_execution.current_agent_step_id = -1
Expand All @@ -95,16 +124,22 @@ def _handle_next_step(self, next_step):
self.session.commit()

def _process_input_instruction(self, agent_config, agent_execution_config, step_tool, workflow_step):
print("_____________PROCESS_INPUT_INSTRUCTION_____________")
tool_obj = self._build_tool_obj(agent_config, agent_execution_config, step_tool.tool_name)
print("_____________TOOL_OBJ_____________")
print(tool_obj)
prompt = self._build_tool_input_prompt(step_tool, tool_obj, agent_execution_config)
logger.info("Prompt: ", prompt)
agent_feeds = AgentExecutionFeed.fetch_agent_execution_feeds(self.session, self.agent_execution_id)
messages = AgentLlmMessageBuilder(self.session, self.llm, self.llm.get_model(), self.agent_id, self.agent_execution_id) \
messages = AgentLlmMessageBuilder(self.session, self.llm, self.llm.get_model(), self.agent_id,
self.agent_execution_id) \
.build_agent_messages(prompt, agent_feeds, history_enabled=step_tool.history_enabled,
completion_prompt=step_tool.completion_prompt)
# print(messages)
current_tokens = TokenCounter.count_message_tokens(messages, self.llm.get_model())
response = self.llm.chat_completion(messages, TokenCounter(session=self.session, organisation_id=self.organisation.id).token_limit(self.llm.get_model()) - current_tokens)
response = self.llm.chat_completion(messages, TokenCounter(session=self.session,
organisation_id=self.organisation.id).token_limit(
self.llm.get_model()) - current_tokens)
# ModelsHelper(session=self.session, organisation_id=organisation.id).create_call_log(execution.name,agent_config['agent_id'],response['response'].usage.total_tokens,json.loads(response['content'])['tool']['name'],agent_config['model'])
if 'content' not in response or response['content'] is None:
raise RuntimeError(f"Failed to get response from llm")
Expand All @@ -114,8 +149,11 @@ def _process_input_instruction(self, agent_config, agent_execution_config, step_
return assistant_reply

def _build_tool_obj(self, agent_config, agent_execution_config, tool_name: str):
model_api_key = AgentConfiguration.get_model_api_key(self.session, self.agent_id, agent_config["model"])['api_key']
model_api_key = AgentConfiguration.get_model_api_key(self.session, self.agent_id, agent_config["model"])[
'api_key']
print("model_api_key : ", model_api_key)
tool_builder = ToolBuilder(self.session, self.agent_id, self.agent_execution_id)
print("tool builder : ", tool_builder)
resource_summary = ""
if tool_name == "QueryResourceTool":
resource_summary = ResourceSummarizer(session=self.session,
Expand All @@ -124,10 +162,20 @@ def _build_tool_obj(self, agent_config, agent_execution_config, tool_name: str):
default_summary=agent_config.get("resource_summary"))

organisation = Agent.find_org_by_agent_id(self.session, self.agent_id)
tool = self.session.query(Tool).join(Toolkit, and_(Tool.toolkit_id == Toolkit.id, Toolkit.organisation_id == organisation.id, Tool.name == tool_name)).first()
print("organisation : ", organisation)
print("tool_name : ", tool_name)
print("Toolkit : ", Toolkit.id)
tool = self.session.query(Tool).join(Toolkit, and_(Tool.toolkit_id == Toolkit.id,
Toolkit.organisation_id == organisation.id,
Tool.name == tool_name)).first()
# tool = self.session.query(Tool).filter( Toolkit.organisation_id == organisation.id, Tool.name == tool_name).first()

print("Tool : ", tool)
tool_obj = tool_builder.build_tool(tool)
print("Tool Obj1 : ", tool_obj)
tool_obj = tool_builder.set_default_params_tool(tool_obj, agent_config, agent_execution_config, model_api_key,
resource_summary,self.memory)
resource_summary, self.memory)
print("Tool Obj2 : ", tool_obj)
return tool_obj

def _process_output_instruction(self, final_response: str, step_tool: AgentWorkflowStepTool,
Expand All @@ -136,7 +184,9 @@ def _process_output_instruction(self, final_response: str, step_tool: AgentWorkf
messages = [{"role": "system", "content": prompt}]
current_tokens = TokenCounter.count_message_tokens(messages, self.llm.get_model())
response = self.llm.chat_completion(messages,
TokenCounter(session=self.session, organisation_id=self.organisation.id).token_limit(self.llm.get_model()) - current_tokens)
TokenCounter(session=self.session,
organisation_id=self.organisation.id).token_limit(
self.llm.get_model()) - current_tokens)
if 'content' not in response or response['content'] is None:
raise RuntimeError(f"ToolWorkflowStepHandler: Failed to get output response from llm")
total_tokens = current_tokens + TokenCounter.count_message_tokens(response, self.llm.get_model())
Expand Down Expand Up @@ -196,7 +246,6 @@ def _handle_wait_for_permission(self, agent_execution, workflow_step: AgentWorkf
next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, "NO")
result = f"{' User has given the following feedback : ' + agent_execution_permission.user_feedback if agent_execution_permission.user_feedback else ''}"


agent_execution_feed = AgentExecutionFeed(agent_execution_id=agent_execution_permission.agent_execution_id,
agent_id=agent_execution_permission.agent_id,
feed=result, role="user",
Expand Down
Loading