diff --git a/README.md b/README.md index 719489f..199b38f 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ | [![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/Azure-Samples/call-center-voice-agent-accelerator) | [![Open in Dev Containers](https://img.shields.io/static/v1?style=for-the-badge&label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/Azure-Samples/call-center-voice-agent-accelerator) |---|---| -Welcome to the *Call Center Real-time Voice Agent* solution accelerator. It's a lightweight template to create speech-to-speech voice agents that deliver personalized self-service experiences and natural-sounding voices, seamlessly integrated with telephony systems. This solution accelerator uses **Azure Voice Live API** and **Azure Communication Services** — Start locally, deploy later to Azure Web App. No PSTN number needed. +Welcome to the *Call Center Real-time Voice Agent* solution accelerator — a lightweight template for building speech-to-speech voice agents powered by **Azure Voice Live API**. It supports multiple telephony providers out of the box, including **Azure Communication Services (ACS)** and **Twilio**, plus a **web browser** client for quick testing. Bring your own telephony provider or use the built-in options. Start locally, deploy to Azure Container Apps. The Azure Voice Live API enables low-latency, high-quality speech-to-speech interactions for voice agents. It is designed for developers who need scalable, efficient voice-driven experiences, because it removes the need to orchestrate multiple components manually. By integrating speech recognition, generative AI, and text-to-speech into a single, unified interface, it provides an end-to-end solution for building voice experiences. 
Learn more about [Azure Voice Live API](https://learn.microsoft.com/azure/ai-services/speech-service/voice-live). @@ -26,7 +26,12 @@ This sample demonstrates how to build a real-time voice agent using the [Azure S The solution includes: - A backend service that connects to the **Voice Live API** for real-time ASR, LLM and TTS -- Two client options: **Web browser** (microphone/speaker) and **Azure Communication Services (ACS)** phone calls +- **Multiple client options:** The web browser client is always available. For telephony, choose **one** provider: + - **Web browser** — microphone/speaker via WebSocket (always available, great for testing) + - **Azure Communication Services (ACS)** — enterprise PSTN with Call Automation (default) + - **Twilio** — PSTN via Twilio Media Streams with webhook signature validation + + > **Telephony selection:** Only one telephony provider is active at a time. The server auto-detects based on which credentials are configured (e.g. `TWILIO_AUTH_TOKEN` present → Twilio, otherwise → ACS). - **Ambient Scenes** (optional): Add realistic background audio (office, call center) or use custom audio files to simulate real-world environments - Flexible configuration to customize prompts, ASR, TTS, and behavior - Easy extension to other client types such as [Audiohook](https://learn.microsoft.com/azure/ai-services/speech-service/how-to-use-audiohook) @@ -34,7 +39,7 @@ The solution includes: > You can also try the Voice Live API via [Azure AI Foundry](https://ai.azure.com/foundry) for quick experimentation before deploying this template to your own Azure subscription. ### Architecture diagram -|![Architecture Diagram](./docs/images/architecture_v0.0.2.png)| +|![Architecture Diagram](./docs/images/architecture_v0.0.3.png)| |---|
@@ -114,7 +119,7 @@ You can run this solution in VS Code Dev Containers, which will open the project
Deploy in your local environment - ### Local environment + ### Local Environment If you're not using one of the above options for opening the project, then you'll need to: @@ -176,7 +181,7 @@ To change the `azd` parameters from the default values, follow the steps [here]( After deployment, you can verify that your Voice Agent is running correctly using either the Web Client (for quick testing) or the ACS Phone Client (for simulating a real-world call center scenario). -🌐 Web Client (Test Mode) +### 🌐 Web Client (Test Mode) Use this browser-based client to confirm your Container App is up and responding. @@ -191,7 +196,7 @@ Use this browser-based client to confirm your Container App is up and responding -📞 ACS Client (Call Center Scenario) +### 📞 Telephony with ACS Client (Call Center Scenario) This simulates a real inbound phone call to your voice agent using **Azure Communication Services (ACS)**. @@ -229,8 +234,55 @@ Once your event subscription is configured and the phone number is active: - Dial the ACS number. - Your call will connect to the real-time voice agent powered by Azure Voice Live. +### 📞 Telephony with Twilio Client (Call Center Scenario) + +You can switch the telephony provider from ACS to **Twilio** by setting `TWILIO_AUTH_TOKEN`. When this token is configured, the server registers Twilio routes (`/voice` and `/twilio/ws`) instead of ACS routes. Inbound calls are handled via [Twilio Media Streams](https://www.twilio.com/docs/voice/media-streams) — the server validates the request, connects the caller's audio to the AI agent via a real-time WebSocket, and bridges it to Azure Voice Live. + +> To switch back to ACS, simply remove or unset `TWILIO_AUTH_TOKEN` and redeploy. 
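
As a minimal sketch of the selection rule described above (a non-empty `TWILIO_AUTH_TOKEN` selects Twilio, otherwise ACS), the detection could look like the following. The names `select_telephony_provider`, `routes_for`, and `ROUTES`, as well as the whitespace stripping, are illustrative assumptions and not the accelerator's actual code; only the route paths `/voice`, `/twilio/ws`, and `/acs/incomingcall` come from this change.

```python
from typing import List, Mapping

# Route paths are taken from this change; the table itself is hypothetical.
# The ACS entry is partial: only the webhook path shown in this change is listed.
ROUTES = {
    "twilio": ["/voice", "/twilio/ws"],
    "acs": ["/acs/incomingcall"],
}


def select_telephony_provider(env: Mapping[str, str]) -> str:
    """Return 'twilio' when TWILIO_AUTH_TOKEN is set and non-empty, else 'acs'."""
    # Assumption: a whitespace-only token is treated the same as an unset one.
    token = env.get("TWILIO_AUTH_TOKEN", "").strip()
    return "twilio" if token else "acs"


def routes_for(env: Mapping[str, str]) -> List[str]:
    """Return the (hypothetical) route list for the detected provider."""
    return ROUTES[select_telephony_provider(env)]
```

Passing `os.environ` as `env` would apply the same rule to the running process; taking the mapping as a parameter keeps the rule easy to test without touching real environment variables.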
+ +**Prerequisites:** +- A [Twilio account](https://www.twilio.com/try-twilio) with a phone number +- Your **Twilio Auth Token** (found in the [Twilio Console](https://www.twilio.com/console)) + +**Setup with `azd`:** + +```bash +# Set your Twilio Auth Token before deploying +azd env set TWILIO_AUTH_TOKEN + +# Then deploy: +azd up +# or redeploy: +azd provision +azd deploy +``` + +The token is stored securely in Azure Key Vault and injected into the Container App as a secret reference. + +**Configure Twilio Webhook:** + +1. In the [Twilio Console](https://console.twilio.com), go to your phone number's configuration. +2. Under **PhoneNumber → A Call Comes In**, set: + - **Webhook URL:** `https:///voice` + - **HTTP Method:** `POST` +3. Save changes. + +**What happens when a call comes in:** +1. Twilio sends a request to `/voice` — the server validates it and returns instructions (TwiML) to start a media stream +2. Twilio opens a WebSocket to `/twilio/ws` — the server verifies the embedded token, then bridges the caller's audio to Azure Voice Live in real time +3. The AI agent hears the caller, generates a response, and the audio is streamed back through the same connection + +**Local development:** + +For local testing, set `TWILIO_AUTH_TOKEN` in your `.env` file: +``` +TWILIO_AUTH_TOKEN=your_auth_token_here +``` + +> **Note:** `TWILIO_AUTH_TOKEN` is required for both local and deployed environments. Without it, incoming calls will be rejected. -#### Local execution +--- +### Local Execution Once the environment has been deployed with `azd up` you can also run the application locally. 
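
Step 2 of the Twilio call flow above says the server "verifies the embedded token" before bridging audio. A minimal standalone sketch of such a short-lived token, modeled on the timestamp-plus-HMAC scheme in this change's Twilio handlers, might look like this. The function names, the `now` parameter (added so expiry can be exercised deterministically), and the byte-wise comparison are assumptions made for illustration, not the repository's API.

```python
import hashlib
import hmac
import time
from typing import Optional

TOKEN_TTL_SECONDS = 60  # same validity window as _TOKEN_TTL in this change


def make_ws_token(secret: str, now: Optional[float] = None) -> str:
    """Build '<unix-timestamp>.<hex HMAC-SHA256 of the timestamp>'."""
    timestamp = str(int(time.time() if now is None else now))
    sig = hmac.new(secret.encode(), timestamp.encode(), hashlib.sha256).hexdigest()
    return f"{timestamp}.{sig}"


def verify_ws_token(secret: str, token: str, now: Optional[float] = None) -> bool:
    """Accept the token only if it is well formed, unexpired, and correctly signed."""
    if not secret or not token:
        return False
    timestamp_str, sep, sig = token.partition(".")
    if not sep:
        return False  # no '.' separator, so not a token we issued
    try:
        timestamp = int(timestamp_str)
    except ValueError:
        return False
    current = time.time() if now is None else now
    if current - timestamp > TOKEN_TTL_SECONDS:
        return False  # older than the validity window
    expected = hmac.new(
        secret.encode(), timestamp_str.encode(), hashlib.sha256
    ).hexdigest()
    # Compare as bytes so non-ASCII input cannot raise, and in constant time.
    return hmac.compare_digest(sig.encode(), expected.encode())
```

Because the signature covers only the timestamp, a token is not bound to a particular call; it only proves that the holder recently received TwiML from a server that knows the auth token, which is why the validity window is kept short.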
diff --git a/azure.yaml b/azure.yaml index e994256..0e85375 100644 --- a/azure.yaml +++ b/azure.yaml @@ -17,4 +17,5 @@ pipeline: - AZURE_RESOURCE_GROUP - AZURE_CONTAINER_REGISTRY_ENDPOINT - SERVICE_API_ENDPOINTS - - AZURE_VOICE_LIVE_ENDPOINT \ No newline at end of file + - AZURE_VOICE_LIVE_ENDPOINT + - TWILIO_AUTH_TOKEN \ No newline at end of file diff --git a/docs/images/architecture_v0.0.3.png b/docs/images/architecture_v0.0.3.png new file mode 100644 index 0000000..de88bc1 Binary files /dev/null and b/docs/images/architecture_v0.0.3.png differ diff --git a/infra/main.bicep b/infra/main.bicep index 03526b0..8468883 100644 --- a/infra/main.bicep +++ b/infra/main.bicep @@ -21,6 +21,11 @@ param appExists bool param modelName string = ' gpt-4o-mini' @description('Id of the user or app to assign application roles. If omitted, it will be generated from the user assigned identity.') param principalId string = '' +@secure() +@description('Twilio Auth Token for webhook signature validation') +param twilioAuthToken string = '' + +var useTwilio = !empty(twilioAuthToken) var uniqueSuffix = substring(uniqueString(subscription().id, environmentName), 0, 5) var tags = {'azd-env-name': environmentName } @@ -82,7 +87,7 @@ module aiServices 'modules/aiservices.bicep' = { dependsOn: [ appIdentity ] } -module acs 'modules/acs.bicep' = { +module acs 'modules/acs.bicep' = if (!useTwilio) { name: 'acs-deployment' scope: rg params: { @@ -101,9 +106,11 @@ module keyvault 'modules/keyvault.bicep' = { location: location keyVaultName: sanitizedKeyVaultName tags: tags - acsConnectionString: acs.outputs.acsConnectionString + #disable-next-line BCP327 + acsConnectionString: !useTwilio ? 
acs.outputs.acsConnectionString : '' + twilioAuthToken: twilioAuthToken } - dependsOn: [ appIdentity, acs ] + dependsOn: [ appIdentity ] } // Add role assignments @@ -133,6 +140,7 @@ module containerapp 'modules/containerapp.bicep' = { aiServicesEndpoint: aiServices.outputs.aiServicesEndpoint modelDeploymentName: modelName acsConnectionStringSecretUri: keyvault.outputs.acsConnectionStringUri + twilioAuthTokenSecretUri: keyvault.outputs.twilioAuthTokenUri logAnalyticsWorkspaceName: logAnalyticsName imageName: 'mcr.microsoft.com/azuredocs/containerapps-helloworld:latest' } @@ -148,6 +156,6 @@ output AZURE_USER_ASSIGNED_IDENTITY_ID string = appIdentity.outputs.identityId output AZURE_USER_ASSIGNED_IDENTITY_CLIENT_ID string = appIdentity.outputs.clientId output AZURE_CONTAINER_REGISTRY_ENDPOINT string = registry.outputs.loginServer -output SERVICE_API_ENDPOINTS array = ['${containerapp.outputs.containerAppFqdn}/acs/incomingcall'] +output SERVICE_API_ENDPOINTS array = !useTwilio ? ['https://${containerapp.outputs.containerAppFqdn}/acs/incomingcall'] : ['https://${containerapp.outputs.containerAppFqdn}/voice'] output AZURE_VOICE_LIVE_ENDPOINT string = aiServices.outputs.aiServicesEndpoint output AZURE_VOICE_LIVE_MODEL string = modelName diff --git a/infra/main.parameters.json b/infra/main.parameters.json index 40725e7..a20a5a7 100644 --- a/infra/main.parameters.json +++ b/infra/main.parameters.json @@ -13,6 +13,9 @@ }, "appExists": { "value": "${SERVICE_APP_RESOURCE_EXISTS=false}" + }, + "twilioAuthToken": { + "value": "${TWILIO_AUTH_TOKEN}" } } } diff --git a/infra/modules/containerapp.bicep b/infra/modules/containerapp.bicep index 7084b9c..1349daf 100644 --- a/infra/modules/containerapp.bicep +++ b/infra/modules/containerapp.bicep @@ -9,6 +9,7 @@ param containerRegistryName string param aiServicesEndpoint string param modelDeploymentName string param acsConnectionStringSecretUri string +param twilioAuthTokenSecretUri string = '' param logAnalyticsWorkspaceName string 
@description('The name of the container image') param imageName string = '' @@ -67,20 +68,28 @@ resource containerApp 'Microsoft.App/containerApps@2024-10-02-preview' = { identity: identityId } ] - secrets: [ - { - name: 'acs-connection-string' - keyVaultUrl: acsConnectionStringSecretUri + secrets: concat( + !empty(acsConnectionStringSecretUri) ? [ + { + name: 'acs-connection-string' + keyVaultUrl: acsConnectionStringSecretUri + identity: identityId + } + ] : [], + !empty(twilioAuthTokenSecretUri) ? [ + { + name: 'twilio-auth-token' + keyVaultUrl: twilioAuthTokenSecretUri identity: identityId } - ] + ] : []) } template: { containers: [ { name: 'main' image: !empty(imageName) ? imageName : 'mcr.microsoft.com/azuredocs/containerapps-helloworld:latest' - env: [ + env: concat([ { name: 'AZURE_VOICE_LIVE_ENDPOINT' value: aiServicesEndpoint @@ -93,15 +102,21 @@ resource containerApp 'Microsoft.App/containerApps@2024-10-02-preview' = { name: 'VOICE_LIVE_MODEL' value: modelDeploymentName } + { + name: 'DEBUG_MODE' + value: 'true' + } + ], !empty(acsConnectionStringSecretUri) ? [ { name: 'ACS_CONNECTION_STRING' secretRef: 'acs-connection-string' } + ] : [], !empty(twilioAuthTokenSecretUri) ? 
[ { - name: 'DEBUG_MODE' - value: 'true' + name: 'TWILIO_AUTH_TOKEN' + secretRef: 'twilio-auth-token' } - ] + ] : []) resources: { cpu: json('2.0') memory: '4.0Gi' diff --git a/infra/modules/keyvault.bicep b/infra/modules/keyvault.bicep index e9124c1..8901c33 100644 --- a/infra/modules/keyvault.bicep +++ b/infra/modules/keyvault.bicep @@ -3,6 +3,8 @@ param keyVaultName string param tags object @secure() param acsConnectionString string +@secure() +param twilioAuthToken string = '' var sanitizedKeyVaultName = take(toLower(replace(replace(replace(replace(keyVaultName, '--', '-'), '_', '-'), '[^a-zA-Z0-9-]', ''), '-$', '')), 24) @@ -25,7 +27,7 @@ resource keyVault 'Microsoft.KeyVault/vaults@2023-02-01' = { } -resource acsConnectionStringSecret 'Microsoft.KeyVault/vaults/secrets@2023-07-01' = { +resource acsConnectionStringSecret 'Microsoft.KeyVault/vaults/secrets@2023-07-01' = if (!empty(acsConnectionString)) { parent: keyVault name: 'ACS-CONNECTION-STRING' properties: { @@ -35,6 +37,15 @@ resource acsConnectionStringSecret 'Microsoft.KeyVault/vaults/secrets@2023-07-01 var keyVaultDnsSuffix = environment().suffixes.keyvaultDns -output acsConnectionStringUri string = 'https://${keyVault.name}${keyVaultDnsSuffix}/secrets/${acsConnectionStringSecret.name}' +resource twilioAuthTokenSecret 'Microsoft.KeyVault/vaults/secrets@2023-07-01' = if (!empty(twilioAuthToken)) { + parent: keyVault + name: 'TWILIO-AUTH-TOKEN' + properties: { + value: twilioAuthToken + } +} + +output acsConnectionStringUri string = !empty(acsConnectionString) ? 'https://${keyVault.name}${keyVaultDnsSuffix}/secrets/${acsConnectionStringSecret.name}' : '' +output twilioAuthTokenUri string = !empty(twilioAuthToken) ? 
'https://${keyVault.name}${keyVaultDnsSuffix}/secrets/TWILIO-AUTH-TOKEN' : '' output keyVaultId string = keyVault.id output keyVaultName string = keyVault.name diff --git a/server/.env-sample.txt b/server/.env-sample.txt index dd4c705..de9c1c5 100644 --- a/server/.env-sample.txt +++ b/server/.env-sample.txt @@ -4,6 +4,7 @@ AZURE_VOICE_LIVE_ENDPOINT= VOICE_LIVE_MODEL=gpt-4o-mini ACS_CONNECTION_STRING= ACS_DEV_TUNNEL= +TWILIO_AUTH_TOKEN= # Ambient Scenes (Optional) # Adds realistic background audio to simulate real-world call environments diff --git a/server/app/handler/acs_media_handler.py b/server/app/handler/acs_media_handler.py index 4991eac..8cc18d4 100644 --- a/server/app/handler/acs_media_handler.py +++ b/server/app/handler/acs_media_handler.py @@ -1,375 +1,57 @@ -"""Handles media streaming to Azure Voice Live API via WebSocket.""" +"""Handles ACS (Azure Communication Services) clients via JSON-wrapped audio.""" -import asyncio import base64 import json import logging -import uuid -from typing import Optional -import numpy as np -from azure.identity.aio import ManagedIdentityCredential -from websockets.asyncio.client import connect as ws_connect -from websockets.typing import Data - -from .ambient_mixer import AmbientMixer +from .voicelive_media_handler import DEFAULT_CHUNK_SIZE, VoiceLiveMediaHandler logger = logging.getLogger(__name__) -# Default chunk size in bytes (100ms of audio at 24kHz, 16-bit mono) -DEFAULT_CHUNK_SIZE = 4800 # 24000 samples/sec * 0.1 sec * 2 bytes - - -def session_config(): - """Returns the default session configuration for Voice Live.""" - return { - "type": "session.update", - "session": { - "instructions": "You are a helpful AI assistant responding in natural, engaging language.", - "turn_detection": { - "type": "azure_semantic_vad", - "threshold": 0.3, - "prefix_padding_ms": 200, - "silence_duration_ms": 200, - "remove_filler_words": False, - "end_of_utterance_detection": { - "model": "semantic_detection_v1", - "threshold": 0.01, - 
"timeout": 2, - }, - }, - "input_audio_noise_reduction": {"type": "azure_deep_noise_suppression"}, - "input_audio_echo_cancellation": {"type": "server_echo_cancellation"}, - "voice": { - "name": "en-US-Aria:DragonHDLatestNeural", - "type": "azure-standard", - "temperature": 0.8, - }, - }, - } - - -class ACSMediaHandler: - """Manages audio streaming between client and Azure Voice Live API.""" - - def __init__(self, config): - self.endpoint = config["AZURE_VOICE_LIVE_ENDPOINT"] - self.model = config["VOICE_LIVE_MODEL"] - self.api_key = config["AZURE_VOICE_LIVE_API_KEY"] - self.client_id = config["AZURE_USER_ASSIGNED_IDENTITY_CLIENT_ID"] - self.send_queue = asyncio.Queue() - self.ws = None - self.send_task = None - self.incoming_websocket = None - self.is_raw_audio = True - - # TTS output buffering for continuous ambient mixing - self._tts_output_buffer = bytearray() - self._tts_buffer_lock = asyncio.Lock() - self._max_buffer_size = 480000 # 10 seconds of audio - large enough for long responses - self._buffer_warning_logged = False - self._tts_playback_started = False # Track if we've started playing TTS - self._min_buffer_to_start = 9600 # 200ms buffer before starting TTS playback - - # Ambient mixer initialization - self._ambient_mixer: Optional[AmbientMixer] = None - ambient_preset = config.get("AMBIENT_PRESET", "none") - if ambient_preset and ambient_preset != "none": - try: - self._ambient_mixer = AmbientMixer(preset=ambient_preset) - except Exception as e: - logger.error(f"Failed to initialize AmbientMixer: {e}") - - def _generate_guid(self): - return str(uuid.uuid4()) - - async def connect(self): - """Connects to Azure Voice Live API via WebSocket.""" - endpoint = self.endpoint.rstrip("/") - model = self.model.strip() - url = f"{endpoint}/voice-live/realtime?api-version=2025-05-01-preview&model={model}" - url = url.replace("https://", "wss://") - - headers = {"x-ms-client-request-id": self._generate_guid()} - - if self.client_id: - # Use async context manager 
to auto-close the credential - async with ManagedIdentityCredential(client_id=self.client_id) as credential: - token = await credential.get_token( - "https://cognitiveservices.azure.com/.default" - ) - headers["Authorization"] = f"Bearer {token.token}" - logger.info("[VoiceLiveACSHandler] Connected to Voice Live API by managed identity") - else: - headers["api-key"] = self.api_key - - self.ws = await ws_connect(url, additional_headers=headers) - logger.info("[VoiceLiveACSHandler] Connected to Voice Live API") - - await self._send_json(session_config()) - await self._send_json({"type": "response.create"}) - - asyncio.create_task(self._receiver_loop()) - self.send_task = asyncio.create_task(self._sender_loop()) - - async def init_incoming_websocket(self, socket, is_raw_audio=True): - """Sets up incoming ACS WebSocket.""" - self.incoming_websocket = socket - self.is_raw_audio = is_raw_audio - - async def audio_to_voicelive(self, audio_b64: str): - """Queues audio data to be sent to Voice Live API.""" - await self.send_queue.put( - json.dumps({"type": "input_audio_buffer.append", "audio": audio_b64}) - ) - - async def _send_json(self, obj): - """Sends a JSON object over WebSocket.""" - if self.ws: - await self.ws.send(json.dumps(obj)) - - async def _sender_loop(self): - """Continuously sends messages from the queue to the Voice Live WebSocket.""" - try: - while True: - msg = await self.send_queue.get() - if self.ws: - await self.ws.send(msg) - except Exception: - logger.exception("[VoiceLiveACSHandler] Sender loop error") - - async def _receiver_loop(self): - """Handles incoming events from the Voice Live WebSocket.""" - try: - async for message in self.ws: - event = json.loads(message) - event_type = event.get("type") - - match event_type: - case "session.created": - session_id = event.get("session", {}).get("id") - logger.info("[VoiceLiveACSHandler] Session ID: %s", session_id) - - case "input_audio_buffer.cleared": - logger.info("Input Audio Buffer Cleared Message") 
- case "input_audio_buffer.speech_started": - logger.info( - "Voice activity detection started at %s ms", - event.get("audio_start_ms"), - ) - await self.stop_audio() +class ACSMediaHandler(VoiceLiveMediaHandler): + """Bridges ACS Call Automation WebSocket to Voice Live. - case "input_audio_buffer.speech_stopped": - logger.info("Speech stopped") + Overrides only the JSON wrapping/unwrapping; ambient mixing and Voice Live + connection are inherited from the base class. + """ - case "conversation.item.input_audio_transcription.completed": - transcript = event.get("transcript") - logger.info("User: %s", transcript) + # ------------------------------------------------------------------ + # Audio output — wrap in ACS JSON protocol + # ------------------------------------------------------------------ - case "conversation.item.input_audio_transcription.failed": - error_msg = event.get("error") - logger.warning("Transcription Error: %s", error_msg) - - case "response.done": - response = event.get("response", {}) - logger.info("Response Done: Id=%s", response.get("id")) - if response.get("status_details"): - logger.info( - "Status Details: %s", - json.dumps(response["status_details"], indent=2), - ) - - case "response.audio_transcript.done": - transcript = event.get("transcript") - logger.info("AI: %s", transcript) - await self.send_message( - json.dumps({"Kind": "Transcription", "Text": transcript}) - ) - - case "response.audio.delta": - delta = event.get("delta") - audio_bytes = base64.b64decode(delta) - - # Check if ambient mixing is enabled - if self._ambient_mixer is not None and self._ambient_mixer.is_enabled(): - # Buffer TTS for continuous output mixing - async with self._tts_buffer_lock: - self._tts_output_buffer.extend(audio_bytes) - # Warn if buffer is getting large, but NEVER drop audio - if len(self._tts_output_buffer) > self._max_buffer_size: - if not self._buffer_warning_logged: - logger.warning( - f"TTS buffer large: {len(self._tts_output_buffer)} bytes. 
" - "Speech may be delayed but will not be cut." - ) - self._buffer_warning_logged = True - elif self._buffer_warning_logged and len(self._tts_output_buffer) < self._max_buffer_size // 2: - self._buffer_warning_logged = False # Reset warning flag - else: - # No ambient - send immediately (original behavior) - if self.is_raw_audio: - await self.send_message(audio_bytes) - else: - await self.voicelive_to_acs(delta) - - case "error": - logger.error("Voice Live Error: %s", event) - - case _: - logger.debug( - "[VoiceLiveACSHandler] Other event: %s", event_type - ) - except Exception: - logger.exception("[VoiceLiveACSHandler] Receiver loop error") - - async def send_message(self, message: Data): - """Sends data back to client WebSocket.""" - try: - await self.incoming_websocket.send(message) - except Exception: - logger.exception("[VoiceLiveACSHandler] Failed to send message") - - async def voicelive_to_acs(self, base64_data): - """Converts Voice Live audio delta to ACS audio message.""" - try: - data = { - "Kind": "AudioData", - "AudioData": {"Data": base64_data}, - "StopAudio": None, - } - await self.send_message(json.dumps(data)) - except Exception: - logger.exception("[VoiceLiveACSHandler] Error in voicelive_to_acs") - - async def stop_audio(self): - """Sends a StopAudio signal to ACS.""" - stop_audio_data = {"Kind": "StopAudio", "AudioData": None, "StopAudio": {}} - await self.send_message(json.dumps(stop_audio_data)) - - # Clear TTS buffer when user starts speaking - if self._ambient_mixer is not None: - async with self._tts_buffer_lock: - self._tts_output_buffer.clear() - self._tts_playback_started = False - - async def _send_continuous_audio(self, chunk_size: int) -> None: - """ - Send continuous audio (ambient + TTS if available) back to client. - - Called for every incoming audio frame, ensuring continuous output. - Uses buffered TTS with minimum buffer threshold to prevent mid-word cuts. 
- - Args: - chunk_size: Size of audio chunk to send (matches incoming frame size) - """ - if self._ambient_mixer is None or not self._ambient_mixer.is_enabled(): - return # Ambient disabled, skip - - try: - async with self._tts_buffer_lock: - buffer_len = len(self._tts_output_buffer) - - # Always get a consistent ambient chunk first - ambient_bytes = self._ambient_mixer.get_ambient_only_chunk(chunk_size) - - # Determine if we should play TTS - should_play_tts = False - if self._tts_playback_started: - # Already playing - continue until buffer empty - if buffer_len >= chunk_size: - should_play_tts = True - elif buffer_len > 0: - # Partial buffer but still playing - use what we have - should_play_tts = True - else: - # Buffer empty - stop playback mode - self._tts_playback_started = False - else: - # Not yet playing - wait for minimum buffer - if buffer_len >= self._min_buffer_to_start: - self._tts_playback_started = True - should_play_tts = True - - if should_play_tts and buffer_len >= chunk_size: - # Full TTS chunk available - add TTS on top of ambient - tts_chunk = bytes(self._tts_output_buffer[:chunk_size]) - del self._tts_output_buffer[:chunk_size] - - # Mix: ambient (constant) + TTS - ambient = np.frombuffer(ambient_bytes, dtype=np.int16).astype(np.float32) / 32768.0 - tts = np.frombuffer(tts_chunk, dtype=np.int16).astype(np.float32) / 32768.0 - mixed = ambient + tts - mixed = np.clip(mixed, -0.95, 0.95) # Soft limit - output_bytes = (mixed * 32767).astype(np.int16).tobytes() - - elif should_play_tts and buffer_len > 0: - # Partial TTS remaining at end of speech - drain it - tts_chunk = bytes(self._tts_output_buffer[:]) - self._tts_output_buffer.clear() - self._tts_playback_started = False - - ambient = np.frombuffer(ambient_bytes, dtype=np.int16).astype(np.float32) / 32768.0 - - # Only mix TTS for the portion we have - tts_samples = len(tts_chunk) // 2 - tts = np.frombuffer(tts_chunk, dtype=np.int16).astype(np.float32) / 32768.0 - ambient[:tts_samples] += tts 
- mixed = np.clip(ambient, -0.95, 0.95) - output_bytes = (mixed * 32767).astype(np.int16).tobytes() - - else: - # No TTS ready - just send constant ambient - output_bytes = ambient_bytes - - # Send to client - if self.is_raw_audio: - # Web browser - raw bytes - await self.send_message(output_bytes) - else: - # Phone call - JSON wrapped - output_b64 = base64.b64encode(output_bytes).decode("ascii") - data = { - "Kind": "AudioData", - "AudioData": {"Data": output_b64}, - "StopAudio": None, - } - await self.send_message(json.dumps(data)) - - except Exception: - logger.exception("[VoiceLiveACSHandler] Error in _send_continuous_audio") - - async def acs_to_voicelive(self, stream_data): - """Processes audio from ACS and forwards to Voice Live if not silent.""" + async def _send_audio_to_client(self, audio_bytes: bytes): + """Wrap audio in ACS AudioData JSON format before sending.""" + audio_b64 = base64.b64encode(audio_bytes).decode("ascii") + data = { + "Kind": "AudioData", + "AudioData": {"Data": audio_b64}, + "StopAudio": None, + } + await self.send_message(json.dumps(data)) + + # ------------------------------------------------------------------ + # Inbound audio — parse ACS JSON protocol + # ------------------------------------------------------------------ + + def _receive_audio_from_client(self, data) -> tuple: + """Parse ACS JSON and extract PCM audio bytes.""" try: - data = json.loads(stream_data) - if data.get("kind") == "AudioData": - audio_data = data.get("audioData", {}) + msg = json.loads(data) + if msg.get("kind") == "AudioData": + audio_data = msg.get("audioData", {}) incoming_data = audio_data.get("data", "") - - # Determine chunk size from incoming audio + if incoming_data: - incoming_bytes = base64.b64decode(incoming_data) - chunk_size = len(incoming_bytes) + pcm_bytes = base64.b64decode(incoming_data) + chunk_size = len(pcm_bytes) else: + pcm_bytes = None chunk_size = DEFAULT_CHUNK_SIZE - - # Send continuous audio back to caller (ambient + TTS mixed) - 
await self._send_continuous_audio(chunk_size) - - # Forward non-silent audio to Voice Live (existing logic) - if not audio_data.get("silent", True): - await self.audio_to_voicelive(audio_data.get("data")) - except Exception: - logger.exception("[VoiceLiveACSHandler] Error processing ACS audio") - async def web_to_voicelive(self, audio_bytes): - """Encodes raw audio bytes and sends to Voice Live API.""" - chunk_size = len(audio_bytes) - - # Send continuous audio back to browser (ambient + TTS mixed) - await self._send_continuous_audio(chunk_size) - - # Forward to Voice Live - audio_b64 = base64.b64encode(audio_bytes).decode("ascii") - await self.audio_to_voicelive(audio_b64) + if audio_data.get("silent", True): + return None, chunk_size + return pcm_bytes, chunk_size + except Exception: + logger.exception("[ACSMediaHandler] Error parsing ACS audio") + return None, DEFAULT_CHUNK_SIZE diff --git a/server/app/handler/twilio_event_handler.py b/server/app/handler/twilio_event_handler.py new file mode 100644 index 0000000..3c95303 --- /dev/null +++ b/server/app/handler/twilio_event_handler.py @@ -0,0 +1,78 @@ +"""Handler for Twilio webhook validation and incoming call TwiML generation.""" + +import hashlib +import hmac +import logging +import time +from urllib.parse import urlparse, urlunparse + +from twilio.request_validator import RequestValidator +from twilio.twiml.voice_response import VoiceResponse + +logger = logging.getLogger(__name__) + +# Token validity period in seconds +_TOKEN_TTL = 60 + + +class TwilioEventHandler: + """Validates Twilio webhook signatures and generates TwiML responses.""" + + def __init__(self, config): + self.auth_token = config.get("TWILIO_AUTH_TOKEN", "") + + def _reconstruct_url(self, raw_url: str) -> str: + """Reconstruct URL as Twilio sees it (https, no port for voice HTTPS).""" + parsed = urlparse(raw_url) + return urlunparse(("https", parsed.hostname, parsed.path, parsed.params, parsed.query, "")) + + def _generate_ws_token(self) -> 
str: + """Generate a short-lived HMAC token for WebSocket authentication.""" + timestamp = str(int(time.time())) + sig = hmac.new( + self.auth_token.encode(), timestamp.encode(), hashlib.sha256 + ).hexdigest() + return f"{timestamp}.{sig}" + + def verify_ws_token(self, token: str) -> bool: + """Verify a WebSocket token is valid and not expired.""" + if not self.auth_token or not token: + return False + parts = token.split(".", 1) + if len(parts) != 2: + return False + timestamp_str, sig = parts + try: + timestamp = int(timestamp_str) + except ValueError: + return False + # Check expiry + if time.time() - timestamp > _TOKEN_TTL: + return False + # Verify signature + expected = hmac.new( + self.auth_token.encode(), timestamp_str.encode(), hashlib.sha256 + ).hexdigest() + return hmac.compare_digest(sig, expected) + + def validate_request(self, url: str, params: dict, signature: str) -> bool | None: + """Validate a Twilio HTTP webhook request signature. + + Returns True if valid, False if invalid, None if auth token not configured. 
+ """ + if not self.auth_token: + return None + validator = RequestValidator(self.auth_token) + reconstructed_url = self._reconstruct_url(url) + return validator.validate(reconstructed_url, params, signature) + + def generate_stream_twiml(self, ws_url: str) -> str: + """Generate TwiML response that connects the call to a media stream with auth token.""" + token = self._generate_ws_token() + resp = VoiceResponse() + resp.say("Please wait while we connect you to our AI assistant.") + connect = resp.connect() + stream = connect.stream(url=ws_url) + stream.parameter(name="token", value=token) + logger.info("Returning TwiML with stream URL: %s", ws_url) + return str(resp) diff --git a/server/app/handler/twilio_media_handler.py b/server/app/handler/twilio_media_handler.py new file mode 100644 index 0000000..b935f85 --- /dev/null +++ b/server/app/handler/twilio_media_handler.py @@ -0,0 +1,194 @@ +"""Handles Twilio Media Stream WebSocket and bridges audio to Azure Voice Live API.""" + +import audioop +import base64 +import hashlib +import hmac +import json +import logging +import time + +from .voicelive_media_handler import VoiceLiveMediaHandler + +logger = logging.getLogger(__name__) + +# Twilio sends mulaw 8000Hz; Voice Live expects PCM 24000Hz 16-bit mono. +TWILIO_SAMPLE_RATE = 8000 +VOICELIVE_SAMPLE_RATE = 24000 +_TOKEN_TTL = 60 + + +class TwilioMediaHandler(VoiceLiveMediaHandler): + """Bridges Twilio Media Stream WebSocket to Azure Voice Live API. + + Handles mulaw/PCM conversion, rate resampling, and Twilio protocol. 
+ """ + + def __init__(self, config): + super().__init__(config) + self.auth_token = config.get("TWILIO_AUTH_TOKEN", "") + self.twilio_ws = None + self.stream_sid = None + self._ratecv_state_in = None + self._ratecv_state_out = None + + # ------------------------------------------------------------------ + # Authentication + # ------------------------------------------------------------------ + + def _verify_ws_token(self, token: str) -> bool: + """Verify a WebSocket token is valid and not expired.""" + if not self.auth_token or not token: + return False + parts = token.split(".", 1) + if len(parts) != 2: + return False + timestamp_str, sig = parts + try: + timestamp = int(timestamp_str) + except ValueError: + return False + if time.time() - timestamp > _TOKEN_TTL: + return False + expected = hmac.new( + self.auth_token.encode(), timestamp_str.encode(), hashlib.sha256 + ).hexdigest() + return hmac.compare_digest(sig, expected) + + async def authenticate_and_start(self) -> bool: + """Wait for the Twilio 'start' message and validate the embedded token. + + Returns True if authenticated, False if rejected (WebSocket already closed). 
+ """ + while True: + msg = await self.twilio_ws.receive() + data = json.loads(msg) + event = data.get("event") + + if event == "connected": + logger.info("[TwilioMediaHandler] Twilio connected: protocol=%s", data.get("protocol")) + continue + + if event == "start": + custom_params = data.get("start", {}).get("customParameters", {}) + token = custom_params.get("token", "") + if not self._verify_ws_token(token): + logger.warning("[TwilioMediaHandler] Invalid or expired stream token") + await self.twilio_ws.close(4403, "Forbidden") + return False + # Process the start message + await self.handle_twilio_message(msg) + return True + + # Unexpected message before start + logger.warning("[TwilioMediaHandler] Unexpected message before start: %s", event) + await self.twilio_ws.close(4400, "Bad Request") + return False + + # ------------------------------------------------------------------ + # Voice Live hooks + # ------------------------------------------------------------------ + + async def on_speech_started(self): + """Barge-in: clear Twilio playback and TTS buffer.""" + await self._send_clear_to_twilio() + if self._ambient_mixer is not None: + async with self._tts_buffer_lock: + self._tts_output_buffer.clear() + self._tts_playback_started = False + + async def on_transcript_done(self, transcript: str): + """Log only — Twilio has no transcript channel.""" + pass + + # ------------------------------------------------------------------ + # Audio output to client — PCM 24kHz → mulaw 8kHz → Twilio + # ------------------------------------------------------------------ + + async def _send_audio_to_client(self, audio_bytes: bytes): + """Convert PCM 24kHz to mulaw 8kHz and send to Twilio.""" + if not self.twilio_ws or not self.stream_sid: + return + + pcm_8k, self._ratecv_state_out = audioop.ratecv( + audio_bytes, 2, 1, VOICELIVE_SAMPLE_RATE, TWILIO_SAMPLE_RATE, self._ratecv_state_out + ) + + mulaw_bytes = audioop.lin2ulaw(pcm_8k, 2) + mulaw_b64 = 
base64.b64encode(mulaw_bytes).decode("ascii") + + msg = { + "event": "media", + "streamSid": self.stream_sid, + "media": {"payload": mulaw_b64}, + } + await self.twilio_ws.send(json.dumps(msg)) + + # ------------------------------------------------------------------ + # Twilio message handling + # ------------------------------------------------------------------ + + async def handle_twilio_message(self, message: str): + """Processes an incoming Twilio WebSocket message.""" + try: + data = json.loads(message) + except json.JSONDecodeError: + logger.warning("[TwilioMediaHandler] Non-JSON message received") + return + + event = data.get("event") + + match event: + case "connected": + logger.info("[TwilioMediaHandler] Twilio connected: protocol=%s", data.get("protocol")) + + case "start": + self.stream_sid = data.get("streamSid") + start_info = data.get("start", {}) + logger.info( + "[TwilioMediaHandler] Stream started: sid=%s, call=%s, format=%s", + self.stream_sid, + start_info.get("callSid"), + start_info.get("mediaFormat"), + ) + + case "media": + media = data.get("media", {}) + payload = media.get("payload", "") + if payload: + mulaw_bytes = base64.b64decode(payload) + await self.handle_audio(mulaw_bytes) + + case "stop": + logger.info("[TwilioMediaHandler] Stream stopped: sid=%s", self.stream_sid) + + case "dtmf": + digit = data.get("dtmf", {}).get("digit") + logger.info("[TwilioMediaHandler] DTMF received: %s", digit) + + case "mark": + mark_name = data.get("mark", {}).get("name") + logger.debug("[TwilioMediaHandler] Mark received: %s", mark_name) + + case _: + logger.debug("[TwilioMediaHandler] Unknown event: %s", event) + + # ------------------------------------------------------------------ + # Inbound audio — mulaw 8kHz → PCM 24kHz + # ------------------------------------------------------------------ + + def _receive_audio_from_client(self, data) -> tuple: + """Convert Twilio mulaw/8kHz bytes to PCM 24kHz.""" + pcm_8k = audioop.ulaw2lin(data, 2) + pcm_24k, 
self._ratecv_state_in = audioop.ratecv( + pcm_8k, 2, 1, TWILIO_SAMPLE_RATE, VOICELIVE_SAMPLE_RATE, self._ratecv_state_in + ) + return pcm_24k, len(pcm_24k) + + async def _send_clear_to_twilio(self): + """Sends a clear message to Twilio to stop current audio playback.""" + if not self.twilio_ws or not self.stream_sid: + return + self._ratecv_state_out = None + msg = {"event": "clear", "streamSid": self.stream_sid} + await self.twilio_ws.send(json.dumps(msg)) diff --git a/server/app/handler/voicelive_media_handler.py b/server/app/handler/voicelive_media_handler.py new file mode 100644 index 0000000..e35e77e --- /dev/null +++ b/server/app/handler/voicelive_media_handler.py @@ -0,0 +1,368 @@ +"""Base handler for Azure Voice Live API WebSocket connections. + +Provides the shared Voice Live connection, sender/receiver loops, web client +audio handling with ambient mixing, and cleanup logic. Telephony subclasses +override hooks and handle_audio() to implement protocol-specific behavior. +""" + +import asyncio +import base64 +import json +import logging +import uuid +from typing import Optional + +import numpy as np +from azure.identity.aio import ManagedIdentityCredential +from websockets.asyncio.client import connect as ws_connect +from websockets.typing import Data + +from .ambient_mixer import AmbientMixer + +logger = logging.getLogger(__name__) + +# Default chunk size in bytes (100ms of audio at 24kHz, 16-bit mono) +DEFAULT_CHUNK_SIZE = 4800 # 24000 samples/sec * 0.1 sec * 2 bytes + + +class VoiceLiveMediaHandler: + """Handles the WebSocket connection to Azure Voice Live API and web clients. + + Provides web client audio handling (raw PCM + ambient mixing) by default. + Telephony subclasses (ACS, Twilio) override hooks for their specific protocols. 
+ """ + + _api_version = "2026-01-01-preview" + + def __init__(self, config): + self.endpoint = config["AZURE_VOICE_LIVE_ENDPOINT"] + self.model = config["VOICE_LIVE_MODEL"] + self.api_key = config["AZURE_VOICE_LIVE_API_KEY"] + self.client_id = config["AZURE_USER_ASSIGNED_IDENTITY_CLIENT_ID"] + self.ws = None + self._send_queue = asyncio.Queue() + self._send_task = None + self._receiver_task = None + + # Client WebSocket + self.client_ws = None + + # TTS output buffering for continuous ambient mixing + self._tts_output_buffer = bytearray() + self._tts_buffer_lock = asyncio.Lock() + self._max_buffer_size = 480000 # 10 seconds of audio + self._buffer_warning_logged = False + self._tts_playback_started = False + self._min_buffer_to_start = 9600 # 200ms buffer before starting TTS playback + + # Ambient mixer initialization + self._ambient_mixer: Optional[AmbientMixer] = None + ambient_preset = config.get("AMBIENT_PRESET", "none") + if ambient_preset and ambient_preset != "none": + try: + self._ambient_mixer = AmbientMixer(preset=ambient_preset) + except Exception as e: + logger.error(f"Failed to initialize AmbientMixer: {e}") + + def _session_config(self): + """Return the session configuration to send on connect.""" + return { + "type": "session.update", + "session": { + "instructions": "You are a helpful AI assistant responding in natural, engaging language.", + "turn_detection": { + "type": "azure_semantic_vad", + }, + "input_audio_noise_reduction": {"type": "azure_deep_noise_suppression"}, + "input_audio_echo_cancellation": {"type": "server_echo_cancellation"}, + "voice": { + "name": "en-US-Aria:DragonHDLatestNeural", + "type": "azure-standard", + "temperature": 0.8, + }, + }, + } + + # ------------------------------------------------------------------ + # Voice Live connection + # ------------------------------------------------------------------ + + async def connect_voicelive(self): + """Connect to Azure Voice Live API via WebSocket.""" + endpoint = 
self.endpoint.rstrip("/") + model = self.model.strip() + url = ( + f"{endpoint}/voice-live/realtime" + f"?api-version={self._api_version}&model={model}" + ) + url = url.replace("https://", "wss://") + + headers = {"x-ms-client-request-id": str(uuid.uuid4())} + + if self.client_id: + async with ManagedIdentityCredential(client_id=self.client_id) as credential: + token = await credential.get_token( + "https://cognitiveservices.azure.com/.default" + ) + headers["Authorization"] = f"Bearer {token.token}" + else: + headers["api-key"] = self.api_key + + self.ws = await ws_connect(url, additional_headers=headers) + logger.info("[VoiceLive] Connected to Voice Live API") + + await self._send_json(self._session_config()) + await self._send_json({"type": "response.create"}) + + self._receiver_task = asyncio.create_task(self._receiver_loop()) + self._send_task = asyncio.create_task(self._sender_loop()) + + async def send_audio(self, audio_b64: str): + """Queue PCM 24kHz 16-bit mono audio (base64) to send to Voice Live.""" + await self._send_queue.put( + json.dumps({"type": "input_audio_buffer.append", "audio": audio_b64}) + ) + + async def _send_json(self, obj): + """Send a JSON object directly to the Voice Live WebSocket.""" + if self.ws: + await self.ws.send(json.dumps(obj)) + + async def _sender_loop(self): + """Continuously sends queued messages to Voice Live.""" + try: + while True: + msg = await self._send_queue.get() + if self.ws: + await self.ws.send(msg) + except asyncio.CancelledError: + pass + except Exception: + logger.exception("[VoiceLive] Sender loop error") + + async def _receiver_loop(self): + """Receives events from Voice Live and dispatches to hook methods.""" + try: + async for message in self.ws: + try: + event = json.loads(message) + except json.JSONDecodeError: + continue + + event_type = event.get("type", "") + + match event_type: + case "session.created": + session_id = event.get("session", {}).get("id") + logger.info("[VoiceLive] Session ID: %s", 
session_id) + + case "session.updated": + logger.info("[VoiceLive] Session updated") + + case "input_audio_buffer.cleared": + logger.debug("[VoiceLive] Input audio buffer cleared") + + case "input_audio_buffer.speech_started": + logger.info( + "[VoiceLive] Speech started at %s ms", + event.get("audio_start_ms"), + ) + await self.on_speech_started() + + case "input_audio_buffer.speech_stopped": + logger.info("[VoiceLive] Speech stopped") + + case "conversation.item.input_audio_transcription.completed": + transcript = event.get("transcript") + logger.debug("[VoiceLive] User: %s", transcript) + + case "conversation.item.input_audio_transcription.failed": + logger.warning( + "[VoiceLive] Transcription error: %s", event.get("error") + ) + + case "response.audio.delta": + delta = event.get("delta", "") + if delta: + await self.on_audio_delta(delta) + + case "response.audio_transcript.done": + transcript = event.get("transcript") + logger.debug("[VoiceLive] AI: %s", transcript) + await self.on_transcript_done(transcript) + + case "response.done": + response = event.get("response", {}) + logger.info( + "[VoiceLive] Response done: id=%s", response.get("id") + ) + + case "error": + logger.error("[VoiceLive] Error: %s", event.get("error")) + + case _: + logger.debug("[VoiceLive] Event: %s", event_type) + except asyncio.CancelledError: + pass + except Exception: + logger.exception("[VoiceLive] Receiver loop error") + + # ------------------------------------------------------------------ + # Client WebSocket + # ------------------------------------------------------------------ + + async def init_websocket(self, socket): + """Sets up the client WebSocket.""" + self.client_ws = socket + + async def send_message(self, message: Data): + """Sends data back to client WebSocket.""" + try: + await self.client_ws.send(message) + except Exception: + logger.exception("[VoiceLive] Failed to send message to client") + + # ------------------------------------------------------------------ + 
# Hooks — web client implementations (override in telephony subclasses) + # ------------------------------------------------------------------ + + async def on_speech_started(self): + """Barge-in: send StopAudio to client and clear TTS buffer.""" + stop_audio_data = {"Kind": "StopAudio", "AudioData": None, "StopAudio": {}} + await self.send_message(json.dumps(stop_audio_data)) + + if self._ambient_mixer is not None: + async with self._tts_buffer_lock: + self._tts_output_buffer.clear() + self._tts_playback_started = False + + async def on_audio_delta(self, audio_b64: str): + """Handle audio from Voice Live — buffer for ambient or send directly.""" + audio_bytes = base64.b64decode(audio_b64) + + if self._ambient_mixer is not None and self._ambient_mixer.is_enabled(): + async with self._tts_buffer_lock: + self._tts_output_buffer.extend(audio_bytes) + if len(self._tts_output_buffer) > self._max_buffer_size: + if not self._buffer_warning_logged: + logger.warning( + f"TTS buffer large: {len(self._tts_output_buffer)} bytes. " + "Speech may be delayed but will not be cut." + ) + self._buffer_warning_logged = True + elif self._buffer_warning_logged and len(self._tts_output_buffer) < self._max_buffer_size // 2: + self._buffer_warning_logged = False + else: + await self._send_audio_to_client(audio_bytes) + + async def on_transcript_done(self, transcript: str): + """Forward transcript to client.""" + await self.send_message( + json.dumps({"Kind": "Transcription", "Text": transcript}) + ) + + # ------------------------------------------------------------------ + # Audio output to client + # ------------------------------------------------------------------ + + async def _send_audio_to_client(self, audio_bytes: bytes): + """Send audio bytes to the client. 
Override in subclasses for wrapping.""" + await self.send_message(audio_bytes) + + # ------------------------------------------------------------------ + # Inbound audio from client + # ------------------------------------------------------------------ + + def _receive_audio_from_client(self, data) -> tuple: + """Convert client audio to PCM 24kHz. Override for format conversion. + + Returns (pcm_bytes | None, chunk_size). Return None for silent frames. + """ + return data, len(data) + + async def handle_audio(self, data): + """Process inbound audio: convert, mix ambient, forward to Voice Live.""" + pcm_bytes, chunk_size = self._receive_audio_from_client(data) + await self._send_continuous_audio(chunk_size) + if pcm_bytes: + audio_b64 = base64.b64encode(pcm_bytes).decode("ascii") + await self.send_audio(audio_b64) + + # ------------------------------------------------------------------ + # Ambient mixing + # ------------------------------------------------------------------ + + async def _send_continuous_audio(self, chunk_size: int) -> None: + """Send continuous audio (ambient + TTS if available) back to client.""" + if self._ambient_mixer is None or not self._ambient_mixer.is_enabled(): + return + + try: + async with self._tts_buffer_lock: + buffer_len = len(self._tts_output_buffer) + ambient_bytes = self._ambient_mixer.get_ambient_only_chunk(chunk_size) + + should_play_tts = False + if self._tts_playback_started: + if buffer_len >= chunk_size: + should_play_tts = True + elif buffer_len > 0: + should_play_tts = True + else: + self._tts_playback_started = False + else: + if buffer_len >= self._min_buffer_to_start: + self._tts_playback_started = True + should_play_tts = True + + if should_play_tts and buffer_len >= chunk_size: + tts_chunk = bytes(self._tts_output_buffer[:chunk_size]) + del self._tts_output_buffer[:chunk_size] + + ambient = np.frombuffer(ambient_bytes, dtype=np.int16).astype(np.float32) / 32768.0 + tts = np.frombuffer(tts_chunk, 
dtype=np.int16).astype(np.float32) / 32768.0 + mixed = np.clip(ambient + tts, -0.95, 0.95) + output_bytes = (mixed * 32767).astype(np.int16).tobytes() + + elif should_play_tts and buffer_len > 0: + tts_chunk = bytes(self._tts_output_buffer[:]) + self._tts_output_buffer.clear() + self._tts_playback_started = False + + ambient = np.frombuffer(ambient_bytes, dtype=np.int16).astype(np.float32) / 32768.0 + tts_samples = len(tts_chunk) // 2 + tts = np.frombuffer(tts_chunk, dtype=np.int16).astype(np.float32) / 32768.0 + ambient[:tts_samples] += tts + mixed = np.clip(ambient, -0.95, 0.95) + output_bytes = (mixed * 32767).astype(np.int16).tobytes() + + else: + output_bytes = ambient_bytes + + await self._send_audio_to_client(output_bytes) + + except Exception: + logger.exception("[VoiceLive] Error in _send_continuous_audio") + + # ------------------------------------------------------------------ + # Cleanup + # ------------------------------------------------------------------ + + async def _cleanup(self): + """Cancel background tasks and close the Voice Live WebSocket.""" + for task in (self._receiver_task, self._send_task): + if task: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + self._receiver_task = None + self._send_task = None + if self.ws: + try: + await self.ws.close() + except Exception: + pass + self.ws = None + logger.info("[VoiceLive] Cleaned up") diff --git a/server/pyproject.toml b/server/pyproject.toml index fa11171..1a8368c 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -6,21 +6,21 @@ requires-python = ">=3.9" dependencies = [ "quart>=0.20.0", - "websockets>=15.0.0", - "httpx>=0.28.0", - "python-dotenv>=1.1.0", + "websockets>=15.0.1", + "httpx>=0.28.1", + "python-dotenv>=1.2.2", "quart-cors>=0.8.0", - "azure-core>=1.34.0", + "azure-core>=1.39.0", "azure-communication-identity>=1.5.0", - "aiohttp>=3.12.14", + "aiohttp>=3.13.5", "azure-eventgrid>=4.22.0", - "azure-identity>=1.23.0", - 
"azure-communication-callautomation>=1.4.0", - "openai[realtime]>=1.93.0", + "azure-identity>=1.25.3", + "azure-communication-callautomation>=1.5.0", + "openai[realtime]>=2.33.0", "h2>=4.3.0", - "numpy>=1.24.0" + "numpy>=1.24.0", + "twilio>=9.10.0", + "azure-ai-voicelive[aiohttp]>=1.1.0" ] -[tool.uv] -# Define the virtual environment location for uv -project_environment = ".venv" \ No newline at end of file +[tool.uv] \ No newline at end of file diff --git a/server/server.py b/server/server.py index f84b17d..d2d1d63 100644 --- a/server/server.py +++ b/server/server.py @@ -2,13 +2,17 @@ import logging import os -from app.handler.acs_event_handler import AcsEventHandler -from app.handler.acs_media_handler import ACSMediaHandler from dotenv import load_dotenv from quart import Quart, request, websocket +from app.handler.voicelive_media_handler import VoiceLiveMediaHandler + load_dotenv() +# --------------------------------------------------------------------------- +# App configuration +# --------------------------------------------------------------------------- + app = Quart(__name__) app.config["AZURE_VOICE_LIVE_API_KEY"] = os.getenv("AZURE_VOICE_LIVE_API_KEY", "") app.config["AZURE_VOICE_LIVE_ENDPOINT"] = os.getenv("AZURE_VOICE_LIVE_ENDPOINT") @@ -18,10 +22,8 @@ app.config["AZURE_USER_ASSIGNED_IDENTITY_CLIENT_ID"] = os.getenv( "AZURE_USER_ASSIGNED_IDENTITY_CLIENT_ID", "" ) - -# Ambient Scenes Configuration -# Options: none, office, call_center (or custom presets) app.config["AMBIENT_PRESET"] = os.getenv("AMBIENT_PRESET", "none") +app.config["TWILIO_AUTH_TOKEN"] = os.getenv("TWILIO_AUTH_TOKEN", "") logging.basicConfig( level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s" @@ -35,42 +37,19 @@ else: logger.info("Ambient scenes DISABLED (preset=none)") -acs_handler = AcsEventHandler(app.config) - - -@app.route("/acs/incomingcall", methods=["POST"]) -async def incoming_call_handler(): - """Handles initial incoming call event from EventGrid.""" - 
events = await request.get_json() - host_url = request.host_url.replace("http://", "https://", 1).rstrip("/") - return await acs_handler.process_incoming_call(events, host_url, app.config) - - -@app.route("/acs/callbacks/<context_id>", methods=["POST"]) -async def acs_event_callbacks(context_id): - """Handles ACS event callbacks for call connection and streaming events.""" - raw_events = await request.get_json() - return await acs_handler.process_callback_events(context_id, raw_events, app.config) +# --------------------------------------------------------------------------- +# Telephony detection (exclusive: Twilio OR ACS, never both) +# --------------------------------------------------------------------------- +if app.config["TWILIO_AUTH_TOKEN"]: + _telephony_client = "twilio" +else: + _telephony_client = "acs" +logger.info("Telephony client: %s", _telephony_client) -@app.websocket("/acs/ws") -async def acs_ws(): - """WebSocket endpoint for ACS to send audio to Voice Live.""" - logger = logging.getLogger("acs_ws") - logger.info("Incoming ACS WebSocket connection") - handler = ACSMediaHandler(app.config) - await handler.init_incoming_websocket(websocket, is_raw_audio=False) - asyncio.create_task(handler.connect()) - try: - while True: - msg = await websocket.receive() - await handler.acs_to_voicelive(msg) - except asyncio.CancelledError: - logger.info("ACS WebSocket cancelled") - except Exception: - logger.exception("ACS WebSocket connection closed") - finally: - await handler.stop_audio_output() +# --------------------------------------------------------------------------- +# Routes: Web client (always available) +# --------------------------------------------------------------------------- @app.websocket("/web/ws") @@ -78,19 +57,19 @@ async def web_ws(): """WebSocket endpoint for web clients to send audio to Voice Live.""" logger = logging.getLogger("web_ws") logger.info("Incoming Web WebSocket connection") - handler = ACSMediaHandler(app.config) - await 
handler.init_incoming_websocket(websocket, is_raw_audio=True) - asyncio.create_task(handler.connect()) + handler = VoiceLiveMediaHandler(app.config) + await handler.init_websocket(websocket) + asyncio.create_task(handler.connect_voicelive()) try: while True: msg = await websocket.receive() - await handler.web_to_voicelive(msg) + await handler.handle_audio(msg) except asyncio.CancelledError: logger.info("Web WebSocket cancelled") except Exception: logger.exception("Web WebSocket connection closed") finally: - await handler.stop_audio_output() + await handler._cleanup() @app.route("/") @@ -99,5 +78,96 @@ async def index(): return await app.send_static_file("index.html") +# --------------------------------------------------------------------------- +# Routes: Telephony (only one provider is registered) +# --------------------------------------------------------------------------- + +if _telephony_client == "twilio": + from app.handler.twilio_event_handler import TwilioEventHandler + from app.handler.twilio_media_handler import TwilioMediaHandler + + twilio_handler = TwilioEventHandler(app.config) + + @app.route("/voice", methods=["GET", "POST"]) + async def twilio_voice(): + """Handles incoming Twilio phone calls with bidirectional media stream.""" + logger.info("Twilio /voice webhook called") + + signature = request.headers.get("X-Twilio-Signature", "") + params = dict(await request.form) if request.method == "POST" else {} + valid = twilio_handler.validate_request(request.url, params, signature) + if valid is None: + return "Service Unavailable", 503 + if not valid: + return "Forbidden", 403 + + host_url = request.host_url.replace("http://", "https://", 1).rstrip("/") + ws_url = host_url.replace("https://", "wss://") + "/twilio/ws" + twiml = twilio_handler.generate_stream_twiml(ws_url) + return twiml, 200, {"Content-Type": "text/xml"} + + @app.websocket("/twilio/ws") + async def twilio_ws(): + """WebSocket endpoint for Twilio Media Streams to bridge to Voice 
Live.""" + logger = logging.getLogger("twilio_ws") + logger.info("Incoming Twilio Media Stream WebSocket connection") + + handler = TwilioMediaHandler(app.config) + handler.twilio_ws = websocket + + if not await handler.authenticate_and_start(): + return + + try: + await handler.connect_voicelive() + while True: + msg = await websocket.receive() + await handler.handle_twilio_message(msg) + except asyncio.CancelledError: + logger.info("Twilio WebSocket cancelled") + except Exception: + logger.exception("Twilio WebSocket connection closed") + finally: + await handler._cleanup() + +elif _telephony_client == "acs": + from app.handler.acs_event_handler import AcsEventHandler + from app.handler.acs_media_handler import ACSMediaHandler + + acs_handler = AcsEventHandler(app.config) + + @app.route("/acs/incomingcall", methods=["POST"]) + async def incoming_call_handler(): + """Handles initial incoming call event from EventGrid.""" + events = await request.get_json() + host_url = request.host_url.replace("http://", "https://", 1).rstrip("/") + return await acs_handler.process_incoming_call(events, host_url, app.config) + + @app.route("/acs/callbacks/<context_id>", methods=["POST"]) + async def acs_event_callbacks(context_id): + """Handles ACS event callbacks for call connection and streaming events.""" + raw_events = await request.get_json() + return await acs_handler.process_callback_events(context_id, raw_events, app.config) + + @app.websocket("/acs/ws") + async def acs_ws(): + """WebSocket endpoint for ACS to send audio to Voice Live.""" + logger = logging.getLogger("acs_ws") + logger.info("Incoming ACS WebSocket connection") + handler = ACSMediaHandler(app.config) + await handler.init_websocket(websocket) + asyncio.create_task(handler.connect_voicelive()) + try: + while True: + msg = await websocket.receive() + await handler.handle_audio(msg) + except asyncio.CancelledError: + logger.info("ACS WebSocket cancelled") + except Exception: + logger.exception("ACS WebSocket connection 
closed") + finally: + await handler._cleanup() + + if __name__ == "__main__": app.run(debug=True, host="0.0.0.0", port=8000) diff --git a/server/static/index.html b/server/static/index.html index bbdd2bd..df96f7f 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -214,6 +214,7 @@

🎙️ Voice Live Demo UX

socket.onclose = () => { console.log("WebSocket closed"); stopMicrophone(); + stopPlayback(); document.getElementById("startBtn").disabled = false; document.getElementById("stopBtn").disabled = true; };
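
Review note: the short-lived WebSocket token used by the Twilio handlers in this diff (a Unix timestamp signed with HMAC-SHA256 and checked against a 60-second TTL) can be exercised in isolation. The following is a minimal standalone sketch, not the code from the PR: the free functions `generate_ws_token` and `verify_ws_token`, the `secret` argument, and the injectable `now` parameter are hypothetical names introduced here so the expiry path can be tested without sleeping. It also compares the signatures as bytes, which is a small deviation from the diff, because `hmac.compare_digest` raises `TypeError` when given a non-ASCII `str`.

```python
import hashlib
import hmac
import time
from typing import Optional

TOKEN_TTL = 60  # seconds; mirrors _TOKEN_TTL in the diff


def _sign(secret: str, timestamp_str: str) -> str:
    """Return the hex HMAC-SHA256 of the timestamp string under the secret."""
    return hmac.new(secret.encode(), timestamp_str.encode(), hashlib.sha256).hexdigest()


def generate_ws_token(secret: str, now: Optional[float] = None) -> str:
    """Build a '<timestamp>.<signature>' token (hypothetical helper)."""
    timestamp_str = str(int(time.time() if now is None else now))
    return f"{timestamp_str}.{_sign(secret, timestamp_str)}"


def verify_ws_token(secret: str, token: str, now: Optional[float] = None) -> bool:
    """Check format, age, and signature of a token (hypothetical helper)."""
    if not secret or not token:
        return False
    parts = token.split(".", 1)
    if len(parts) != 2:
        return False
    timestamp_str, sig = parts
    try:
        timestamp = int(timestamp_str)
    except ValueError:
        return False
    current = time.time() if now is None else now
    # Reject tokens older than the TTL.
    if current - timestamp > TOKEN_TTL:
        return False
    expected = _sign(secret, timestamp_str)
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(sig.encode(), expected.encode())
```

As in the diff, only the age of the token is bounded, so a timestamp in the future still passes the expiry check as long as the signature matches. Since the signature requires the shared secret, that matters only if the signing host's clock runs ahead.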