forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_history.py
31 lines (21 loc) · 887 Bytes
/
get_history.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
from temporalio.client import Client
from workflows import EntityBedrockWorkflow
async def main() -> None:
    """Print the conversation history and summary of the entity workflow.

    Connects to the server at ``localhost:7233``, obtains a handle for the
    workflow with ID ``entity-bedrock-workflow`` and runs two queries
    against it: ``get_conversation_history`` and
    ``get_summary_from_history``. The summary section is printed only when
    that query returns something other than ``None``.
    """
    temporal_client = await Client.connect("localhost:7233")
    workflow_handle = temporal_client.get_workflow_handle("entity-bedrock-workflow")

    # First query: the conversation so far, iterated as (speaker, message) pairs.
    turns = await workflow_handle.query(
        EntityBedrockWorkflow.get_conversation_history
    )

    print("Conversation History")
    # Each rendered entry already ends with a newline, so joining on "\n"
    # leaves a blank line between consecutive turns.
    rendered_turns = [f"{speaker.title()}: {message}\n" for speaker, message in turns]
    print("\n".join(rendered_turns))

    # Second query: the summary, which may be absent.
    summary_text = await workflow_handle.query(
        EntityBedrockWorkflow.get_summary_from_history
    )
    if summary_text is None:
        return

    print("Conversation Summary:")
    print(summary_text)
# Script entry point: run the queries only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())