import openai_multi_tool_use_parallel_patch

# General imports
+ from typing import List, Optional, Tuple
from azure.communication.callautomation import (
    CallAutomationClient,
    CallConnectionClient,
@@ ... @@
from models.synthesis import SynthesisModel
from persistence.cosmos import CosmosStore
from persistence.sqlite import SqliteStore
+ import html
import re
import asyncio
from models.message import (
@@ -207,23 +209,23 @@ async def call_inbound_post(request: Request):


@api.post(
-     "/call/event/{call_id}",
+     "/call/event/{phone_number}",
    description="Handle callbacks from Azure Communication Services.",
    status_code=status.HTTP_204_NO_CONTENT,
)
# TODO: Secure this endpoint with a secret
# See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/secure-webhook-endpoint.md
async def call_event_post(
-     request: Request, background_tasks: BackgroundTasks, call_id: UUID
+     request: Request, background_tasks: BackgroundTasks, phone_number: str
) -> None:
    for event_dict in await request.json():
-         background_tasks.add_task(communication_evnt_worker, event_dict, call_id)
+         background_tasks.add_task(communication_event_worker, event_dict, phone_number)


- async def communication_evnt_worker(event_dict: dict, call_id: UUID) -> None:
-     call = await db.call_aget(call_id)
+ async def communication_event_worker(event_dict: dict, phone_number: str) -> None:
+     call = await db.call_asearch_one(phone_number)
    if not call:
-         _logger.warn(f"Call {call_id} not found")
+         _logger.warn(f"Call with {phone_number} not found")
        return

    event = CloudEvent.from_dict(event_dict)
@@ -249,7 +251,7 @@ async def communication_evnt_worker(event_dict: dict, call_id: UUID) -> None:
            )
        )
-         if not call.messages:  # First call
+         if len(call.messages) == 1:  # First call
            await handle_recognize_text(
                call=call,
                client=client,
@@ -262,7 +264,7 @@ async def communication_evnt_worker(event_dict: dict, call_id: UUID) -> None:
                client=client,
                text=TTSPrompt.WELCOME_BACK,
            )
-             await intelligence(call, client)
+             call = await intelligence(call, client)

    elif event_type == "Microsoft.Communication.CallDisconnected":  # Call hung up
        _logger.info(f"Call disconnected ({call.call_id})")
@@ -279,7 +281,7 @@ async def communication_evnt_worker(event_dict: dict, call_id: UUID) -> None:
            call.messages.append(
                MessageModel(content=speech_text, persona=MessagePersona.HUMAN)
            )
-             await intelligence(call, client)
+             call = await intelligence(call, client)

    elif (
        event_type == "Microsoft.Communication.RecognizeFailed"
@@ -366,7 +368,7 @@ async def communication_evnt_worker(event_dict: dict, call_id: UUID) -> None:
    await db.call_aset(call)


- async def intelligence(call: CallModel, client: CallConnectionClient) -> None:
+ async def intelligence(call: CallModel, client: CallConnectionClient) -> CallModel:
    """
    Handle the intelligence of the call, including: GPT chat, GPT completion, TTS, and media play.

@@ -380,7 +382,7 @@ async def intelligence(call: CallModel, client: CallConnectionClient) -> None:
    hard_timeout_task = asyncio.create_task(
        asyncio.sleep(CONFIG.workflow.intelligence_hard_timeout_sec)
    )
-     chat_res = None
+     chat_action = None

    try:
        while True:
@@ -397,7 +399,7 @@ async def intelligence(call: CallModel, client: CallConnectionClient) -> None:
                soft_timeout_task.cancel()
                hard_timeout_task.cancel()
                # Answer with chat result
-                 chat_res = chat_task.result()
+                 call, chat_action = chat_task.result()
                break
            # Break when hard timeout is reached
            if hard_timeout_task.done():
@@ -425,55 +427,57 @@ async def intelligence(call: CallModel, client: CallConnectionClient) -> None:
        _logger.warn(f"Error loading intelligence ({call.call_id})", exc_info=True)

    # For any error reason, answer with error
-     if not chat_res:
+     if not chat_action:
        _logger.debug(
            f"Error loading intelligence ({call.call_id}), answering with default error"
        )
-         chat_res = ActionModel(content=TTSPrompt.ERROR, intent=IndentAction.CONTINUE)
+         chat_action = ActionModel(content=TTSPrompt.ERROR, intent=IndentAction.CONTINUE)

-     _logger.info(f"Chat ({call.call_id}): {chat_res}")
+     _logger.info(f"Chat ({call.call_id}): {chat_action}")

-     if chat_res.intent == IndentAction.TALK_TO_HUMAN:
+     if chat_action.intent == IndentAction.TALK_TO_HUMAN:
        await handle_play(
            call=call,
            client=client,
            context=Context.CONNECT_AGENT,
            text=TTSPrompt.END_CALL_TO_CONNECT_AGENT,
        )

-     elif chat_res.intent == IndentAction.END_CALL:
+     elif chat_action.intent == IndentAction.END_CALL:
        await handle_play(
            call=call,
            client=client,
            context=Context.GOODBYE,
            text=TTSPrompt.GOODBYE,
        )

-     elif chat_res.intent in (
+     elif chat_action.intent in (
        IndentAction.NEW_CLAIM,
        IndentAction.UPDATED_CLAIM,
        IndentAction.NEW_OR_UPDATED_REMINDER,
    ):
-         # Save in DB allowing demos to be more "real-time"
+         # Save in DB for new claims and allowing demos to be more "real-time"
        await db.call_aset(call)
        # Answer with intermediate response
        await handle_play(
            call=call,
            client=client,
            store=False,
-             text=chat_res.content,
+             text=chat_action.content,
        )
        # Recursively call intelligence to continue the conversation
-         await intelligence(call, client)
+         call = await intelligence(call, client)

    else:
        await handle_recognize_text(
            call=call,
            client=client,
            store=False,
-             text=chat_res.content,
+             text=chat_action.content,
        )

+     return call
+

async def handle_play(
    client: CallConnectionClient,
@@ -555,7 +559,7 @@ async def gpt_completion(system: LLMPrompt, call: CallModel) -> str:
    return content or ""


- async def gpt_chat(call: CallModel) -> ActionModel:
+ async def gpt_chat(call: CallModel) -> Tuple[CallModel, ActionModel]:
    _logger.debug(f"Running GPT chat ({call.call_id})")

    messages = [
@@ -647,7 +651,7 @@ async def gpt_chat(call: CallModel) -> ActionModel:
        {
            "type": "function",
            "function": {
-                 "description": "Use this if the user wants to create a new claim. This will reset the claim and reminder data. Old is stored but not accessible anymore. Approval from the customer must be explicitely given. Example: 'I want to create a new claim'.",
+                 "description": "Use this if the user wants to create a new claim. This will reset the claim and reminder data. Old is stored but not accessible anymore. Approval from the customer must be explicitely given. Do not use this action twice in a row. Example: 'I want to create a new claim'.",
                "name": IndentAction.NEW_CLAIM.value,
                "parameters": {
                    "properties": {
@@ -747,7 +751,9 @@ async def gpt_chat(call: CallModel) -> ActionModel:

        content = res.choices[0].message.content or ""
        content = re.sub(
-             rf"^(?:{'|'.join([action.value for action in MessageAction])}):", "", content
+             rf"^(?:{'|'.join([action.value for action in MessageAction])}):",
+             "",
+             content,
        ).strip()  # Remove action from content, AI often adds it by mistake event if explicitly asked not to
        tool_calls = res.choices[0].message.tool_calls

@@ -806,9 +812,11 @@ async def gpt_chat(call: CallModel) -> ActionModel:
                else:
                    content += parameters[customer_response_prop] + " "

-                 call.claim = ClaimModel()
-                 call.reminders = []
-                 model.content = "Claim and reminders created reset."
+                 # Add context of the last message, if not, LLM messed up and loop on this action
+                 last_message = call.messages[-1]
+                 call = CallModel(phone_number=call.phone_number)
+                 call.messages.append(last_message)
+                 model.content = "Claim, reminders and messages reset."

            elif name == IndentAction.NEW_OR_UPDATED_REMINDER.value:
                intent = IndentAction.NEW_OR_UPDATED_REMINDER
@@ -854,15 +862,21 @@ async def gpt_chat(call: CallModel) -> ActionModel:
                )
            )

-         return ActionModel(
-             content=content,
-             intent=intent,
+         return (
+             call,
+             ActionModel(
+                 content=content,
+                 intent=intent,
+             ),
        )

    except Exception:
        _logger.warn(f"OpenAI API call error", exc_info=True)

-         return ActionModel(content=TTSPrompt.ERROR, intent=IndentAction.CONTINUE)
+         return (
+             call,
+             ActionModel(content=TTSPrompt.ERROR, intent=IndentAction.CONTINUE),
+         )


async def handle_recognize_text(
@@ -1015,7 +1029,7 @@ async def callback_url(caller_id: str) -> str:
    if not call:
        call = CallModel(phone_number=caller_id)
        await db.call_aset(call)
-     return f"{CALL_EVENT_URL}/{call.call_id}"
+     return f"{CALL_EVENT_URL}/{html.escape(call.phone_number)}"


async def post_call_synthesis(call: CallModel) -> None: