```python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = OpenAI()

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic
        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke
        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))


# Top-level await assumes an async context (e.g. a notebook).
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))
```
While this code runs fine for a single task, I noticed that when I run multiple instances of this workflow (with different inputs), the runs are not processed concurrently. For example, launching two runs one after the other processes them sequentially rather than in parallel, which defeats the purpose of using an asynchronous model.
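To make the issue concrete, here is a minimal sketch of what I would expect to work, assuming the runs are fully independent: awaiting the runs one after the other serializes them, while `asyncio.gather` awaits them concurrently.

```python
import asyncio


async def main():
    # Sequential: the second run only starts after the first finishes.
    r1 = await JokeFlow(timeout=60).run(topic="pirates")
    r2 = await JokeFlow(timeout=60).run(topic="ninjas")

    # Concurrent: both runs are in flight at the same time, so the total
    # wall time should be roughly that of the slower run, not the sum.
    r1, r2 = await asyncio.gather(
        JokeFlow(timeout=60).run(topic="pirates"),
        JokeFlow(timeout=60).run(topic="ninjas"),
    )
    print(r1, r2)


asyncio.run(main())
```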
```python
# Parallel event handling within a single run via num_workers:
import asyncio
import random

from llama_index.core.workflow import Context


class StepTwoEvent(Event):
    query: str


class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StopEvent(result=ev.query)
```
One solution I can think of is to run a workflow as a step within another workflow and set num_workers. I would greatly appreciate any insight or suggestions on how to achieve this kind of parallelism.
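Roughly, I imagine something like the sketch below. The names `TopicEvent`, `JokeResultEvent`, and `FanOutFlow` are hypothetical, introduced only for illustration. Each worker step runs its own `JokeFlow` instance, so up to four inner workflows should execute concurrently, and `ctx.collect_events` joins the results before stopping.

```python
from llama_index.core.workflow import Context


class TopicEvent(Event):
    topic: str


class JokeResultEvent(Event):
    result: str


class FanOutFlow(Workflow):
    @step
    async def fan_out(self, ctx: Context, ev: StartEvent) -> TopicEvent:
        # Emit one event per topic; the workers below pick them up.
        for topic in ev.topics:
            ctx.send_event(TopicEvent(topic=topic))

    @step(num_workers=4)
    async def run_joke_flow(self, ctx: Context, ev: TopicEvent) -> JokeResultEvent:
        # Each event gets its own inner workflow instance.
        inner = JokeFlow(timeout=60, verbose=False)
        result = await inner.run(topic=ev.topic)
        return JokeResultEvent(result=str(result))

    @step
    async def join(self, ctx: Context, ev: JokeResultEvent) -> StopEvent:
        # collect_events returns None until all expected results arrive.
        # The count is hardcoded here and must match the number of topics.
        results = ctx.collect_events(ev, [JokeResultEvent] * 3)
        if results is None:
            return None
        return StopEvent(result=[r.result for r in results])


# Usage (assumes exactly three topics, matching the hardcoded count in join):
# results = await FanOutFlow(timeout=120).run(topics=["pirates", "ninjas", "robots"])
```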