Skip to content

Using Deployed Nested Workflow raise "You cannot use the sync client within an async event loop - just await the async methods directly." #543

@main-2983

Description

@main-2983

I follow the tutorial to deploy nested workflow on LlamaIndex document with some changes to the code

Step 1: Deploy core

from llama_deploy import (
    deploy_core,
    ControlPlaneConfig,
    SimpleMessageQueueConfig,
)


async def main():
    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await deploy_core(
        control_plane_config=ControlPlaneConfig(),
        message_queue_config=SimpleMessageQueueConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Step 2: Deploy Nested workflow

from llama_deploy import (
    deploy_workflow,
    WorkflowServiceConfig,
    ControlPlaneConfig,
    SimpleMessageQueueConfig,
)
from llama_index.core.workflow import (
    Context,
    Event,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)


class ProgressEvent(Event):
    progress: str


from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step


class InnerWorkflow(Workflow):
    @step()
    async def run_step(self, ev: StartEvent) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        return StopEvent(result=str(arg1) + "_result")


class OuterWorkflow(Workflow):
    @step()
    async def run_step(
        self, ev: StartEvent, inner: InnerWorkflow
    ) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        arg1 = await inner.run(arg1=arg1)

        return StopEvent(result=str(arg1) + "_result")


inner = InnerWorkflow()
outer = OuterWorkflow()
outer.add_workflows(inner=InnerWorkflow())


async def main():
    inner_task = asyncio.create_task(
        deploy_workflow(
            inner,
            WorkflowServiceConfig(
                host="127.0.0.1", port=8003, service_name="inner"
            ),
            ControlPlaneConfig(),
        )
    )

    outer_task = asyncio.create_task(
        deploy_workflow(
            outer,
            WorkflowServiceConfig(
                host="127.0.0.1", port=8002, service_name="outer"
            ),
            ControlPlaneConfig(),
        )
    )

    # This will run forever until you interrupt the process, like by pressing CTRL+C
    await asyncio.gather(inner_task, outer_task)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Step 3: Interact with the workflow

from llama_deploy import Client, ControlPlaneConfig

# point the client to the running control plane from the previous steps
client = Client(control_plane_url="http://localhost:8000") # <---- Change was made here, 8001 to 8000 to match the control plane port


async def run_task():
    session = await client.core.sessions.create() # <----- Change was made here, c1 to client
    result = await session.run("outer", arg="Hello World!")
    print(result)
    # prints 'Hello World! result'


if __name__ == '__main__':
    import asyncio

    asyncio.run(run_task())

Running the code in step 3, I got the RuntimeError message: "You cannot use the sync client within an async event loop - just await the async methods directly."

After debugging, I found out that deploy the nested workflow using function deploy_workflow creates a NetworkServiceManager, which has a function to get a Sync Client
My llama-deploy version: 0.8.1

Metadata

Metadata

Assignees

No one assigned

    Labels

    wontfixThis will not be worked on

    Type

    No type

    Projects

    Status

    Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions