Skip to content

llama-deploy nested workflow: You cannot use the sync client within an async event loop #493

@sreeharissh14

Description

@sreeharissh14

Hi Team,

I am trying to run the llama-deploy nested workflow example from the documentation page below:

https://docs.llamaindex.ai/en/stable/module_guides/llama_deploy/30_manual_orchestration/

When I run it, I get the following error:
You cannot use the sync client within an async event loop - just await the async methods directly.

interact.py

from llama_deploy import Client
# Module-level client pointed at the control plane at http://127.0.0.1:8000.
client = Client(control_plane_url="http://127.0.0.1:8000")


async def run_task():
    """Create a session, run the "outer" service once, and print its result.

    Uses the module-level ``client``; both the session creation and the run
    are awaited.
    """
    new_session = await client.core.sessions.create()
    outcome = await new_session.run("outer", arg1="Hello World!")
    print(outcome)

# Script entry point: drive run_task() to completion on a new event loop.
if __name__ == "__main__":
    import asyncio
    asyncio.run(run_task())

deploy_workflow.py

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
import asyncio
from llama_deploy import (
    WorkflowServiceConfig,
    ControlPlaneConfig,
    deploy_workflow,
)

class InnerWorkflow(Workflow):
    """Single-step workflow that appends "_result" to its ``arg1`` input."""

    @step()
    async def run_step(self, ev: StartEvent) -> StopEvent:
        """Return ``arg1`` from the start event, stringified, plus "_result".

        Raises:
            ValueError: If ``arg1`` is falsy (presumably this includes the
                case where it was not supplied at all).
        """
        value = ev.get("arg1")
        if value:
            return StopEvent(result=str(value) + "_result")

        # Any falsy value is treated as "not provided".
        raise ValueError("arg1 is required.")


class OuterWorkflow(Workflow):
    """Single-step workflow that delegates to a nested ``inner`` workflow."""

    @step()
    async def run_step(
        self, ev: StartEvent, inner: InnerWorkflow
    ) -> StopEvent:
        """Run ``inner`` with ``arg1`` and append "_result" to what it returns.

        Args:
            ev: Start event carrying the ``arg1`` input.
            inner: Nested workflow; its ``run`` is awaited with ``arg1``.

        Raises:
            ValueError: If ``arg1`` is falsy.
        """
        payload = ev.get("arg1")
        if not payload:
            raise ValueError("arg1 is required.")

        # Delegate to the nested workflow, then decorate its output.
        inner_output = await inner.run(arg1=payload)
        return StopEvent(result=str(inner_output) + "_result")


# Workflow instances handed to deploy_workflow() in main() below.
inner = InnerWorkflow()
outer = OuterWorkflow()
# Register a nested workflow under the name "inner", matching the `inner`
# parameter of OuterWorkflow.run_step.
# NOTE(review): this attaches a fresh InnerWorkflow() rather than the `inner`
# instance created above — confirm that is intended.
outer.add_workflows(inner=InnerWorkflow())

async def main():
    """Deploy the inner and outer workflows as services and wait on both.

    Each workflow gets its own task, its own service config (host 127.0.0.1,
    distinct port and service name), and its own control-plane config pointing
    at 127.0.0.1:8000.
    """
    # (workflow, service name, service port), scheduled in this order.
    deployments = (
        (inner, "inner", 8003),
        (outer, "outer", 8002),
    )

    running = [
        asyncio.create_task(
            deploy_workflow(
                workflow,
                WorkflowServiceConfig(
                    host="127.0.0.1", port=service_port, service_name=service_name
                ),
                ControlPlaneConfig(host="127.0.0.1", port=8000),
            )
        )
        for workflow, service_name, service_port in deployments
    ]

    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await asyncio.gather(*running)


# Script entry point. asyncio is already imported at the top of this script,
# so no local re-import is needed here.
if __name__ == "__main__":
    asyncio.run(main())

core.py

async def main() -> None:
    """Await deploy_core() with a control plane on port 8000.

    The message-queue config is taken from ``redis_config``, which is not
    defined in this snippet.
    """
    control_plane = ControlPlaneConfig(port=8000)
    await deploy_core(
        control_plane_config=control_plane,
        message_queue_config=redis_config,
    )


# Script entry point: drive main() to completion on a new event loop.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Metadata

Metadata

Assignees

No one assigned

    Labels

    wontfix: This will not be worked on

    Type

    No type

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions