Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions kairon/chat/handlers/channels/whatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from kairon.shared.chat.processor import ChatDataProcessor
from kairon import Utility
from kairon.shared.chat.user_media import UserMedia
from kairon.shared.concurrency.actors.factory import ActorFactory
from kairon.shared.constants import ChannelTypes, ActorType
from kairon.shared.models import User
Expand All @@ -38,6 +39,7 @@ async def message(

# quick reply and user message both share 'text' attribute
# so quick reply should be checked first
media_ids = None
if message.get("type") == "interactive":
interactive_type = message.get("interactive").get("type")
if interactive_type == "nfm_reply":
Expand All @@ -59,6 +61,12 @@ async def message(
if message['type'] == "voice":
message['type'] = "audio"
text = f"/k_multimedia_msg{{\"{message['type']}\": \"{message[message['type']]['id']}\"}}"
media_ids = UserMedia.save_whatsapp_media_content(
bot=bot,
sender_id=message["from"],
whatsapp_media_id=message[message['type']]['id'],
config=self.config
)
elif message.get("type") == "location":
logger.debug(message['location'])
text = f"/k_multimedia_msg{{\"latitude\": \"{message['location']['latitude']}\", \"longitude\": \"{message['location']['longitude']}\"}}"
Expand All @@ -74,7 +82,7 @@ async def message(
logger.warning(f"Received a message from whatsapp that we can not handle. Message: {message}")
return
message.update(metadata)
await self._handle_user_message(text, message["from"], message, bot)
await self._handle_user_message(text, message["from"], message, bot, media_ids)

async def handle_meta_payload(self, payload: Dict, metadata: Optional[Dict[Text, Any]], bot: str) -> None:
provider = self.config.get("bsp_type", "meta")
Expand Down Expand Up @@ -154,7 +162,7 @@ def get_business_phone_number_id(self) -> Text:
return self.last_message.get("value", {}).get("metadata", {}).get("phone_number_id", "")

async def _handle_user_message(
self, text: Text, sender_id: Text, metadata: Optional[Dict[Text, Any]], bot: str
self, text: Text, sender_id: Text, metadata: Optional[Dict[Text, Any]], bot: str, media_ids: list[str] = None
) -> None:
"""Pass on the text to the dialogue engine for processing."""
out_channel = WhatsappBot(self.client)
Expand All @@ -164,14 +172,14 @@ async def _handle_user_message(
text, out_channel, sender_id, input_channel=self.name(), metadata=metadata
)
try:
await self.process_message(bot, user_msg)
await self.process_message(bot, user_msg, media_ids)
except Exception as e:
logger.exception("Exception when trying to handle webhook for whatsapp message.")
logger.exception(e)

@staticmethod
async def process_message(bot: str, user_message: UserMessage):
await AgentProcessor.handle_channel_message(bot, user_message)
async def process_message(bot: str, user_message: UserMessage, media_ids: list[str] = None):
await AgentProcessor.handle_channel_message(bot, user_message, media_ids=media_ids)

def __get_access_token(self):
provider = self.config.get("bsp_type", "meta")
Expand Down
102 changes: 102 additions & 0 deletions kairon/shared/chat/user_media.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import asyncio
import base64
import mimetypes
import os
from datetime import datetime
from pathlib import Path
from typing import BinaryIO
from markdown_pdf import MarkdownPdf, Section
from loguru import logger
from fastapi import File
import requests
from mongoengine import DoesNotExist
from pathy import ClientError
from uuid6 import uuid7
Expand All @@ -23,6 +25,10 @@ class UserMedia:

@staticmethod
def create_user_media_data(bot: str, media_id: str, filename: str, sender_id: str, upload_type: str = UserMediaUploadType.user_uploaded.value):
"""
Create user media data in processing state.
Call mark_user_media_data_upload_done() to mark the upload as done or mark_user_media_data_upload_failed() to mark the upload as failed.
"""
extension = str(Path(filename).suffix).lower()
user_media_data = UserMediaData(
media_id=media_id,
Expand All @@ -38,6 +44,9 @@ def create_user_media_data(bot: str, media_id: str, filename: str, sender_id: st

@staticmethod
def mark_user_media_data_upload_done(media_id:str, media_url: str, output_filename: str, filesize: int):
"""
Mark user media data upload as done.
"""
user_media_data = UserMediaData.objects(media_id=media_id).first()
if user_media_data:
user_media_data.media_url = media_url
Expand All @@ -64,6 +73,14 @@ def save_media_content(
binary_data: bytes = None,
filename: str = None,
):
"""
Save media content to cloud storage.
:param bot: bot name
:param sender_id: sender id
:param media_id: media id
:param binary_data: binary data of the file
:param filename: name of the file
"""

if not filename:
raise AppException('filename must be provided for binary data')
Expand Down Expand Up @@ -99,6 +116,84 @@ def save_media_content(
UserMedia.mark_user_media_data_upload_failed(media_id=media_id, reason=str(e))
raise AppException(f"File upload for {media_id} failed")

@staticmethod
def save_whatsapp_media_content(bot: str, sender_id: str, whatsapp_media_id: str, config: dict):
    """
    Download media from 360 dialog or meta and save it to cloud storage via background task.

    The provider is first asked for the media metadata (download url and mime type),
    the file is then downloaded fully into memory, a user media record is created and
    ``save_media_content_task`` is scheduled with ``asyncio.create_task``.

    :param bot: bot name
    :param sender_id: sender id
    :param whatsapp_media_id: whatsapp media id
    :param config: configuration for 360 dialog or meta. ``bsp_type`` selects the provider
        (defaults to ``meta``); ``api_key`` is read for 360dialog and ``access_token`` for meta.
    :return: list of media ids (a single generated id)
    :raises AppException: if the provider is not supported, a provider request does not
        return HTTP 200, or the metadata response carries no download url
    """
    request_timeout = 10  # seconds, applied to every outgoing request
    provider = config.get("bsp_type", "meta")

    if provider == '360dialog':
        headers = {'D360-API-KEY': config.get('api_key')}
        # Metadata is a small JSON document: no streaming, but do bound the wait.
        info_resp = requests.get(
            f'https://waba-v2.360dialog.io/{whatsapp_media_id}',
            headers=headers,
            timeout=request_timeout
        )
        if info_resp.status_code != 200:
            raise AppException(
                f"Failed to download media from 360 dialog: {info_resp.status_code} - {info_resp.text}"
            )
        file_prefix = "whatsapp_360"
    elif provider == 'meta':
        access_token = config.get('access_token')
        headers = {'Authorization': f'Bearer {access_token}'}
        # NOTE(review): only the `url` field is requested here, so `mime_type` may be
        # absent from the response - confirm against the Graph API and extend `fields`
        # if the file extension is required.
        info_resp = requests.get(
            f'https://graph.facebook.com/v22.0/{whatsapp_media_id}',
            params={"fields": "url", "access_token": access_token},
            timeout=request_timeout
        )
        if info_resp.status_code != 200:
            raise AppException(f"Failed to get url from meta for media: {whatsapp_media_id}")
        file_prefix = "whatsapp_meta"
    else:
        # Previously an unknown provider fell through and tried to download from `None`.
        raise AppException(f"Unsupported whatsapp provider: {provider}")

    media_info = info_resp.json()
    download_url = media_info.get("url")
    if not download_url:
        raise AppException(f"Media url not found for media: {whatsapp_media_id}")
    if provider == '360dialog':
        # The returned url points at the facebook host; the download is done through
        # the 360dialog host instead, using the same path.
        download_url = download_url.replace('https://lookaside.fbsbx.com', 'https://waba-v2.360dialog.io')

    # Drop mime type parameters (e.g. "audio/ogg; codecs=opus") before the lookup and
    # tolerate a missing mime type: guess_extension() does not accept None.
    mime_type = (media_info.get("mime_type") or '').split(';', 1)[0].strip()
    extension = (mimetypes.guess_extension(mime_type) if mime_type else None) or ''
    file_path = f"{file_prefix}_{whatsapp_media_id}{extension}"

    media_resp = requests.get(
        download_url,
        headers=headers,
        stream=True,
        timeout=request_timeout
    )
    try:
        if media_resp.status_code != 200:
            raise AppException(f"Failed to download media: {whatsapp_media_id}")
        file_buffer = b"".join(media_resp.iter_content(chunk_size=8192))
    finally:
        # A streamed response holds its connection until it is consumed or closed.
        media_resp.close()

    media_id = uuid7().hex
    UserMedia.create_user_media_data(
        bot=bot,
        media_id=media_id,
        filename=file_path,
        sender_id=sender_id,
        upload_type=UserMediaUploadType.user_uploaded.value)

    # asyncio.create_task() needs a running event loop in the current thread.
    # NOTE(review): the task reference is not retained; asyncio recommends keeping one
    # so the task cannot be garbage collected before it finishes.
    asyncio.create_task(UserMedia.save_media_content_task(
        bot=bot,
        sender_id=sender_id,
        media_id=media_id,
        binary_data=file_buffer,
        filename=file_path
    ))

    return [media_id]


@staticmethod
async def save_media_content_task(
bot: str,
Expand All @@ -122,6 +217,13 @@ async def upload_media_contents(
sender_id: str,
files: list[File],
):
"""
Upload media contents to cloud storage via background task.
:param bot: bot name
:param sender_id: sender id
:param files: list of files to upload
:return: list of media ids
"""
media_ids = []
read_tasks = [asyncio.create_task(file.read()) for file in files]
binary_datas = await asyncio.gather(*read_tasks)
Expand Down
6 changes: 4 additions & 2 deletions tests/unit_test/channels/whatsapp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,9 @@ async def test_whatsapp_invalid_message_request(self):
"tabname": "default"},
bot)

@patch('kairon.shared.chat.user_media.UserMedia.save_whatsapp_media_content', return_value=['cbeuoaincpiac'])
@pytest.mark.asyncio
async def test_valid_order_message_request(self):
async def test_valid_order_message_request(self, wp):
from kairon.chat.handlers.channels.whatsapp import Whatsapp, WhatsappBot
with patch.object(WhatsappBot, "mark_as_read"):
with patch.object(Whatsapp, "process_message") as mock_message:
Expand Down Expand Up @@ -520,8 +521,9 @@ async def test_valid_order_message_request(self):

assert user_message.text == '/k_order_msg{"order": {"catalog_id": "538971028364699", "product_items": [{"product_retailer_id": "akuba13e44", "quantity": 1, "item_price": 200, "currency": "INR"}, {"product_retailer_id": "0z10aj0bmq", "quantity": 1, "item_price": 600, "currency": "INR"}]}}'

@patch('kairon.shared.chat.user_media.UserMedia.save_whatsapp_media_content', return_value=['cbeuoaincpiac'])
@pytest.mark.asyncio
async def test_valid_attachment_message_request(self):
async def test_valid_attachment_message_request(self, wp):
from kairon.chat.handlers.channels.whatsapp import Whatsapp, WhatsappBot
with patch.object(WhatsappBot, "mark_as_read"):
with patch.object(Whatsapp, "process_message") as mock_message:
Expand Down
109 changes: 108 additions & 1 deletion tests/unit_test/chat/user_media_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kairon.exceptions import AppException
from kairon.shared.chat.user_media import UserMedia
from kairon.shared.data.data_objects import UserMediaData
from kairon.shared.models import UserMediaUploadStatus
from kairon.shared.models import UserMediaUploadStatus, UserMediaUploadType
from mongoengine import connect


Expand Down Expand Up @@ -322,3 +322,110 @@ def test_mark_user_media_data_upload_done_not_found(mock_objects):
def test_mark_user_media_data_upload_failed_not_found(mock_objects):
mock_objects.return_value.get.return_value = None
UserMedia.mark_user_media_data_upload_failed("notfound", "fail reason")


@pytest.mark.asyncio
@patch("kairon.shared.chat.user_media.UserMedia.create_user_media_data")
@patch("kairon.shared.chat.user_media.uuid7")
@patch("kairon.shared.chat.user_media.requests.get")
async def test_save_whatsapp_media_content_360dialog_success(mock_get, mock_uuid, mock_create):
    """360dialog media is resolved, downloaded through the 360dialog host and scheduled for upload."""
    bot = "bot1"
    sender_id = "user1"
    whatsapp_media_id = "media123"
    config = {"bsp_type": "360dialog", "api_key": "key123"}

    # First request: media metadata.
    resp_info = MagicMock()
    resp_info.status_code = 200
    resp_info.json.return_value = {
        "url": "https://lookaside.fbsbx.com/path/file.jpg",
        "mime_type": "image/jpeg"
    }

    # Second request: the media bytes.
    resp_media = MagicMock()
    resp_media.status_code = 200
    resp_media.iter_content = MagicMock(return_value=[b"chunk1", b"chunk2"])

    mock_get.side_effect = [resp_info, resp_media]

    mock_uuid.return_value.hex = "uuid123"

    created = []
    with patch("asyncio.create_task", lambda coro: created.append(coro)):
        result = UserMedia.save_whatsapp_media_content(bot, sender_id, whatsapp_media_id, config)

    # The captured coroutines are never scheduled; close them so the test does not
    # emit "coroutine ... was never awaited" warnings.
    for coro in created:
        coro.close()

    assert result == ["uuid123"]
    assert mock_get.call_count == 2
    # The facebook host must have been swapped for the 360dialog host before download.
    assert mock_get.call_args_list[1].args[0] == "https://waba-v2.360dialog.io/path/file.jpg"
    mock_create.assert_called_once_with(
        bot=bot,
        media_id="uuid123",
        filename="whatsapp_360_media123.jpg",
        sender_id=sender_id,
        upload_type=UserMediaUploadType.user_uploaded.value
    )
    assert len(created) == 1


@pytest.mark.asyncio
@patch("kairon.shared.chat.user_media.UserMedia.create_user_media_data")
@patch("kairon.shared.chat.user_media.uuid7")
@patch("kairon.shared.chat.user_media.requests.get")
async def test_save_whatsapp_media_content_meta_success(mock_get, mock_uuid, mock_create):
    """Meta media is resolved through the Graph API, downloaded and scheduled for upload."""
    bot = "bot2"
    sender_id = "user2"
    whatsapp_media_id = "media456"
    config = {"bsp_type": "meta", "access_token": "token456"}

    # First request: media metadata.
    media_info = MagicMock()
    media_info.status_code = 200
    media_info.json.return_value = {
        "url": "https://graph.facebook.com/path/file.mp4",
        "mime_type": "video/mp4"
    }
    # Second request: the media bytes.
    media_resp = MagicMock()
    media_resp.status_code = 200
    media_resp.iter_content = MagicMock(return_value=[b"data1", b"data2"])

    mock_get.side_effect = [media_info, media_resp]

    mock_uuid.return_value.hex = "uuid456"

    called = []
    with patch("asyncio.create_task", lambda coro: called.append(coro)):
        result = UserMedia.save_whatsapp_media_content(bot, sender_id, whatsapp_media_id, config)

    # The captured coroutines are never scheduled; close them so the test does not
    # emit "coroutine ... was never awaited" warnings.
    for coro in called:
        coro.close()

    assert result == ["uuid456"]
    assert mock_get.call_count == 2
    mock_get.assert_any_call(
        f"https://graph.facebook.com/v22.0/{whatsapp_media_id}",
        params={"fields": "url", "access_token": config['access_token']},
        timeout=10
    )
    assert mock_get.call_args_list[1].args[0] == "https://graph.facebook.com/path/file.mp4"
    mock_create.assert_called_once_with(
        bot=bot,
        media_id="uuid456",
        filename="whatsapp_meta_media456.mp4",
        sender_id=sender_id,
        upload_type=UserMediaUploadType.user_uploaded.value
    )
    assert len(called) == 1


def created_coros(coros):
    """Return ``coros`` exactly as received (identity helper)."""
    passthrough = coros
    return passthrough


@patch("kairon.shared.chat.user_media.requests.get")
def test_save_whatsapp_media_content_360dialog_failure(mock_get):
    """A non-200 metadata response from 360dialog is reported as an AppException."""
    # Synchronous test: no asyncio marker needed, the failure happens before any task is scheduled.
    config = {"bsp_type": "360dialog", "api_key": "key"}
    mock_get.return_value = MagicMock(status_code=500, text="error")

    with pytest.raises(AppException) as exc:
        UserMedia.save_whatsapp_media_content("b", "s", "id", config)

    # Checked after the context manager exits; inside it the assertion would never run.
    assert "Failed to download media from 360 dialog" in str(exc.value)


@patch("kairon.shared.chat.user_media.requests.get")
def test_save_whatsapp_media_content_meta_failure(mock_get):
    """A non-200 metadata response from meta is reported as an AppException."""
    # Synchronous test: no asyncio marker needed, the failure happens before any task is scheduled.
    config = {"bsp_type": "meta", "access_token": "token"}
    mock_get.return_value = MagicMock(status_code=400)

    with pytest.raises(AppException) as exc:
        UserMedia.save_whatsapp_media_content("b", "s", "id", config)

    # Checked after the context manager exits; inside it the assertion would never run.
    assert "Failed to get url from meta" in str(exc.value)