diff --git a/docker-compose.yaml b/docker-compose.yaml index 94044916b..6101289a9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -37,8 +37,8 @@ services: networks: - super_network # uncomment to expose redis port to host -# ports: -# - "6379:6379" + ports: + - "6379:6379" volumes: - redis_data:/data @@ -53,8 +53,8 @@ services: networks: - super_network # uncomment to expose postgres port to host -# ports: -# - "5432:5432" + ports: + - "5432:5432" proxy: image: nginx:stable-alpine diff --git a/main.py b/main.py index a0cd36223..969319712 100644 --- a/main.py +++ b/main.py @@ -225,13 +225,18 @@ def register_toolkit_for_master_organisation(): AgentWorkflowSeed.build_task_based_agent(session) AgentWorkflowSeed.build_fixed_task_based_agent(session) AgentWorkflowSeed.build_sales_workflow(session) - AgentWorkflowSeed.build_recruitment_workflow(session) + # AgentWorkflowSeed.build_recruitment_workflow(session) AgentWorkflowSeed.build_coding_workflow(session) + AgentWorkflowSeed.build_test_condition_workflow(session) + AgentWorkflowSeed.build_recruitment_workflow1(session) + AgentWorkflowSeed.build_recruitment_workflow5(session) + # NOTE: remove old workflows. Need to remove this changes later workflows = ["Sales Engagement Workflow", "Recruitment Workflow", "SuperCoder", "Goal Based Workflow", - "Dynamic Task Workflow", "Fixed Task Workflow"] + "Dynamic Task Workflow", "Fixed Task Workflow","Test-Yaml-Workflow","Test-Yaml-Workflow-Sales", "Test Condition Workflow","Recruitment Workflow1","Recruitment Workflow5","Naman2"] workflows = session.query(AgentWorkflow).filter(AgentWorkflow.name.not_in(workflows)) + print("Workflows : ",workflows) for workflow in workflows: session.delete(workflow) diff --git a/migrations/versions/80ea8fa19acb_added_columns_in_agent_workflows.py b/migrations/versions/80ea8fa19acb_added_columns_in_agent_workflows.py new file mode 100644 index 000000000..b7cff4d45 --- /dev/null +++ b/migrations/versions/80ea8fa19acb_added_columns_in_agent_workflows.py @@ -0,0 +1,30 @@ +"""Added Columns in Agent Workflows + +Revision ID: 80ea8fa19acb +Revises: 88585c347b4c +Create Date: 2023-10-06 11:46:44.881154 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '80ea8fa19acb' +down_revision = '88585c347b4c' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('agent_workflows', sa.Column('organisation_id', sa.Integer(), nullable=True)) + op.add_column('agent_workflows', sa.Column('code_yaml', sa.Text(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('agent_workflows', 'code_yaml') + op.drop_column('agent_workflows', 'organisation_id') + # ### end Alembic commands ### diff --git a/migrations/versions/88585c347b4c_added_condition_workflow_step.py b/migrations/versions/88585c347b4c_added_condition_workflow_step.py new file mode 100644 index 000000000..dd7e513f5 --- /dev/null +++ b/migrations/versions/88585c347b4c_added_condition_workflow_step.py @@ -0,0 +1,93 @@ +"""added_condition_workflow_step + +Revision ID: 88585c347b4c +Revises: c4f2f6ba602a +Create Date: 2023-09-20 05:24:14.759428 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '88585c347b4c' +down_revision = 'c4f2f6ba602a' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('agent_workflow_step_conditions', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('instruction', sa.String(), nullable=True), + sa.Column('tool_output', sa.String(), nullable=True), + sa.Column('tool_name', sa.String(), nullable=True), + sa.Column('unique_id', sa.String(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.drop_index('ix_agent_schedule_agent_id', table_name='agent_schedule') + op.drop_index('ix_agent_schedule_expiry_date', table_name='agent_schedule') + op.drop_index('ix_agent_schedule_status', table_name='agent_schedule') + op.alter_column('agent_workflow_steps', 'unique_id', + existing_type=sa.VARCHAR(), + nullable=True) + op.alter_column('agent_workflow_steps', 'step_type', + existing_type=sa.VARCHAR(), + nullable=True) + op.drop_column('agent_workflows', 'organisation_id') + op.drop_index('ix_events_agent_id', table_name='events') + op.drop_index('ix_events_event_property', table_name='events') + op.drop_index('ix_events_org_id', table_name='events') + op.alter_column('knowledge_configs', 'knowledge_id', + existing_type=sa.INTEGER(), + nullable=True) + op.alter_column('knowledges', 'name', + existing_type=sa.VARCHAR(), + nullable=True) + op.alter_column('vector_db_configs', 'vector_db_id', + existing_type=sa.INTEGER(), + nullable=True) + op.alter_column('vector_db_indices', 'name', + existing_type=sa.VARCHAR(), + nullable=True) + op.alter_column('vector_dbs', 'name', + existing_type=sa.VARCHAR(), + nullable=True) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('vector_dbs', 'name', + existing_type=sa.VARCHAR(), + nullable=False) + op.alter_column('vector_db_indices', 'name', + existing_type=sa.VARCHAR(), + nullable=False) + op.alter_column('vector_db_configs', 'vector_db_id', + existing_type=sa.INTEGER(), + nullable=False) + op.alter_column('knowledges', 'name', + existing_type=sa.VARCHAR(), + nullable=False) + op.alter_column('knowledge_configs', 'knowledge_id', + existing_type=sa.INTEGER(), + nullable=False) + op.create_index('ix_events_org_id', 'events', ['org_id'], unique=False) + op.create_index('ix_events_event_property', 'events', ['event_property'], unique=False) + op.create_index('ix_events_agent_id', 'events', ['agent_id'], unique=False) + op.add_column('agent_workflows', sa.Column('organisation_id', sa.INTEGER(), autoincrement=False, nullable=True)) + op.alter_column('agent_workflow_steps', 'step_type', + existing_type=sa.VARCHAR(), + nullable=False) + op.alter_column('agent_workflow_steps', 'unique_id', + existing_type=sa.VARCHAR(), + nullable=False) + op.create_index('ix_agent_schedule_status', 'agent_schedule', ['status'], unique=False) + op.create_index('ix_agent_schedule_expiry_date', 'agent_schedule', ['expiry_date'], unique=False) + op.create_index('ix_agent_schedule_agent_id', 'agent_schedule', ['agent_id'], unique=False) + op.drop_table('agent_workflow_step_conditions') + # ### end Alembic commands ### diff --git a/superagi/agent/agent_prompt_builder.py b/superagi/agent/agent_prompt_builder.py index 4b9bce554..5a3da6286 100644 --- a/superagi/agent/agent_prompt_builder.py +++ b/superagi/agent/agent_prompt_builder.py @@ -133,4 +133,4 @@ def replace_task_based_variables(cls, super_agi_prompt: str, current_task: str, if token_count > min(600, pending_tokens): break super_agi_prompt = super_agi_prompt.replace("{task_history}", "\n" + final_output + "\n") - return super_agi_prompt + return super_agi_prompt \ No newline at end of file diff --git a/superagi/agent/agent_tool_step_handler.py b/superagi/agent/agent_tool_step_handler.py index 7aeb0d59b..5c7afe38a 100644 --- a/superagi/agent/agent_tool_step_handler.py +++ b/superagi/agent/agent_tool_step_handler.py @@ -7,6 +7,8 @@ from superagi.agent.output_parser import AgentSchemaToolOutputParser from superagi.agent.queue_step_handler import QueueStepHandler from superagi.agent.tool_builder import ToolBuilder +from superagi.agent.types.agent_workflow_step_action_types import AgentWorkflowStepAction +from superagi.agent.workflow.steps.condition_step import AgentConditionStepHandler from superagi.helper.prompt_reader import PromptReader from superagi.helper.token_counter import TokenCounter from superagi.lib.logger import logger @@ -19,13 +21,16 @@ from superagi.models.tool import Tool from superagi.models.toolkit import Toolkit from superagi.models.workflows.agent_workflow_step import AgentWorkflowStep +from superagi.models.workflows.agent_workflow_step_condition import AgentWorkflowStepCondition from superagi.models.workflows.agent_workflow_step_tool import AgentWorkflowStepTool from superagi.resource_manager.resource_summary import ResourceSummarizer from superagi.tools.base_tool import BaseTool from sqlalchemy import and_ + class AgentToolStepHandler: """Handles the tools steps in the agent workflow""" + def __init__(self, session, llm, agent_id: int, agent_execution_id: int, memory=None): self.session = session self.llm = llm @@ -45,28 +50,50 @@ def execute_step(self): if not self._handle_wait_for_permission(execution, workflow_step): return - + # TODO : Handle task queue as LOOP step if step_tool.tool_name == "TASK_QUEUE": - step_response = QueueStepHandler(self.session, self.llm, self.agent_id, self.agent_execution_id).execute_step() + print("______________LOOP STEP____________________") + step_response = QueueStepHandler(self.session, self.llm, self.agent_id, + self.agent_execution_id).execute_step() next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, step_response) self._handle_next_step(next_step) return - + # TODO : Handle WAIT_FOR_PERMISSION as an independent step if step_tool.tool_name == "WAIT_FOR_PERMISSION": + print("________________WAIT FOR PERMISSION_____________") self._create_permission_request(execution, step_tool) return - + print("____________AGENT_TOOL_STEP_HANDLER_____________") + print(step_tool.tool_name) + print(workflow_step) assistant_reply = self._process_input_instruction(agent_config, agent_execution_config, step_tool, workflow_step) + print("_____________ASSISTANT_REPLY_____________") + print(assistant_reply) tool_obj = self._build_tool_obj(agent_config, agent_execution_config, step_tool.tool_name) - tool_output_handler = ToolOutputHandler(self.agent_execution_id, agent_config, [tool_obj],self.memory, + print("_____________TOOL_OBJ_____________") + print(tool_obj) + # INFO: Tool output handler is get the verdict or result from the tool + tool_output_handler = ToolOutputHandler(self.agent_execution_id, agent_config, [tool_obj], self.memory, output_parser=AgentSchemaToolOutputParser()) + print("_____________TOOL_OUTPUT_HANDLER_____________") + print(tool_output_handler) final_response = tool_output_handler.handle(self.session, assistant_reply) + print("_____________FINAL_RESPONSE_____________") + print(final_response) step_response = "default" + # TODO: Here output instruction is processed which needs to be handled as CONDITION step + # Following is kept as a backward support for the old workflows if step_tool.output_instruction: step_response = self._process_output_instruction(final_response.result, step_tool, workflow_step) - next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, step_response) + next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, + step_response) + print("TOOL HANDLER - Next Step : ",next_step) + if next_step is not "COMPLETE": + (AgentConditionStepHandler(self.session, self.agent_id, self.agent_execution_id, self.llm) + .update_tool_output(tool_output=final_response.result, next_step=next_step,tool_name=step_tool.tool_name)) + print("Calling handler ___________") self._handle_next_step(next_step) self.session.flush() @@ -86,6 +113,8 @@ def _create_permission_request(self, execution, step_tool: AgentWorkflowStepTool self.session.commit() def _handle_next_step(self, next_step): + print("_____________HANDLE_NEXT_STEP_____________") + print("next_step : ", next_step) if str(next_step) == "COMPLETE": agent_execution = AgentExecution.get_agent_execution_from_id(self.session, self.agent_execution_id) agent_execution.current_agent_step_id = -1 @@ -95,16 +124,22 @@ def _handle_next_step(self, next_step): self.session.commit() def _process_input_instruction(self, agent_config, agent_execution_config, step_tool, workflow_step): + print("_____________PROCESS_INPUT_INSTRUCTION_____________") tool_obj = self._build_tool_obj(agent_config, agent_execution_config, step_tool.tool_name) + print("_____________TOOL_OBJ_____________") + print(tool_obj) prompt = self._build_tool_input_prompt(step_tool, tool_obj, agent_execution_config) logger.info("Prompt: ", prompt) agent_feeds = AgentExecutionFeed.fetch_agent_execution_feeds(self.session, self.agent_execution_id) - messages = AgentLlmMessageBuilder(self.session, self.llm, self.llm.get_model(), self.agent_id, self.agent_execution_id) \ + messages = AgentLlmMessageBuilder(self.session, self.llm, self.llm.get_model(), self.agent_id, + self.agent_execution_id) \ .build_agent_messages(prompt, agent_feeds, history_enabled=step_tool.history_enabled, completion_prompt=step_tool.completion_prompt) # print(messages) current_tokens = TokenCounter.count_message_tokens(messages, self.llm.get_model()) - response = self.llm.chat_completion(messages, TokenCounter(session=self.session, organisation_id=self.organisation.id).token_limit(self.llm.get_model()) - current_tokens) + response = self.llm.chat_completion(messages, TokenCounter(session=self.session, + organisation_id=self.organisation.id).token_limit( + self.llm.get_model()) - current_tokens) # ModelsHelper(session=self.session, organisation_id=organisation.id).create_call_log(execution.name,agent_config['agent_id'],response['response'].usage.total_tokens,json.loads(response['content'])['tool']['name'],agent_config['model']) if 'content' not in response or response['content'] is None: raise RuntimeError(f"Failed to get response from llm") @@ -114,8 +149,11 @@ def _process_input_instruction(self, agent_config, agent_execution_config, step_ return assistant_reply def _build_tool_obj(self, agent_config, agent_execution_config, tool_name: str): - model_api_key = AgentConfiguration.get_model_api_key(self.session, self.agent_id, agent_config["model"])['api_key'] + model_api_key = AgentConfiguration.get_model_api_key(self.session, self.agent_id, agent_config["model"])[ + 'api_key'] + print("model_api_key : ", model_api_key) tool_builder = ToolBuilder(self.session, self.agent_id, self.agent_execution_id) + print("tool builder : ", tool_builder) resource_summary = "" if tool_name == "QueryResourceTool": resource_summary = ResourceSummarizer(session=self.session, @@ -124,10 +162,20 @@ def _build_tool_obj(self, agent_config, agent_execution_config, tool_name: str): default_summary=agent_config.get("resource_summary")) organisation = Agent.find_org_by_agent_id(self.session, self.agent_id) - tool = self.session.query(Tool).join(Toolkit, and_(Tool.toolkit_id == Toolkit.id, Toolkit.organisation_id == organisation.id, Tool.name == tool_name)).first() + print("organisation : ", organisation) + print("tool_name : ", tool_name) + print("Toolkit : ", Toolkit.id) + tool = self.session.query(Tool).join(Toolkit, and_(Tool.toolkit_id == Toolkit.id, + Toolkit.organisation_id == organisation.id, + Tool.name == tool_name)).first() + # tool = self.session.query(Tool).filter( Toolkit.organisation_id == organisation.id, Tool.name == tool_name).first() + + print("Tool : ", tool) tool_obj = tool_builder.build_tool(tool) + print("Tool Obj1 : ", tool_obj) tool_obj = tool_builder.set_default_params_tool(tool_obj, agent_config, agent_execution_config, model_api_key, - resource_summary,self.memory) + resource_summary, self.memory) + print("Tool Obj2 : ", tool_obj) return tool_obj def _process_output_instruction(self, final_response: str, step_tool: AgentWorkflowStepTool, @@ -136,7 +184,9 @@ def _process_output_instruction(self, final_response: str, step_tool: AgentWorkf messages = [{"role": "system", "content": prompt}] current_tokens = TokenCounter.count_message_tokens(messages, self.llm.get_model()) response = self.llm.chat_completion(messages, - TokenCounter(session=self.session, organisation_id=self.organisation.id).token_limit(self.llm.get_model()) - current_tokens) + TokenCounter(session=self.session, + organisation_id=self.organisation.id).token_limit( + self.llm.get_model()) - current_tokens) if 'content' not in response or response['content'] is None: raise RuntimeError(f"ToolWorkflowStepHandler: Failed to get output response from llm") total_tokens = current_tokens + TokenCounter.count_message_tokens(response, self.llm.get_model()) @@ -196,7 +246,6 @@ def _handle_wait_for_permission(self, agent_execution, workflow_step: AgentWorkf next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, "NO") result = f"{' User has given the following feedback : ' + agent_execution_permission.user_feedback if agent_execution_permission.user_feedback else ''}" - agent_execution_feed = AgentExecutionFeed(agent_execution_id=agent_execution_permission.agent_execution_id, agent_id=agent_execution_permission.agent_id, feed=result, role="user", diff --git a/superagi/agent/agent_workflow.yaml b/superagi/agent/agent_workflow.yaml new file mode 100644 index 000000000..72daf1948 --- /dev/null +++ b/superagi/agent/agent_workflow.yaml @@ -0,0 +1,141 @@ +steps: + - name: "step1" # can be any name not necessarily step1 but first will be considered as TRIGGER internally + trigger_step: true # For triggering the workflow + type: "TOOL" # TOOL BLOCK + tool: "List File" # Will be available as completion or drop down in frontend for all tool types + instruction: "Read files from the resource manager" + next: "step2" # For linking to next flow step + + - name: "step2" + type: "LOOP" # LOOP BLOCK + next: # For conditional branching + next_step: "step3" + exit_step: "step8" + + - name: "step3" + type: "TOOL" # TOOL BLOCK + tool: "Read File" + instruction: "Read the resume from above input" + next: "step4" + + - name: "step4" + type: "TOOL" # TOOL BLOCK + tool: "Read File" + instruction: "Read the job description from file mentioned in High-Level GOAL" + next: "step6" + + - name: "step5" + type: "CONDITION" # CONDITION BLOCK + instruction: "Check if the resume matches the job description in goal" # Can be optional + next: + - output: "NO" # Can be any conditional string not necessarily YES/NO + step: "step2" + - output: "YES" + step: "step6" + + - name: "step6" + type: "WAIT_FOR_PERMISSION" # PERMISSION BLOCK + instruction: "Check if the resume matches the job description in goal" + next: + - output: "NO" + step: "step2" + - output: "YES" + step: "step7" + + + - name: "step7" + type: "TOOL" # TOOL BLOCK + tool: "Send Email" + instruction: "Write a custom Email the candidates for job profile based on their experience" + next: "step2" + + - name: "step8" + type: "WAIT" # WAIT BLOCK + duration: 120 # In seconds + instruction: "Waiting before writing a final ending note." + next: "step9" + + - name: "step9" + type: "TOOL" # TOOL BLOCK + tool: "Write File" + instruction: "Write an ending note of work done" + terminal_step: true + +# Exceptions, validation and assumptions +# ------------------------- +# All steps except the loop end and terminal step must have next +# tool step : if it is a tool step it must have tool, instruction and next +# loop step : if it is a loop step it must have next containing next_step and exit_step +# if exit step is not provided it will be considered as terminal step and workflow will terminate once loop is completed +# condition step : if it is a condition step it must have next containing output and step +# if output is not provided it will be considered as terminal step and workflow will terminate once condition is completed +# wait step: if it is a wait step it must have duration and next +# if next is not provided it will be considered as terminal step and workflow will terminate once wait is completed +# wait for permission step: if it is a wait for permission step it must have next, output can be only YES and NO as action console just support them + +##Sales Engagement Workflow +#steps: +# - name: "step1" # can be any name not necessarily step1 but first will be considered as TRIGGER internally +# trigger_step: true # For triggering the workflow +# type: "TOOL" # TOOL BLOCK +# tool: "List File" # Will be available as completion or drop down in frontend for all tool types +# instruction: "list the files" +# next: "step2" # For linking to next flow step +# +# - name: "step2" +# type: "TOOL" # TOOL BLOCK +# tool: "Read File" +# instruction: "Read the leads from the file" +# next: "step3" +# +# - name: "step3" +# type: "LOOP" # LOOP BLOCK +# next: # For conditional branching +# next_step: "step4" +# exit_step: "step10" +# +# - name: "step4" +# type: "TOOL" # TOOL BLOCK +# tool: "SearxSearch" +# instruction: "Search about the company in which the lead is working" +# next: "step5" +# +# - name: "step5" +# type: "WAIT_FOR_PERMISSION" +# instruction: "Email will be based on this content. Do you want send the email" +# next: +# - output: "NO" +# step: "step4" +# - output: "YES" +# step: "step6" +# +# - name: "step6" +# type: "TOOL" +# tool: "SearxSearch" +# instruction: "Search about the company given in the high-end goal only" +# next: "step7" +# +# - name: "step7" +# type: "TOOL" +# tool: "Send Email" +# instruction: "Customize the Email according to the company information in the mail" +# next: "step8" +# +# - name: "step8" +# type: "WAIT" # WAIT BLOCK +# duration: 120 # In seconds +# instruction: "Wait for 2 minutes" +# next: "step9" +# +# - name: "step9" +# type: "TOOL" +# tool: "Read Email" +# instruction: "Read the email from adarshdeepmurari@gmail.com" +# next: "step3" +# +# - name: "step10" +# type: "TOOL" +# tool: "Write File" +# instruction: "Write a summary about the work done so far in a file named workflow_summary.txt" + +#Recruitment Workflow diff --git a/superagi/agent/agent_workflow_builder.py b/superagi/agent/agent_workflow_builder.py new file mode 100644 index 000000000..701a2e8d7 --- /dev/null +++ b/superagi/agent/agent_workflow_builder.py @@ -0,0 +1,383 @@ +import queue +from urllib.parse import urlparse + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from superagi.config.config import get_config +from superagi.models.db import connect_db +from superagi.models.workflows.agent_workflow import AgentWorkflow +from superagi.models.workflows.agent_workflow_step import AgentWorkflowStep +from superagi.agent.agent_workflow_validator import AgentWorkflowValidator + + +class AgentWorkflowBuilder: + """AgentWorkflowBuilder class is responsible for building agent workflows from .yaml based workflows.""" + + def __init__(self, session, agent_workflow): + self.session = session + self.agent_workflow = agent_workflow + + def build_workflow_from_yaml(self, workflow_yaml): + """Build agent workflow from .yaml file content. + + Args: + workflow_yaml (str): The workflow content in .yaml format. + + Returns: + AgentWorkflow: The agent workflow. + """ + print("BUILDING WORKFLOW FROM YAML") + # get parsed workflow yaml as dict + agent_workflow = workflow_yaml + + # AgentWorkflowValidator(self.session).validate_workflow_steps(agent_workflow) + + trigger_workflow_yaml_step = self.find_trigger_step(agent_workflow) + if trigger_workflow_yaml_step is None: + raise Exception("Trigger step not found in workflow") + step_queue = queue.Queue() + step_queue.put({ + "step_to_be_processed": trigger_workflow_yaml_step, + "previous_workflow_step": None, + "previous_workflow_step_output": None + }) + + current_loop_step = None + visited_steps = set() + # visited_steps.add(trigger_workflow_yaml_step["name"]) + counter = 0 + while not step_queue.empty(): + counter = counter + 1 + print(f"___________________________Queue not empty : {counter}___________________________") + print(step_queue.queue) + print("size: ", step_queue.qsize()) + current_step = step_queue.get() + # if current_step.get("step_to_be_processed").get("name") in visited_steps: + # continue + print("Current Step :", current_step) + print(f"Processing step: {current_step.get('step_to_be_processed').get('name')}") + # Write logic to build Workflow step + workflow_step_obj = self.build_step(current_step.get("step_to_be_processed")) + print("Built Workflow Step : ", workflow_step_obj) + # nested loop check + self.check_nested_loop(current_loop_step, current_step, workflow_step_obj) + + # Connecting workflow steps + print("________________Connecting workflow steps") + print("Current Step : ", current_step) + print("Workflow Step : ", workflow_step_obj) + + self.connect_workflow_step(current_step, workflow_step_obj) + + # Adding steps to the queue + print("Visited Steps : ", visited_steps) + if current_step.get("step_to_be_processed").get("name") not in visited_steps: + print("_____________Adding to queue") + print("Current Step : ", current_step.get("step_to_be_processed")) + visited_steps.add(current_step.get("step_to_be_processed").get("name")) + print(current_step.get("step_to_be_processed").get("next")) + if current_step.get("step_to_be_processed").get("next") is not None: + self.add_next_steps_to_queue(agent_workflow, current_step, step_queue, workflow_step_obj) + else: + print("No next step") + AgentWorkflowStep.add_next_workflow_step(self.session, + workflow_step_obj.id, + -1, "default" if current_step.get("step_to_be_processed") + .get("type") == "LOOP" else "COMPLETE") + + workflow_steps = session.query(AgentWorkflowStep).filter(AgentWorkflowStep.agent_workflow_id == self.agent_workflow.id).all() + sorted_workflow_steps = sorted(workflow_steps, key=lambda x: x.id) + print("_______________________Workflow Steps : ") + for step in sorted_workflow_steps: + print(step) + + def connect_workflow_step(self, current_step, workflow_step): + if current_step.get("previous_workflow_step") is not None: + print(f"Connecting {current_step.get('previous_workflow_step')} to {workflow_step}") + resp = AgentWorkflowStep.add_next_workflow_step(self.session, current_step.get("previous_workflow_step").id, + workflow_step.id, + current_step.get("previous_workflow_step_output")) + print("Response : ", resp) + def check_nested_loop(self, current_loop_step, current_step, workflow_step): + if current_step.get("step_to_be_processed").get("type") == "LOOP": + if current_loop_step is not None and current_loop_step != workflow_step: + raise Exception("Nested loop not allowed") + else: + current_loop_step = workflow_step + + def add_next_steps_to_queue(self, agent_workflow, current_step, step_queue, workflow_step): + print("Adding next steps to queue") + next_steps = current_step.get("step_to_be_processed").get("next") + print("Next Step : ", next_steps) + branched_step_types = ["CONDITION", "WAIT_FOR_PERMISSION", "LOOP"] + if current_step.get("step_to_be_processed").get("type") in branched_step_types: + # handling loop steps next step + print('handling loop steps next step') + print(current_step.get("step_to_be_processed").get("type")) + if current_step.get("step_to_be_processed").get("type") == "LOOP": + print("Loop step") + if next_steps.get("next_step") is not None: + step_queue.put({ + "step_to_be_processed": self.get_step_by_name(agent_workflow, next_steps.get("next_step")), + "previous_workflow_step": workflow_step, + "previous_workflow_step_output": "default" + }) + else: + raise Exception("Loop step must have next") + # handling exit step + exit_loop_step = next_steps.get("exit_step") if next_steps.get("exit_step") else -1 + step_queue.put({ + "step_to_be_processed": self.get_step_by_name(agent_workflow, exit_loop_step), + "previous_workflow_step": workflow_step, + "previous_workflow_step_output": "COMPLETE" + }) + else: + # Handling branched steps i.e. condition, wait for permission + for next_step_info in next_steps: + print("---------------bdjbjd-----------", next_step_info) + next_step_name = next_step_info.get("step") + next_step = self.get_step_by_name(agent_workflow, next_step_name) + if next_step: + step_queue.put({ + "step_to_be_processed": next_step, + "previous_workflow_step": workflow_step, + "previous_workflow_step_output": next_step_info.get("output") + }) + else: + # Handling next linear step i.e. tool step + step_queue.put({ + "step_to_be_processed": self.get_step_by_name(agent_workflow, next_steps), + "previous_workflow_step": workflow_step, + "previous_workflow_step_output": "default" + }) + + # def parse_workflow_yaml(self, workflow_yaml): + # """Parse the workflow yaml content. + # + # Args: + # workflow_yaml (str): The workflow content in .yaml format. + # + # Returns: + # dict: The parsed workflow yaml content. + # """ + # + # workflow = [] + # + # for step_data in workflow_yaml: + # step = { + # "name": step_data["name"], + # "type": step_data["type"] + # } + # + # if "trigger_step" in step_data: + # step["trigger_step"] = step_data["trigger_step"] + # + # if "tool" in step_data: + # step["tool"] = step_data["tool"] + # step["instruction"] = step_data["instruction"] + # + # if "next" in step_data: + # next_steps = [] + # + # if "next_step" in step_data["next"]: + # next_steps.append({"output": "next_step", "step": step_data["next"]["next_step"]}) + # + # if "exit_step" in step_data["next"]: + # next_steps.append({"output": "exit_step", "step": step_data["next"]["exit_step"]}) + # + # step["next"] = next_steps + # + # workflow.append(step) + # + # return workflow + + def find_trigger_step(self, workflow): + """Find the trigger step in the workflow.""" + for step in workflow: + if step.get("trigger_step"): + return step + raise Exception("Trigger step not found in workflow") + + def get_step_by_name(self, workflow, step_name): + """Get the next step for the given step name.""" + print('get_step_by_name ', step_name) + for step in workflow: + if step["name"] == step_name: + return step + raise Exception(f"Step not found: {step_name}") + + def build_step(self, step): + """Build agent workflow step.""" + # TODO: Add support for iteration workflow step, conditional workflow step + + agent_workflow_step = None + if step["type"] == "TOOL": + agent_workflow_step = AgentWorkflowStep.find_or_create_tool_workflow_step(session=self.session, + agent_workflow_id=self.agent_workflow.id, + unique_id=str(self.agent_workflow.id) + "_" + + step["name"], + tool_name=step["tool"], + input_instruction=step["instruction"]) + elif step["type"] == "LOOP": + agent_workflow_step = AgentWorkflowStep.find_or_create_tool_workflow_step(session=self.session, + agent_workflow_id=self.agent_workflow.id, + unique_id=str(self.agent_workflow.id) + "_" + + step["name"], + tool_name="TASK_QUEUE", + input_instruction="Break the above response array of items", + completion_prompt="Get array of items from the above response. Array should suitable utilization of JSON.parse().") + elif step["type"] == "WAIT_FOR_PERMISSION": + agent_workflow_step = AgentWorkflowStep.find_or_create_tool_workflow_step(session=self.session, + agent_workflow_id=self.agent_workflow.id, + unique_id=str(self.agent_workflow.id) + "_" + + step["name"], + tool_name="WAIT_FOR_PERMISSION", + input_instruction=step["instruction"]) + elif step["type"] == "WAIT": + agent_workflow_step = AgentWorkflowStep.find_or_create_wait_workflow_step(session=self.session, + agent_workflow_id=self.agent_workflow.id, + unique_id=str(self.agent_workflow.id) + "_" + + step["name"], + delay=step["duration"], + wait_description=step["instruction"]) + elif step["type"] == "CONDITION": + agent_workflow_step = AgentWorkflowStep.find_or_create_condition_workflow_step(session=self.session, + agent_workflow_id=self.agent_workflow.id, + unique_id=str(self.agent_workflow.id) + "_" + + step["name"], + instruction=step["instruction"]) + + + + if step.get("trigger_step") is not None and step["trigger_step"] is True: + agent_workflow_step.step_type = "TRIGGER" + + return agent_workflow_step + + +# def parse_workflow_yaml(workflow_yaml): +# """Parse the workflow yaml content. +# +# Args: +# workflow_yaml (str): The workflow content in .yaml format. +# +# Returns: +# dict: The parsed workflow yaml content. +# """ +# +# workflow = [] +# +# for step_data in workflow_yaml: +# print("step_data :") +# print(step_data) +# step = { +# "name": step_data["name"], +# "type": step_data["type"] +# } +# +# if "trigger_step" in step_data: +# step["trigger_step"] = step_data["trigger_step"] +# +# if "tool" in step_data: +# step["tool"] = step_data["tool"] +# step["instruction"] = step_data["instruction"] +# +# if "next" in step_data: +# next_steps = [] +# +# if "next_step" in step_data["next"]: +# next_steps.append({"output": "next_step", "step": step_data["next"]["next_step"]}) +# +# if "exit_step" in step_data["next"]: +# next_steps.append({"output": "exit_step", "step": step_data["next"]["exit_step"]}) +# +# step["next"] = next_steps +# +# workflow.append(step) +# +# return workflow +def read_yaml_file(file_path): + print("file_path :") + print(file_path) + try: + with open(file_path, 'r') as yaml_file: + yaml_content = yaml.load(yaml_file, Loader=yaml.FullLoader) + print("type of content returned:", type(yaml_content)) + return yaml_content + except FileNotFoundError: + raise Exception(f"File not found: {file_path}") + except Exception as e: + raise Exception(f"Error reading YAML file: {str(e)}") + + +# run above methods write main +import yaml + +if __name__ == "__main__": + test_yaml = read_yaml_file( + "/Users/jagtarsaggu/Desktop/Dev Projects/SuperAGI/superagi/agent/agent_workflow.yaml") + print("YAML :") + print(test_yaml) + for data in test_yaml["steps"]: + print("data :") + print(data) + + engine = connect_db() + # db_host = get_config('DB_HOST', 'super__postgres') + # db_url = get_config('DB_URL', None) + # db_username = get_config('DB_USERNAME') + # db_password = get_config('DB_PASSWORD') + # db_name = get_config('DB_NAME') + # env = get_config('ENV', "DEV") + # + # if db_url is None: + # if db_username is None: + # db_url = f'postgresql://{db_host}/{db_name}' + # else: + # db_url = f'postgresql://{db_username}:{db_password}@{db_host}/{db_name}' + # else: + # db_url = urlparse(db_url) + # db_url = db_url.scheme + "://" + db_url.netloc + db_url.path + # + # engine = create_engine(db_url, + # pool_size=20, # Maximum number of database connections in the pool + # max_overflow=50, # Maximum number of connections that can be created beyond the pool_size + # pool_timeout=30, # Timeout value in seconds for acquiring a connection from the pool + # pool_recycle=1800, # Recycle connections after this number of seconds (optional) + # pool_pre_ping=False, # Enable connection health checks (optional) + # ) + print("We got engine : ", engine) + Session = sessionmaker(bind=engine) + session = Session() + print("Session : ", session) + agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Test-Yaml-Workflow-Sales", + "Testing Sales Yaml to workflow") + print("Agent Workflow here: ", agent_workflow) + AgentWorkflowBuilder(session, agent_workflow).build_workflow_from_yaml(test_yaml["steps"]) + # session.close() + # engine.dispose() + # response = parse_workflow_yaml(test_yaml["steps"]) + # print("Response :") + # print(response) + +# Exceptions, validation and assumptions +# ------------------------- +# All steps except the loop end and terminal step must have next +# tool step : if it is a tool step it must have tool, instruction and next +# loop step : if it is a loop step it must have next containing next_step and exit_step +# if exit step is not provided it will be considered as terminal step and workflow will terminate once loop is completed +# condition step : if it is a condition step it must have next containing output and step +# if output is not provided it will be considered as terminal step and workflow will terminate once condition is completed +# wait step: if it is a wait step it must have duration and next +# if next is not provided it will be considered as terminal step and workflow will terminate once wait is completed +# wait for permission step: if it is a wait for permission step it must have next, output can be only YES and NO as action console just support them + +#Need to add validation on valid types + +#To Discuss: +#what will happen if we have multiple trigger steps? +#Can we end on wait for permission step? +#Can we end on wait step? +#Can we end on condition step? +#Do we need prompting in types and iteration workflow and wait for permission step in fronetnd? \ No newline at end of file diff --git a/superagi/agent/agent_workflow_validator.py b/superagi/agent/agent_workflow_validator.py new file mode 100644 index 000000000..8251eb461 --- /dev/null +++ b/superagi/agent/agent_workflow_validator.py @@ -0,0 +1,193 @@ +from fastapi import HTTPException + +from superagi.models.tool import Tool +from superagi.models.toolkit import Toolkit + + +class AgentWorkflowValidator: + """AgentWorkflowValidator class is responsible for validating the yaml workflow of the agent""" + + def __init__(self, session, organisation_id): + self.session = session + self.organisation_id = organisation_id + # self.agent_id = agent_id + # self.agent_execution_id = agent_execution_id + + + def validate_workflow_steps(self, workflow_steps): + """Validate the workflow steps. + + Returns: + bool: True if the workflow steps are valid, False otherwise. + """ + # TODO: validate the workflow steps to be called recursively and check if the step type is one of the valid types + # i.e. LOOP, CONDITION, WAIT, WAIT_FOR_PERMISSION, TOOL, ITERATION_WORKFLOW + self.validate_unique_step_name(workflow_steps) + + valid_step_types = ["LOOP", "CONDITION", "WAIT", "WAIT_FOR_PERMISSION", "TOOL", "ITERATION_WORKFLOW"] + for step in workflow_steps["steps"]: + if step.get("type") not in valid_step_types: + raise HTTPException(status_code=500, detail=f"Type does not exist in {step}.") + + if step.get("trigger_step") and str(step.get("trigger_step")).upper() == "TRUE": + self.__validate_trigger_step(step) + + if step.get("type") == "TOOL": + self.__validata_tool_step(step) + elif step.get("type") == "LOOP": + self.__validate_loop_step(step) + elif step.get("type") == "CONDITION": + self.__validate_condition_step(step) + elif step.get("type") == "WAIT": + self.__validate_wait_step(step) + elif step.get("type") == "WAIT_FOR_PERMISSION": + self.__validate_wait_for_permission_step(step) + elif step.get("type") == "ITERATION_WORKFLOW": + self.__validate_iteration_workflow_step(step) + + def __validata_tool_step(self, step): + """Validate the tool step. + + Args: + step (dict): The step. + + Returns: + bool: True if the tool step is valid, False otherwise. + """ + if step.get("tool") is None: + raise HTTPException(status_code=500, detail=f"Tool name not found in step: {step.get('name')}") + + self.__validate_tool(step) + + if step.get("instruction") is None: + raise HTTPException(status_code=500, detail=f"Instruction not found in step: {step.get('name')}") + + if step.get("next") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + + def __validate_loop_step(self, step): + """Validate the loop step. + + Args: + step (dict): The step. + + Returns: + bool: True if the loop step is valid, False otherwise. + """ + if step.get("next") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + if step.get("next").get("next_step") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + + def __validate_condition_step(self, step): + """Validate the condition step. + + Args: + step (dict): The step. + + Returns: + bool: True if the condition step is valid, False otherwise. + """ + if step.get("next") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + for next_steps in step.get("next"): + if next_steps.get("step") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + def __validate_wait_step(self, step): + """Validate the wait step. + + Args: + step (dict): The step. + + Returns: + bool: True if the wait step is valid, False otherwise. + """ + if step.get("duration") is None: + raise HTTPException(status_code=500, detail=f"Duration not found in step: {step.get('name')}") + + def __validate_wait_for_permission_step(self, step): + """Validate the wait for permission step. + + Args: + step (dict): The step. + + Returns: + bool: True if the wait step is valid, False otherwise. + """ + if step.get("next") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + for next_steps in step.get("next"): + if next_steps.get("step") is None: + raise HTTPException(status_code=500, detail=f"Next step not found in step: {step.get('name')}") + + if next_steps.get("output") != "YES" or next_steps.get("output") != "NO": + raise HTTPException(status_code=500, detail=f"Output can only be Yes/No in step: {step.get('name')}") + + def __validate_iteration_workflow_step(self, step): + """Validate the iteration workflow step. + + Args: + step (dict): The step. + + Returns: + bool: True if the iteration workflow step is valid, False otherwise. + """ + pass + + def __validate_trigger_step(self, step): + """Validate the trigger step. + + Args: + step (dict): The step. + + Returns: + bool: True if the trigger step is valid, False otherwise. + """ + if step.get("type") is None: + raise HTTPException(status_code=500,detail=f"Type not found in step: {step.get('name')}") + + valid_step_types = ["TOOL", "ITERATION_WORKFLOW", "LOOP"] + if step.get("type") not in valid_step_types: + raise HTTPException(status_code=500,detail=f"Type field can only be of type: `TOOL`, " + f"`ITERATION_WORKFLOW` or `LOOP` in step: {step.get('name')}") + + + def validate_unique_step_name(self, workflow_steps): + """Validate the workflow steps to have unique step names.""" + + step_names = set() + duplicate_names = [] + for step in workflow_steps["steps"]: + name = step.get("name") + if name: + if name in step_names: + duplicate_names.append(name) + else: + step_names.add(name) + if duplicate_names: + raise HTTPException(status_code=500,detail=f"Duplicate step names found: {duplicate_names}") + + + def __validate_tool(self, step): + """Validate the tool. + + Args: + tool (Tool): The tool. + + Returns: + bool: True if the tool is valid, False otherwise. + """ + toolkit = self.session.query(Tool).filter(Tool.name == step.get("tool")).first() + + if toolkit is None: + raise HTTPException(status_code=500, detail=f"Invalid tool name in step: {step.get('name')}") + + if not self.session.query(Toolkit).filter(Toolkit.id == toolkit.id, + Toolkit.organisation_id == self.organisation_id): + raise HTTPException(status_code=500,detail=f"Tool not installed: {step.get('tool')}") diff --git a/superagi/agent/output_handler.py b/superagi/agent/output_handler.py index 1fdeb7531..f95602337 100644 --- a/superagi/agent/output_handler.py +++ b/superagi/agent/output_handler.py @@ -1,5 +1,5 @@ import json -from superagi.agent.common_types import TaskExecutorResponse, ToolExecutorResponse +from superagi.agent.types.common_types import TaskExecutorResponse, ToolExecutorResponse from superagi.agent.output_parser import AgentSchemaOutputParser from superagi.agent.task_queue import TaskQueue from superagi.agent.tool_executor import ToolExecutor diff --git a/superagi/agent/prompts/condition_step.txt b/superagi/agent/prompts/condition_step.txt new file mode 100644 index 000000000..a34a16c4e --- /dev/null +++ b/superagi/agent/prompts/condition_step.txt @@ -0,0 +1,11 @@ +Analyze {tool_name} output and follow the instruction to come up with the response: +High-Level GOAL: +`{goals}` + +TOOL OUTPUT: +`{tool_output}` + +INSTRUCTION: `{instruction}` + +Analyze the instruction and respond with one of the below outputs. Response should be one of the below options: +{output_options} \ No newline at end of file diff --git a/superagi/agent/tool_executor.py b/superagi/agent/tool_executor.py index 017094164..2a4d33eb8 100644 --- a/superagi/agent/tool_executor.py +++ b/superagi/agent/tool_executor.py @@ -1,6 +1,6 @@ from pydantic import ValidationError -from superagi.agent.common_types import ToolExecutorResponse +from superagi.agent.types.common_types import ToolExecutorResponse from superagi.apm.event_handler import EventHandler from superagi.lib.logger import logger @@ -28,7 +28,9 @@ def execute(self, session, tool_name, tool_args): if tool_name == ToolExecutor.FINISH or tool_name == "": logger.info("\nTask Finished :) \n") return ToolExecutorResponse(status="COMPLETE", result="") + print("All tools, ", tools.keys()) if tool_name in tools.keys(): + print("Tool name : " , tool_name) status = "SUCCESS" tool = tools[tool_name] retry = False diff --git a/superagi/agent/types/agent_workflow_step_action_types.py b/superagi/agent/types/agent_workflow_step_action_types.py index dcce25fc1..d8952cf6b 100644 --- a/superagi/agent/types/agent_workflow_step_action_types.py +++ b/superagi/agent/types/agent_workflow_step_action_types.py @@ -5,13 +5,13 @@ class AgentWorkflowStepAction(Enum): ITERATION_WORKFLOW = 'ITERATION_WORKFLOW' TOOL = 'TOOL' WAIT_STEP = 'WAIT_STEP' - + CONDITION = 'CONDITION' @classmethod - def get_agent_workflow_action_type(cls, store): - if store is None: + def get_agent_workflow_action_type(cls, step_action): + if step_action is None: raise ValueError("Storage type cannot be None.") - store = store.upper() - if store in cls.__members__: - return cls[store] - raise ValueError(f"{store} is not a valid storage name.") + step_action = step_action.upper() + if step_action in cls.__members__: + return cls[step_action] + raise ValueError(f"{step_action} is not a valid storage name.") diff --git a/superagi/agent/common_types.py b/superagi/agent/types/common_types.py similarity index 100% rename from superagi/agent/common_types.py rename to superagi/agent/types/common_types.py diff --git a/superagi/agent/workflow/steps/condition_step.py b/superagi/agent/workflow/steps/condition_step.py new file mode 100644 index 000000000..16b4fa9b8 --- /dev/null +++ b/superagi/agent/workflow/steps/condition_step.py @@ -0,0 +1,94 @@ +from superagi.agent.agent_prompt_builder import AgentPromptBuilder +from superagi.agent.types.agent_execution_status import AgentExecutionStatus +from superagi.agent.types.agent_workflow_step_action_types import AgentWorkflowStepAction +from superagi.helper.prompt_reader import PromptReader +from superagi.helper.token_counter import TokenCounter +from superagi.lib.logger import logger +from superagi.models.agent import Agent +from superagi.models.agent_execution import AgentExecution +from superagi.models.agent_execution_config import AgentExecutionConfiguration +from superagi.models.workflows.agent_workflow_step import AgentWorkflowStep +from superagi.models.workflows.agent_workflow_step_condition import AgentWorkflowStepCondition + + +class AgentConditionStepHandler: + """Handle Agent Wait Step in the agent workflow.""" + + def __init__(self, session, agent_id, agent_execution_id, llm): + self.session = session + self.agent_id = agent_id + self.agent_execution_id = agent_execution_id + self.llm = llm + self.organisation = Agent.find_org_by_agent_id(self.session, self.agent_id) + + def execute_step(self): + """Execute the agent condition step.""" + + logger.info("Executing Condition Step") + execution = AgentExecution.get_agent_execution_from_id(self.session, self.agent_execution_id) + agent_execution_config = AgentExecutionConfiguration.fetch_configuration(self.session, self.agent_execution_id) + workflow_step = AgentWorkflowStep.find_by_id(self.session, execution.current_agent_step_id) + step_condition = AgentWorkflowStepCondition.find_by_id(self.session, workflow_step.action_reference_id) + step_condition_prompt = self._build_condition_prompt(step_condition, workflow_step,agent_execution_config) + messages = [{"role": "system", "content": step_condition_prompt}] + current_tokens = TokenCounter.count_message_tokens(messages, self.llm.get_model()) + response = self.llm.chat_completion(messages, + TokenCounter(session=self.session, + organisation_id=self.organisation.id).token_limit( + self.llm.get_model()) - current_tokens) + if 'content' not in response or response['content'] is None: + raise RuntimeError(f"ToolWorkflowStepHandler: Failed to get output response from llm") + total_tokens = current_tokens + TokenCounter.count_message_tokens(response, self.llm.get_model()) + AgentExecution.update_tokens(self.session, self.agent_execution_id, total_tokens) + step_response = response['content'] + step_response = step_response.replace("'", "").replace("\"", "") + print("CONDITIONAL RESPONSE: ", step_response) + next_step = AgentWorkflowStep.fetch_next_step(self.session, workflow_step.id, step_response) + self._handle_next_step(next_step) + + def update_tool_output(self, tool_output, tool_name, next_step): + if next_step.action_type == AgentWorkflowStepAction.CONDITION.value: + # update the tool output in the condition step + AgentWorkflowStepCondition.update_tool_info(self.session, next_step.unique_id, + tool_output, tool_name) + + def _build_condition_prompt(self, step_condition, workflow_step,agent_execution_config): + # super_agi_prompt = PromptReader.read_agent_prompt(__file__, "agent_tool_output.txt") + super_agi_prompt = """Analyze {tool_name} output and follow the instruction to come up with the response: +High-Level GOAL: +`{goals}` + +TOOL OUTPUT: +`{tool_output}` + +INSTRUCTION: `{instruction}` + +Analyze the instruction and respond with one of the below outputs. Response should be one of the below options: +{output_options}""" + super_agi_prompt = super_agi_prompt.replace("{goals}", AgentPromptBuilder.add_list_items_to_string( + agent_execution_config["goal"])) + super_agi_prompt = super_agi_prompt.replace("{tool_output}", step_condition.tool_output) + super_agi_prompt = super_agi_prompt.replace("{tool_name}", step_condition.tool_name) + super_agi_prompt = super_agi_prompt.replace("{instruction}", step_condition.instruction) + + step_responses = self._get_step_responses(workflow_step) + if "default" in step_responses: + step_responses.remove("default") + super_agi_prompt = super_agi_prompt.replace("{output_options}", str(step_responses)) + return super_agi_prompt + + def _get_step_responses(self, workflow_step: AgentWorkflowStep): + return [step["step_response"] for step in workflow_step.next_steps] + + # TODO: This method can be brought to an abstract method and can be used in all the steps + # TODO: interface can have a execute_step which needs to be implemented differently for each step + # TODO: common attributes can be moved to the abstract class + + def _handle_next_step(self, next_step): + if str(next_step) == "COMPLETE": + agent_execution = AgentExecution.get_agent_execution_from_id(self.session, self.agent_execution_id) + agent_execution.current_agent_step_id = -1 + agent_execution.status = AgentExecutionStatus.COMPLETED.value + else: + AgentExecution.assign_next_step_id(self.session, self.agent_execution_id, next_step.id) + self.session.commit() diff --git a/superagi/agent/workflow/steps/step.py b/superagi/agent/workflow/steps/step.py new file mode 100644 index 000000000..4c6df1bdc --- /dev/null +++ b/superagi/agent/workflow/steps/step.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod + +from superagi.agent.types.agent_execution_status import AgentExecutionStatus +from superagi.models.agent import Agent +from superagi.models.agent_execution import AgentExecution + + +class WorkflowStep(ABC): + def __init__(self, session, llm, agent_execution_id, agent_id, memory): + self.session = session + self.llm = llm + self.agent_execution_id = agent_execution_id + self.agent_id = agent_id + self.memory = memory + # self.task_queue = TaskQueue(str(self.agent_execution_id)) + self.organisation = Agent.find_org_by_agent_id(self.session, self.agent_id) + + @abstractmethod + def execute(self): + """ + This method should be implemented by concrete subclasses to define the execution logic for each step. + """ + pass + + @abstractmethod + def _handle_next_step(self, next_step): + if str(next_step) == "COMPLETE": + agent_execution = AgentExecution.get_agent_execution_from_id(self.session, self.agent_execution_id) + agent_execution.current_agent_step_id = -1 + agent_execution.status = AgentExecutionStatus.COMPLETED.value + else: + AgentExecution.assign_next_step_id(self.session, self.agent_execution_id, next_step.id) + self.session.commit() + diff --git a/superagi/agent/workflow_seed.py b/superagi/agent/workflow_seed.py index 2400faac8..3f7e3242e 100644 --- a/superagi/agent/workflow_seed.py +++ b/superagi/agent/workflow_seed.py @@ -27,6 +27,87 @@ class AgentWorkflowSeed: + @classmethod + def build_recruitment_workflow1(cls, session): + agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Recruitment Workflow1", + "Recruitment Workflow1") + step1 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step1", + ListFileTool().name, + "List the files from the resource manager", + step_type="TRIGGER") + + step2 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step2", + ReadFileTool().name, + "Read all the files in the resource manager") + + step3 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step3", + WriteFileTool().name, + "Write the Name, Email, CGPA, Past companies and Years of experience from the Resume to a .csv file") + + # task queue ends when the elements gets over + step4 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step4", + "TASK_QUEUE", + "Break the above response array of items", + completion_prompt="Get array of items from the above response. Array should suitable utilization of JSON.parse(). Skip job_description file from list.") + + step5 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step5", + ReadFileTool().name, + "Read the key points from above input", + "Check if the resume matches the job criteria given in the goal") + + step6 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step6", + SendEmailTool().name, + "Write a custom Rejection email to the candidate") + + step7 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step7", + SendEmailTool().name, + "Write a custom Acceptance email to the candidates for job profile based on their experience") + + AgentWorkflowStep.add_next_workflow_step(session, step1.id, step2.id) + AgentWorkflowStep.add_next_workflow_step(session, step2.id, step3.id) + AgentWorkflowStep.add_next_workflow_step(session, step3.id, step4.id) + AgentWorkflowStep.add_next_workflow_step(session, step4.id, -1, "COMPLETE") + AgentWorkflowStep.add_next_workflow_step(session, step4.id, step5.id) + AgentWorkflowStep.add_next_workflow_step(session, step5.id, step6.id, "NO") + AgentWorkflowStep.add_next_workflow_step(session, step5.id, step7.id, "YES") + AgentWorkflowStep.add_next_workflow_step(session, step6.id, step4.id) + AgentWorkflowStep.add_next_workflow_step(session, step7.id, step4.id) + session.commit() + @classmethod + def build_test_condition_workflow(cls, session): + agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Test Condition Workflow", "Test Condition Workflow") + step1 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step1", + ReadFileTool().name, + "Read the file named info.txt", + step_type="TRIGGER") + step2 = AgentWorkflowStep.find_or_create_condition_workflow_step(session=session,agent_workflow_id=agent_workflow.id, + unique_id=str(agent_workflow.id) + "_step2", + instruction="Check if the file consists information about nature") + step3 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step3", + WriteFileTool().name, + "Write a poem on nature") + step4 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step4", + WriteFileTool().name, + "Write a poem on cricket") + AgentWorkflowStep.add_next_workflow_step(session, step1.id, step2.id) + AgentWorkflowStep.add_next_workflow_step(session, step2.id, step3.id, "NO") + AgentWorkflowStep.add_next_workflow_step(session, step2.id, step4.id, "YES") + AgentWorkflowStep.add_next_workflow_step(session, step3.id, -1) + AgentWorkflowStep.add_next_workflow_step(session, step4.id, -1) + session.commit() + + + @classmethod def build_sales_workflow(cls, session): agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Sales Engagement Workflow", @@ -57,7 +138,7 @@ def build_sales_workflow(cls, session): step5 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, str(agent_workflow.id) + "_step5", - GoogleSearchTool().name, + SearxSearchTool().name, "Search about the company in which the lead is working") step6 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, @@ -90,10 +171,17 @@ def build_sales_workflow(cls, session): SendEmailTool().name, "Customize the Email according to the company information in the mail") + step12 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step12", + WriteFileTool().name, + "Write a summary about the work done so far in a file named workflow_summary.txt") + # AgentWorkflowStep.add_next_workflow_step(session, step1.id, step2.id) AgentWorkflowStep.add_next_workflow_step(session, step2.id, step3.id) AgentWorkflowStep.add_next_workflow_step(session, step3.id, step4.id) - AgentWorkflowStep.add_next_workflow_step(session, step4.id, -1, "COMPLETE") + # AgentWorkflowStep.add_next_workflow_step(session, step4.id, -1, "COMPLETE") + AgentWorkflowStep.add_next_workflow_step(session, step4.id, step12.id) + AgentWorkflowStep.add_next_workflow_step(session, step12.id, -1, "COMPLETE") AgentWorkflowStep.add_next_workflow_step(session, step4.id, step5.id) AgentWorkflowStep.add_next_workflow_step(session, step5.id, step6.id) AgentWorkflowStep.add_next_workflow_step(session, step6.id, step7.id, "YES") @@ -106,13 +194,13 @@ def build_sales_workflow(cls, session): session.commit() @classmethod - def build_recruitment_workflow(cls, session): - agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Recruitment Workflow", - "Recruitment Workflow") + def build_recruitment_workflow5(cls, session): + agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Recruitment Workflow5", + "Recruitment Workflow5") step1 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, str(agent_workflow.id) + "_step1", ListFileTool().name, - "Read files from the resource manager", + "List the files from the resource manager", step_type="TRIGGER") # task queue ends when the elements gets over @@ -127,11 +215,20 @@ def build_recruitment_workflow(cls, session): ReadFileTool().name, "Read the resume from above input") - step4 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, - str(agent_workflow.id) + "_step4", - ReadFileTool().name, - "Read the job description from file mentioned in High-Level GOAL", - "Check if the resume matches the job description in goal") + step4 = AgentWorkflowStep.find_or_create_condition_workflow_step(session=session,agent_workflow_id=agent_workflow.id, + unique_id=str(agent_workflow.id) + "_step4", + # instruction="Check if the candidate is of a Video Editor or Graphic Designer" + instruction="Check if the resume matches High-Level GOAL") + + + + + + # step4 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + # str(agent_workflow.id) + "_step4", + # ReadFileTool().name, + # "Read the job description from file mentioned in High-Level GOAL", + # "Check if the resume matches the job description in goal") step5 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, str(agent_workflow.id) + "_step5", @@ -142,11 +239,57 @@ def build_recruitment_workflow(cls, session): AgentWorkflowStep.add_next_workflow_step(session, step2.id, step3.id) AgentWorkflowStep.add_next_workflow_step(session, step2.id, -1, "COMPLETE") AgentWorkflowStep.add_next_workflow_step(session, step3.id, step4.id) - AgentWorkflowStep.add_next_workflow_step(session, step4.id, step5.id, "YES") - AgentWorkflowStep.add_next_workflow_step(session, step4.id, step2.id, "NO") + AgentWorkflowStep.add_next_workflow_step(session, step4.id, step5.id,"YES") + AgentWorkflowStep.add_next_workflow_step(session, step4.id, step2.id,"NO") AgentWorkflowStep.add_next_workflow_step(session, step5.id, step2.id) session.commit() + @classmethod + def build_test_workflow(cls,session): + agent_workflow = AgentWorkflow.find_or_create_by_name(session, "Test Workflow", "Test Workflow") + step1 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step1", + WriteFileTool().name, + "write a poem on nature", + step_type="TRIGGER") + + step2 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step2", + ListFileTool().name, + "list the files") + + step3 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step3", + ReadFileTool().name, + "Read the leads from the file") + # task queue ends when the elements gets over + step4 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step4", + "TASK_QUEUE", + "Break the above response array of items", + completion_prompt="Get array of items from the above response. Array should suitable utilization of JSON.parse().") + step5 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step5", + GoogleSearchTool().name, + "Search about the company in which the lead is working") + step6 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step6", + WriteFileTool().name + , "Write a summary about the company searched in the above step in file named on company's name") + step7 = AgentWorkflowStep.find_or_create_tool_workflow_step(session, agent_workflow.id, + str(agent_workflow.id) + "_step7", + WriteFileTool().name, + "Write a summary about the work done so far in a file named workflow_summary.txt") + + AgentWorkflowStep.add_next_workflow_step(session, step1.id, step2.id) + AgentWorkflowStep.add_next_workflow_step(session, step2.id, step3.id) + AgentWorkflowStep.add_next_workflow_step(session, step3.id, step4.id) + AgentWorkflowStep.add_next_workflow_step(session, step4.id, -1, "COMPLETE") + AgentWorkflowStep.add_next_workflow_step(session, step4.id, step5.id) + AgentWorkflowStep.add_next_workflow_step(session, step5.id, step6.id) + AgentWorkflowStep.add_next_workflow_step(session, step6.id, step7.id) + AgentWorkflowStep.add_next_workflow_step(session, step7.id, step4.id) + @classmethod def build_coding_workflow(cls, session): @@ -268,3 +411,4 @@ def build_action_based_agents(cls, session): output = AgentPromptTemplate.analyse_task() IterationWorkflowStep.find_or_create_step(session, iteration_workflow.id, "ab1", output["prompt"], str(output["variables"]), "TRIGGER", "tools") +# /Users/abhijeetsinha/abhijeet/Code/SuperAGI/SuperAGI/superagi/agent/prompts/condition_step.txt \ No newline at end of file diff --git a/superagi/controllers/agent.py b/superagi/controllers/agent.py index 2d108d7d9..8615e6cbb 100644 --- a/superagi/controllers/agent.py +++ b/superagi/controllers/agent.py @@ -100,10 +100,12 @@ def create_agent_with_config(agent_with_config: AgentConfigInput, toolkit_ids=agent_with_config.toolkits) agent_with_config.tools.extend(agent_toolkit_tools) db_agent = Agent.create_agent_with_config(db, agent_with_config) - + print("Agent is _______________________", db_agent.agent_workflow_id) start_step = AgentWorkflow.fetch_trigger_step_id(db.session, db_agent.agent_workflow_id) - iteration_step_id = IterationWorkflow.fetch_trigger_step_id(db.session, - start_step.action_reference_id).id if start_step.action_type == "ITERATION_WORKFLOW" else -1 + print("Start step is _______________________", start_step) + # iteration_step_id = IterationWorkflow.fetch_trigger_step_id(db.session, + # start_step.action_reference_id).id if start_step.action_type == "ITERATION_WORKFLOW" else -1 + iteration_step_id = -1 # Creating an execution with RUNNING status execution = AgentExecution(status='CREATED', last_execution_time=datetime.now(), agent_id=db_agent.id, @@ -122,7 +124,8 @@ def create_agent_with_config(agent_with_config: AgentConfigInput, "LTM_DB": agent_with_config.LTM_DB, "max_iterations": agent_with_config.max_iterations, "user_timezone": agent_with_config.user_timezone, - "knowledge": agent_with_config.knowledge + "knowledge": agent_with_config.knowledge, + "agent_type": agent_with_config.agent_type } db.session.add(execution) db.session.commit() diff --git a/superagi/controllers/agent_execution.py b/superagi/controllers/agent_execution.py index 9658712ec..d439820e6 100644 --- a/superagi/controllers/agent_execution.py +++ b/superagi/controllers/agent_execution.py @@ -195,7 +195,8 @@ def create_agent_run(agent_execution: AgentRunIn, Authorize: AuthJWT = Depends(c "LTM_DB": agent_execution.LTM_DB, "max_iterations": agent_execution.max_iterations, "user_timezone": agent_execution.user_timezone, - "knowledge": agent_execution.knowledge + "knowledge": agent_execution.knowledge, + "agent_type": agent_execution.agent_type } db.session.add(db_agent_execution) diff --git a/superagi/controllers/agent_workflow.py b/superagi/controllers/agent_workflow.py index dc8880ed3..bc230fb9a 100644 --- a/superagi/controllers/agent_workflow.py +++ b/superagi/controllers/agent_workflow.py @@ -1,15 +1,25 @@ from fastapi import APIRouter from fastapi import Depends -from fastapi_sqlalchemy import db +from fastapi_jwt_auth import AuthJWT +from pydantic import BaseModel -from superagi.helper.auth import get_user_organisation -from superagi.models.workflows.agent_workflow import AgentWorkflow +from superagi.helper.agent_workflow import AgentWorkflowHelper +from superagi.helper.auth import get_user_organisation, check_auth router = APIRouter() -@router.get("/list", status_code=201) -def list_workflows(organisation=Depends(get_user_organisation)): +class AgentWorkflowIn(BaseModel): + name: str + description: str + code_yaml: str + class Config: + orm_mode = True + + +@router.get("/list", status_code=200) +def list_agent_workflows(organisation=Depends(get_user_organisation), + Authorize: AuthJWT = Depends(check_auth)): """ Lists agent workflows. @@ -20,10 +30,28 @@ def list_workflows(organisation=Depends(get_user_organisation)): list: A list of dictionaries representing the agent workflows. """ + return AgentWorkflowHelper.list_agent_workflows(organisation_id=organisation.id) + + +@router.post("", status_code=200) +def create_agent_workflow(agent_workflow: AgentWorkflowIn, + organisation=Depends(get_user_organisation), + Authorize: AuthJWT = Depends(check_auth)): + return AgentWorkflowHelper.create_agent_workflow(name=agent_workflow.name, + description=agent_workflow.description, + organisation_id=organisation.id) + + +@router.get("/{agent_workflow_id}", status_code=200) +def get_agent_workflow(agent_workflow_id: int, + Authorize: AuthJWT = Depends(check_auth)): + return AgentWorkflowHelper.get_agent_workflow(agent_workflow_id=agent_workflow_id) - workflows = db.session.query(AgentWorkflow).all() - output_json = [] - for workflow in workflows: - output_json.append(workflow.to_dict()) - return output_json +@router.post("/code_yaml/{agent_workflow_id}", status_code=200) +def add_or_update_agent_workflow_code(agent_workflow_id: int, agent_workflow: AgentWorkflowIn, + organisation=Depends(get_user_organisation), + Authorize: AuthJWT = Depends(check_auth)): + return AgentWorkflowHelper.add_or_update_agent_workflow_code(agent_workflow_id=agent_workflow_id, + agent_workflow_code_yaml=agent_workflow.code_yaml, + organisation_id=organisation.id) \ No newline at end of file diff --git a/superagi/controllers/organisation.py b/superagi/controllers/organisation.py index e366c5966..8fbd022d9 100644 --- a/superagi/controllers/organisation.py +++ b/superagi/controllers/organisation.py @@ -187,6 +187,7 @@ def agent_workflows(organisation=Depends(get_user_organisation)): """ agent_workflows = db.session.query(AgentWorkflow).all() + print("agent_workflows : ", agent_workflows) workflows = [workflow.name for workflow in agent_workflows] return workflows diff --git a/superagi/controllers/types/agent_execution_config.py b/superagi/controllers/types/agent_execution_config.py index 5cb6ad2d6..3655c423f 100644 --- a/superagi/controllers/types/agent_execution_config.py +++ b/superagi/controllers/types/agent_execution_config.py @@ -27,6 +27,7 @@ class AgentRunIn(BaseModel): max_iterations: int user_timezone: Optional[str] knowledge: Optional[int] + agent_type: str class Config: orm_mode = True \ No newline at end of file diff --git a/superagi/controllers/types/agent_with_config.py b/superagi/controllers/types/agent_with_config.py index 5ce81d211..1f1252592 100644 --- a/superagi/controllers/types/agent_with_config.py +++ b/superagi/controllers/types/agent_with_config.py @@ -20,6 +20,7 @@ class AgentConfigInput(BaseModel): max_iterations: int user_timezone: Optional[str] knowledge: Optional[int] + agent_type: str diff --git a/superagi/helper/agent_workflow.py b/superagi/helper/agent_workflow.py new file mode 100644 index 000000000..7caa2f1ea --- /dev/null +++ b/superagi/helper/agent_workflow.py @@ -0,0 +1,98 @@ +import yaml + +from fastapi import HTTPException + +from fastapi_sqlalchemy import db + +from superagi.agent.agent_workflow_validator import AgentWorkflowValidator +from superagi.models.workflows.agent_workflow import AgentWorkflow + + +class AgentWorkflowHelper: + @staticmethod + def list_agent_workflows(organisation_id: int): + agent_workflows = AgentWorkflow.find_by_organisation_id(session=db.session, organisation_id=1) + output = [] + for agent_workflow in agent_workflows: + output.append(agent_workflow.to_dict()) + return output + + @staticmethod + def create_agent_workflow(name: str, description: str, organisation_id: int): + agent_workflow = AgentWorkflow.find_or_create_by_name(session=db.session, + name=name, + description=description, + organisation_id=1) + if agent_workflow is None: + raise HTTPException(status_code=500, detail=f"Agent workflow: {name} not created") + + return {"success": True, "agent_workflow_id": agent_workflow.id} + + @staticmethod + def get_agent_workflow(agent_workflow_id: int): + agent_workflow = AgentWorkflow.find_by_id(session=db.session, id=agent_workflow_id) + + if agent_workflow is None: + raise HTTPException(status_code=404, detail="Agent Workflow not found") + return {"agent_workflow_name": agent_workflow.name, + "agent_workflow_description": agent_workflow.description, + "agent_workflow_code": agent_workflow.code_yaml if agent_workflow.code_yaml is not None else ""} + + @staticmethod + def add_or_update_agent_workflow_code(agent_workflow_id: int, agent_workflow_code_yaml: str, organisation_id: int): + sample_yaml_str = """ + #comment testing + steps: + - name: "Step1" + trigger_step: true + type: "TOOL" + tool: "List File" + instruction: "List the files from the resource manager" + next: "Step2" + + - name: "Step2" + type: "LOOP" + next: + next_step: "Step3" + exit_step: "Step6" + + - name: "Step3" + type: "TOOL" + tool: "Read File" + instruction: "Read the resume from above input" + next: "Step4" + + - name: "Step4" + type: "CONDITION" + instruction: "Check if the resume matches High-Level GOAL" + next: + - output: "NO" + step: "Step2" + - output: "YES" + step: "Step5" + + - name: "Step5" + type: "TOOL" + tool: "Send Email" + instruction: "Write a custom Email the candidates for job profile based on their experience" + next: "Step2" + + - name: "Step6" + type: "TOOL" + tool: "Write File" + instruction: "Write a summary about the work done so far in a file named workflow_summary.txt" + next: "Step2" + """ + print("agent_workflow_code_yaml test print", sample_yaml_str) + print(type(sample_yaml_str)) + yaml_content = yaml.load(sample_yaml_str, Loader=yaml.FullLoader) + print("agent_workflow_code_yaml test print after yaml", yaml_content) + print(type(yaml_content)) + AgentWorkflowValidator(db.session, organisation_id).validate_workflow_steps(workflow_steps= + yaml_content) + agent_workflow = AgentWorkflow.add_or_update_agent_workflow_code_yaml(session=db.session, id=agent_workflow_id, + agent_workflow_code_yaml=sample_yaml_str) + if agent_workflow is None: + raise HTTPException(status_code=404, detail="Agent Workflow not found") + + return {"success": True} \ No newline at end of file diff --git a/superagi/jobs/agent_executor.py b/superagi/jobs/agent_executor.py index 02b8fe406..42a3ca362 100644 --- a/superagi/jobs/agent_executor.py +++ b/superagi/jobs/agent_executor.py @@ -7,6 +7,7 @@ from superagi.agent.agent_tool_step_handler import AgentToolStepHandler from superagi.agent.agent_workflow_step_wait_handler import AgentWaitStepHandler from superagi.agent.types.wait_step_status import AgentWorkflowStepWaitStatus +from superagi.agent.workflow.steps.condition_step import AgentConditionStepHandler from superagi.apm.event_handler import EventHandler from superagi.config.config import get_config from superagi.lib.logger import logger @@ -104,7 +105,9 @@ def execute_next_step(self, agent_execution_id): def __execute_workflow_step(self, agent, agent_config, agent_execution_id, agent_workflow_step, memory, model_api_key, organisation, session): logger.info("Executing Workflow step : ", agent_workflow_step.action_type) + #TODO Can be romoved by using polymorphism if agent_workflow_step.action_type == AgentWorkflowStepAction.TOOL.value: + print("______________________________STEP : TOOL") tool_step_handler = AgentToolStepHandler(session, llm=get_model(model=agent_config["model"], api_key=model_api_key, organisation_id=organisation.id) @@ -121,9 +124,24 @@ def __execute_workflow_step(self, agent, agent_config, agent_execution_id, agent print(get_model(model=agent_config["model"], api_key=model_api_key, organisation_id=organisation.id)) iteration_step_handler.execute_step() elif agent_workflow_step.action_type == AgentWorkflowStepAction.WAIT_STEP.value: + print("______________________________STEP : WAIT") (AgentWaitStepHandler(session=session, agent_id=agent.id, agent_execution_id=agent_execution_id) .execute_step()) + elif agent_workflow_step.action_type == AgentWorkflowStepAction.CONDITION.value: + print("______________________________STEP : CONDITION") + AgentConditionStepHandler(session=session, agent_id=agent.id, + agent_execution_id=agent_execution_id, + llm=get_model(model=agent_config["model"], + api_key=model_api_key, + organisation_id=organisation.id)).execute_step() + + # TODO: Add Code for CONDITION step in workflow + # TODO: Add Code for LOOP step as a separate workflow step + # TODO: Add Code for WAIT_FOR_PERMISSION step as a separate workflow step + # TODO: Handle this switch case in a better way using runtime polymorphism on execute following step interface + # TODO: Think and handle validation of workflow steps in a better way using pydantic if possible else using interface method + # TODO: interface step can have execute,validate method @classmethod def get_embedding(cls, model_source, model_api_key): diff --git a/superagi/models/agent.py b/superagi/models/agent.py index 253f92fa5..7849db0dd 100644 --- a/superagi/models/agent.py +++ b/superagi/models/agent.py @@ -137,6 +137,7 @@ def create_agent_with_config(cls, db, agent_with_config): db.session.commit() agent_workflow = AgentWorkflow.find_by_name(session=db.session, name=agent_with_config.agent_workflow) + print("______________workflow choosen______________",agent_workflow) logger.info("Agent workflow:", str(agent_workflow)) db_agent.agent_workflow_id = agent_workflow.id # @@ -169,6 +170,7 @@ def create_agent_with_config(cls, db, agent_with_config): "max_iterations": agent_with_config.max_iterations, "user_timezone": agent_with_config.user_timezone, "knowledge": agent_with_config.knowledge, + "agent_type": agent_with_config.agent_type } agent_configurations = [ diff --git a/superagi/models/agent_execution_config.py b/superagi/models/agent_execution_config.py index 8e2a43a08..7aedcbc61 100644 --- a/superagi/models/agent_execution_config.py +++ b/superagi/models/agent_execution_config.py @@ -118,6 +118,10 @@ def build_agent_execution_config(cls, session, agent, results_agent, results_age if key in results_agent_dict and value is not None: results_agent_dict[key] = value + if agent and agent.agent_workflow_id is not None: + agent_workflow = session.query(AgentWorkflow).filter(AgentWorkflow.id == agent.agent_workflow_id).first() + results_agent_dict['agent_workflow_code'] = agent_workflow.code_yaml + # Construct the response if 'goal' in results_agent_dict: results_agent_dict['goal'] = eval(results_agent_dict['goal']) @@ -155,6 +159,10 @@ def build_agent_execution_config(cls, session, agent, results_agent, results_age @classmethod def build_scheduled_agent_execution_config(cls, session, agent, results_agent, total_calls, total_tokens): results_agent_dict = {result.key: result.value for result in results_agent} + + if agent and agent.agent_workflow_id is not None: + agent_workflow = session.query(AgentWorkflow).filter(AgentWorkflow.id == agent.agent_workflow_id).first() + results_agent_dict['agent_workflow_code'] = agent_workflow.code_yaml # Construct the response if 'goal' in results_agent_dict: diff --git a/superagi/models/db.py b/superagi/models/db.py index c6844cfd4..5d534c048 100644 --- a/superagi/models/db.py +++ b/superagi/models/db.py @@ -24,7 +24,11 @@ def connect_db(): db_password = get_config('DB_PASSWORD') db_name = get_config('DB_NAME') db_url = get_config('DB_URL', None) - + print(db_url) + print(db_host) + print(db_username) + print(db_password) + print(db_name) if db_url is None: if db_username is None: db_url = f'postgresql://{db_host}/{db_name}' @@ -41,6 +45,11 @@ def connect_db(): pool_recycle=1800, # Recycle connections after this number of seconds (optional) pool_pre_ping=False, # Enable connection health checks (optional) ) + db_url = "postgresql://superagi:password@localhost:5432/super_agi_main" + # print(db_url) + engine = create_engine(db_url) + print('engine created') + print(engine) # Test the connection try: diff --git a/superagi/models/workflows/agent_workflow.py b/superagi/models/workflows/agent_workflow.py index 5a9bac05f..0748c084d 100644 --- a/superagi/models/workflows/agent_workflow.py +++ b/superagi/models/workflows/agent_workflow.py @@ -1,4 +1,5 @@ import json +from typing import Optional from sqlalchemy import Column, Integer, String, Text @@ -21,6 +22,8 @@ class AgentWorkflow(DBBaseModel): id = Column(Integer, primary_key=True) name = Column(String) description = Column(Text) + organisation_id = Column(Integer) + code_yaml = Column(Text) def __repr__(self): """ @@ -30,8 +33,10 @@ def __repr__(self): str: String representation of the AgentWorkflow. """ - return f"AgentWorkflow(id={self.id}, name='{self.name}', " \ - f"description='{self.description}')" + return f"AgentWorkflow(id='{self.id}', name='{self.name}', " \ + f"description='{self.description}', " \ + f"organisation id='{self.organisation_id}', "\ + f"workflow code='{self.code_yaml}')" def to_dict(self): """ @@ -95,7 +100,7 @@ def fetch_trigger_step_id(cls, session, workflow_id): @classmethod def find_by_id(cls, session, id: int): - """Create or find an agent workflow by name.""" + """Create or find an agent workflow by id.""" return session.query(AgentWorkflow).filter(AgentWorkflow.id == id).first() @@ -105,11 +110,31 @@ def find_by_name(cls, session, name: str): return session.query(AgentWorkflow).filter(AgentWorkflow.name == name).first() @classmethod - def find_or_create_by_name(cls, session, name: str, description: str): + def find_or_create_by_name(cls, session, name: str, description: str, organisation_id: Optional[int] = None): """Create or find an agent workflow by name.""" + print("Session : ",session) + print("Name : ",name) + print("Description : ",description) agent_workflow = session.query(AgentWorkflow).filter(AgentWorkflow.name == name).first() + print("Agent Workflow : ",agent_workflow) if agent_workflow is None: - agent_workflow = AgentWorkflow(name=name, description=description) + agent_workflow = AgentWorkflow(name=name, description=description, organisation_id=organisation_id) session.add(agent_workflow) session.commit() return agent_workflow + + @classmethod + def find_by_organisation_id(cls, session, organisation_id: int): + workflows = session.query(AgentWorkflow).filter(AgentWorkflow.organisation_id == organisation_id).all() + return workflows + + @classmethod + def add_or_update_agent_workflow_code_yaml(cls, session, id: int, agent_workflow_code_yaml: str): + agent_workflow = session.query(AgentWorkflow).filter(AgentWorkflow.id == id).first() + + if agent_workflow is not None: + agent_workflow.code_yaml = agent_workflow_code_yaml + session.commit() + + return agent_workflow + diff --git a/superagi/models/workflows/agent_workflow_step.py b/superagi/models/workflows/agent_workflow_step.py index b7a3ada16..88d0667be 100644 --- a/superagi/models/workflows/agent_workflow_step.py +++ b/superagi/models/workflows/agent_workflow_step.py @@ -3,8 +3,10 @@ from sqlalchemy import Column, Integer, String from sqlalchemy.dialects.postgresql import JSONB +from superagi.agent.types.agent_workflow_step_action_types import AgentWorkflowStepAction from superagi.lib.logger import logger from superagi.models.base_model import DBBaseModel +from superagi.models.workflows.agent_workflow_step_condition import AgentWorkflowStepCondition from superagi.models.workflows.agent_workflow_step_tool import AgentWorkflowStepTool from superagi.models.workflows.agent_workflow_step_wait import AgentWorkflowStepWait from superagi.models.workflows.iteration_workflow import IterationWorkflow @@ -111,7 +113,15 @@ def find_by_id(cls, session, step_id: int): def find_or_create_tool_workflow_step(cls, session, agent_workflow_id: int, unique_id: str, tool_name: str, input_instruction: str, output_instruction: str = "", step_type="NORMAL", - history_enabled: bool = True, completion_prompt: str = None): + history_enabled: bool = True, completion_prompt: str = "Respond with only " + "valid JSON " + "conforming to the " + "given json schema. " + "Response should " + "contain tool name " + "and tool arguments " + "to achieve the " + "given instruction."): """ Find or create a tool workflow step Args: @@ -130,8 +140,8 @@ def find_or_create_tool_workflow_step(cls, session, agent_workflow_id: int, uniq """ workflow_step = session.query(AgentWorkflowStep).filter( AgentWorkflowStep.agent_workflow_id == agent_workflow_id, AgentWorkflowStep.unique_id == unique_id).first() - if completion_prompt is None: - completion_prompt = f"Respond with only valid JSON conforming to the given json schema. Response should contain tool name and tool arguments to achieve the given instruction." + # if completion_prompt is None: + # completion_prompt = f"Respond with only valid JSON conforming to the given json schema. Response should contain tool name and tool arguments to achieve the given instruction." step_tool = AgentWorkflowStepTool.find_or_create_tool(session, unique_id, tool_name, input_instruction, output_instruction, history_enabled, completion_prompt) @@ -144,8 +154,8 @@ def find_or_create_tool_workflow_step(cls, session, agent_workflow_id: int, uniq workflow_step.step_type = step_type workflow_step.agent_workflow_id = agent_workflow_id workflow_step.action_reference_id = step_tool.id - workflow_step.action_type = "TOOL" - workflow_step.next_steps = [] + workflow_step.action_type = AgentWorkflowStepAction.TOOL.value + workflow_step.next_steps = [] if workflow_step.next_steps is None else workflow_step.next_steps workflow_step.completion_prompt = completion_prompt session.commit() return workflow_step @@ -168,11 +178,46 @@ def find_or_create_wait_workflow_step(cls, session, agent_workflow_id: int, uniq workflow_step.step_type = step_type workflow_step.agent_workflow_id = agent_workflow_id workflow_step.action_reference_id = step_wait.id - workflow_step.action_type = "WAIT_STEP" + workflow_step.action_type = AgentWorkflowStepAction.WAIT_STEP.value workflow_step.next_steps = [] session.commit() return workflow_step + @classmethod + def find_or_create_condition_workflow_step(cls,session,agent_workflow_id: int, unique_id: str, + instruction: str, step_type="NORMAL"): + """ Find or create a condition workflow step + + Args: + session: db session + agent_workflow_id: id of the agent workflow + unique_id: unique id of the step + instruction: instruction of the condition + step_type: type of the step + + Returns: + AgentWorkflowStep. + """ + # if + workflow_step = session.query(AgentWorkflowStep).filter( + AgentWorkflowStep.agent_workflow_id == agent_workflow_id, AgentWorkflowStep.unique_id == unique_id).first() + print("___________workflow_step", workflow_step) + step_condition = AgentWorkflowStepCondition.find_or_create(session, unique_id, instruction) + print("___________step_condition", step_condition) + if workflow_step is None: + workflow_step = AgentWorkflowStep(unique_id=unique_id, step_type=step_type, + agent_workflow_id=agent_workflow_id) + session.add(workflow_step) + session.commit() + workflow_step.step_type = step_type + workflow_step.agent_workflow_id = agent_workflow_id + workflow_step.action_reference_id = step_condition.id + workflow_step.action_type = AgentWorkflowStepAction.CONDITION.value + workflow_step.next_steps = [] if workflow_step.next_steps is None else workflow_step.next_steps + session.commit() + return workflow_step + + @classmethod def find_or_create_iteration_workflow_step(cls, session, agent_workflow_id: int, unique_id: str, iteration_workflow_name: str, step_type="NORMAL"): @@ -259,9 +304,11 @@ def fetch_next_step(cls, session, current_agent_step_id: int, step_response: str step_response: response of the current step """ current_step = AgentWorkflowStep.find_by_id(session, current_agent_step_id) + print("CURRENT STEP : _____________", current_step) next_steps = current_step.next_steps + print("Next Steps : _____________", next_steps) matching_steps = [step for step in next_steps if str(step["step_response"]).lower() == step_response.lower()] - + print("Matching Steps : ________________", matching_steps) if matching_steps: if str(matching_steps[0]["step_id"]) == "-1": return "COMPLETE" @@ -269,9 +316,10 @@ def fetch_next_step(cls, session, current_agent_step_id: int, step_response: str logger.info(f"Could not find next step for step_id: {current_agent_step_id} and step_response: {step_response}") default_steps = [step for step in next_steps if str(step["step_response"]).lower() == "default"] - + print("default_steps : ________________", default_steps) if default_steps: if str(default_steps[0]["step_id"]) == "-1": return "COMPLETE" return AgentWorkflowStep.find_by_unique_id(session, default_steps[0]["step_id"]) return None + diff --git a/superagi/models/workflows/agent_workflow_step_condition.py b/superagi/models/workflows/agent_workflow_step_condition.py new file mode 100644 index 000000000..cc88542d4 --- /dev/null +++ b/superagi/models/workflows/agent_workflow_step_condition.py @@ -0,0 +1,135 @@ +from sqlalchemy import Column, Integer, String, DateTime +import json # for JSON serialization + +from superagi.models.base_model import DBBaseModel + + +class AgentWorkflowStepCondition(DBBaseModel): + """ + Step for an Agent Workflow condition. + + Attributes: + id (int): The unique identifier of the condition step. + instruction (str): The instruction or condition description. + tool_output (str): The output from a tool or condition evaluation result. + """ + + __tablename__ = 'agent_workflow_step_conditions' + + id = Column(Integer, primary_key=True) + instruction = Column(String) + tool_output = Column(String) + tool_name = Column(String) + unique_id = Column(String) + + # status = Column(String) # 'PENDING', 'EVALUATED', 'COMPLETED' + + def __repr__(self): + """ + Returns a string representation of the AgentWorkflowStepCondition object. + + Returns: + str: String representation of the AgentWorkflowStepCondition. + """ + + return f"AgentWorkflowStepCondition(id={self.id}, instruction='{self.instruction}', " \ + f"tool_output='{self.tool_output}', unique_id='{self.unique_id}, tool_name='{self.tool_name}')" + + def to_dict(self): + """ + Converts the AgentWorkflowStepCondition object to a dictionary. + + Returns: + dict: Dictionary representation of the AgentWorkflowStepCondition. + """ + + return { + 'id': self.id, + 'instruction': self.instruction, + 'tool_output': self.tool_output, + 'unique_id': self.unique_id, + 'tool_name': self.tool_name, + } + + def to_json(self): + """ + Converts the AgentWorkflowStepCondition object to a JSON string. + + Returns: + str: JSON string representation of the AgentWorkflowStepCondition. + """ + + return json.dumps(self.to_dict()) + + @classmethod + def find_or_create(cls, session, step_unique_id: str, instruction: str): + """ + Find or create an AgentWorkflowStepCondition. + + Args: + session (Session): The database session. + step_unique_id (str): The unique ID of the step. + instruction (str): The instruction or condition description. + + Returns: + AgentWorkflowStepCondition: The AgentWorkflowStepCondition. + """ + + unique_id = f"{step_unique_id}_condition" + print("unique : ",unique_id) + condition_step = session.query(AgentWorkflowStepCondition).filter( + AgentWorkflowStepCondition.unique_id == unique_id).first() + + print('condition_step : ',condition_step) + if condition_step is None: + condition_step = AgentWorkflowStepCondition( + unique_id=unique_id, + instruction=instruction, + ) + session.add(condition_step) + else: + condition_step.instruction = instruction + session.commit() + session.flush() + return condition_step + + @classmethod + def update_tool_info(cls, session, step_unique_id: str, tool_output: str, tool_name: str): + """ + Update the tool output of an AgentWorkflowStepCondition. + + Args: + session (Session): The database session. + step_unique_id (str): The unique ID of the step. + tool_output (str): The tool output. + tool_name (str): The tool name. + + Returns: + AgentWorkflowStepCondition: The AgentWorkflowStepCondition. + """ + + unique_id = f"{step_unique_id}_condition" + condition_step = session.query(AgentWorkflowStepCondition).filter( + AgentWorkflowStepCondition.unique_id == unique_id).first() + if condition_step is None: + raise ValueError(f"Condition step with unique ID {unique_id} not found.") + condition_step.tool_output = tool_output + condition_step.tool_name = tool_name + session.commit() + session.flush() + return condition_step + + @classmethod + def find_by_id(cls, session, step_id: int): + """ + Find an AgentWorkflowStepCondition by ID. + + Args: + session (Session): The database session. + step_id (int): The ID of the step. + + Returns: + AgentWorkflowStepCondition: The AgentWorkflowStepCondition. + """ + + return session.query(AgentWorkflowStepCondition).filter(AgentWorkflowStepCondition.id == step_id).first() diff --git a/tests/unit_tests/agent/test_agent_tool_step_handler.py b/tests/unit_tests/agent/test_agent_tool_step_handler.py index 1d57026c7..5b35d5468 100644 --- a/tests/unit_tests/agent/test_agent_tool_step_handler.py +++ b/tests/unit_tests/agent/test_agent_tool_step_handler.py @@ -4,7 +4,7 @@ import pytest from superagi.agent.agent_tool_step_handler import AgentToolStepHandler -from superagi.agent.common_types import ToolExecutorResponse +from superagi.agent.types.common_types import ToolExecutorResponse from superagi.agent.output_handler import ToolOutputHandler from superagi.agent.tool_builder import ToolBuilder from superagi.helper.token_counter import TokenCounter diff --git a/tests/unit_tests/agent/test_output_handler.py b/tests/unit_tests/agent/test_output_handler.py index 45474f217..06cc6c453 100644 --- a/tests/unit_tests/agent/test_output_handler.py +++ b/tests/unit_tests/agent/test_output_handler.py @@ -1,15 +1,9 @@ -import pytest from unittest.mock import Mock, patch, MagicMock -from superagi.agent.common_types import ToolExecutorResponse -from superagi.agent.output_handler import ToolOutputHandler, TaskOutputHandler, ReplaceTaskOutputHandler +from superagi.agent.output_handler import TaskOutputHandler, ReplaceTaskOutputHandler from superagi.agent.output_parser import AgentSchemaOutputParser, AgentGPTAction from superagi.agent.task_queue import TaskQueue -from superagi.agent.tool_executor import ToolExecutor from superagi.helper.json_cleaner import JsonCleaner -from superagi.models.agent import Agent -from superagi.models.agent_execution_permission import AgentExecutionPermission -import numpy as np from superagi.agent.output_handler import ToolOutputHandler diff --git a/tests/unit_tests/agent/test_tool_executor.py b/tests/unit_tests/agent/test_tool_executor.py index 35e1735f3..f2c56d9b4 100644 --- a/tests/unit_tests/agent/test_tool_executor.py +++ b/tests/unit_tests/agent/test_tool_executor.py @@ -1,9 +1,6 @@ import pytest from unittest.mock import Mock, patch -from pydantic import ValidationError - -from superagi.agent.common_types import ToolExecutorResponse from superagi.agent.tool_executor import ToolExecutor class MockTool: diff --git a/tools.json b/tools.json index 94de16ccc..956fc7a25 100644 --- a/tools.json +++ b/tools.json @@ -1,4 +1,7 @@ { - "tools": { - } + "tools": { + "duck_duck_go": "https://github.com/TransformerOptimus/SuperAGI-Tools/tree/main/duck_duck_go", + "google_analytics": "https://github.com/TransformerOptimus/SuperAGI-Tools/tree/main/google_analytics", + "notion": "https://github.com/TransformerOptimus/SuperAGI-Tools/tree/main/notion" + } } \ No newline at end of file