forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsend_message.py
31 lines (24 loc) · 897 Bytes
/
send_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import sys
from temporalio.client import Client
from workflows import SignalQueryBedrockWorkflow
async def main(prompt):
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
workflow_id = "bedrock-workflow-with-signals"
inactivity_timeout_minutes = 1
# Sends a signal to the workflow (and starts it if needed)
await client.start_workflow(
SignalQueryBedrockWorkflow.run,
inactivity_timeout_minutes,
id=workflow_id,
task_queue="bedrock-task-queue",
start_signal="user_prompt",
start_signal_args=[prompt],
)
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python send_message.py '<prompt>'")
print("Example: python send_message.py 'What animals are marsupials?'")
else:
asyncio.run(main(sys.argv[1]))