|
| 1 | +"""Test CDN upload/download + media send/receive pipeline (mocked HTTP).""" |
| 2 | + |
| 3 | +import base64 |
| 4 | +import hashlib |
| 5 | +import pytest |
| 6 | +import httpx |
| 7 | + |
| 8 | +from wechat_agent_sdk.media.crypto import encrypt, generate_aes_key, decrypt, decode_aes_key |
| 9 | +from wechat_agent_sdk.media.cdn import download_media, upload_media, CDN_BASE |
| 10 | +from wechat_agent_sdk.transport import _build_media_item |
| 11 | + |
| 12 | + |
| 13 | +# ── CDN download tests ── |
| 14 | + |
| 15 | + |
| 16 | +class TestCdnDownload: |
| 17 | + @pytest.mark.asyncio |
| 18 | + async def test_download_and_decrypt(self): |
| 19 | + """download_media should fetch from CDN and AES-decrypt the content.""" |
| 20 | + original = b"hello this is an image file content" |
| 21 | + key = generate_aes_key() |
| 22 | + encrypted = encrypt(original, key) |
| 23 | + aes_key_b64 = base64.b64encode(key).decode() |
| 24 | + |
| 25 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 26 | + assert "encrypted_query_param=cdn_param_abc" in str(request.url) |
| 27 | + return httpx.Response(200, content=encrypted) |
| 28 | + |
| 29 | + transport = httpx.MockTransport(mock_handler) |
| 30 | + async with httpx.AsyncClient(transport=transport) as client: |
| 31 | + result = await download_media( |
| 32 | + client, "cdn_param_abc", aes_key_b64 |
| 33 | + ) |
| 34 | + |
| 35 | + assert result == original |
| 36 | + |
| 37 | + @pytest.mark.asyncio |
| 38 | + async def test_download_with_hex_key(self): |
| 39 | + """download_media should handle image_item.aeskey (hex format).""" |
| 40 | + original = b"png image data here" |
| 41 | + key = generate_aes_key() |
| 42 | + encrypted = encrypt(original, key) |
| 43 | + hex_key = key.hex() |
| 44 | + |
| 45 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 46 | + return httpx.Response(200, content=encrypted) |
| 47 | + |
| 48 | + transport = httpx.MockTransport(mock_handler) |
| 49 | + async with httpx.AsyncClient(transport=transport) as client: |
| 50 | + result = await download_media( |
| 51 | + client, "cdn_param_xyz", "", aeskey_hex=hex_key |
| 52 | + ) |
| 53 | + |
| 54 | + assert result == original |
| 55 | + |
| 56 | + @pytest.mark.asyncio |
| 57 | + async def test_download_http_error_raises(self): |
| 58 | + """download_media should raise on HTTP error.""" |
| 59 | + |
| 60 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 61 | + return httpx.Response(500, text="CDN error") |
| 62 | + |
| 63 | + transport = httpx.MockTransport(mock_handler) |
| 64 | + async with httpx.AsyncClient(transport=transport) as client: |
| 65 | + with pytest.raises(httpx.HTTPStatusError): |
| 66 | + await download_media(client, "bad_param", base64.b64encode(b"\x00" * 16).decode()) |
| 67 | + |
| 68 | + |
| 69 | +# ── CDN upload tests ── |
| 70 | + |
| 71 | + |
| 72 | +class MockBotClient: |
| 73 | + """Mock ILinkBotClient for upload tests.""" |
| 74 | + |
| 75 | + def __init__(self): |
| 76 | + self.upload_calls = [] |
| 77 | + |
| 78 | + async def get_upload_url(self, **kwargs): |
| 79 | + self.upload_calls.append(kwargs) |
| 80 | + return {"upload_param": "presigned_upload_param_123"} |
| 81 | + |
| 82 | + |
| 83 | +class TestCdnUpload: |
| 84 | + @pytest.mark.asyncio |
| 85 | + async def test_upload_full_pipeline(self): |
| 86 | + """upload_media: encrypt → getuploadurl → CDN POST → return cdn_info.""" |
| 87 | + file_data = b"test file content for upload" |
| 88 | + cdn_response_param = "cdn_encrypted_result_abc" |
| 89 | + |
| 90 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 91 | + # CDN upload endpoint |
| 92 | + if "/upload" in str(request.url): |
| 93 | + assert "encrypted_query_param=presigned_upload_param_123" in str(request.url) |
| 94 | + assert request.headers.get("content-type") == "application/octet-stream" |
| 95 | + |
| 96 | + # Verify body is encrypted (not raw) |
| 97 | + body = request.content |
| 98 | + assert body != file_data |
| 99 | + assert len(body) > 0 |
| 100 | + |
| 101 | + return httpx.Response( |
| 102 | + 200, |
| 103 | + headers={"x-encrypted-param": cdn_response_param}, |
| 104 | + ) |
| 105 | + return httpx.Response(404) |
| 106 | + |
| 107 | + bot_client = MockBotClient() |
| 108 | + transport = httpx.MockTransport(mock_handler) |
| 109 | + async with httpx.AsyncClient(transport=transport) as http_client: |
| 110 | + cdn_info = await upload_media( |
| 111 | + bot_client=bot_client, |
| 112 | + http_client=http_client, |
| 113 | + to_user_id="user_123", |
| 114 | + file_data=file_data, |
| 115 | + media_type=1, # IMAGE |
| 116 | + file_name="photo.jpg", |
| 117 | + ) |
| 118 | + |
| 119 | + # Verify cdn_info structure |
| 120 | + assert cdn_info["encrypt_query_param"] == cdn_response_param |
| 121 | + assert cdn_info["encrypt_type"] == 1 |
| 122 | + assert cdn_info["aes_key"] # should be a base64 string |
| 123 | + |
| 124 | + # Verify the AES key in cdn_info can decrypt what was uploaded |
| 125 | + key = decode_aes_key(cdn_info["aes_key"]) |
| 126 | + assert len(key) == 16 |
| 127 | + |
| 128 | + # Verify bot_client.get_upload_url was called correctly |
| 129 | + assert len(bot_client.upload_calls) == 1 |
| 130 | + call = bot_client.upload_calls[0] |
| 131 | + assert call["media_type"] == 1 |
| 132 | + assert call["to_user_id"] == "user_123" |
| 133 | + assert call["raw_size"] == len(file_data) |
| 134 | + assert call["raw_file_md5"] == hashlib.md5(file_data).hexdigest() |
| 135 | + |
| 136 | + @pytest.mark.asyncio |
| 137 | + async def test_upload_no_upload_param_raises(self): |
| 138 | + """upload_media should raise if getuploadurl returns no upload_param.""" |
| 139 | + |
| 140 | + class BadBotClient: |
| 141 | + async def get_upload_url(self, **kwargs): |
| 142 | + return {} # missing upload_param |
| 143 | + |
| 144 | + transport = httpx.MockTransport(lambda r: httpx.Response(200)) |
| 145 | + async with httpx.AsyncClient(transport=transport) as http_client: |
| 146 | + with pytest.raises(RuntimeError, match="upload_param"): |
| 147 | + await upload_media( |
| 148 | + bot_client=BadBotClient(), |
| 149 | + http_client=http_client, |
| 150 | + to_user_id="user_1", |
| 151 | + file_data=b"data", |
| 152 | + media_type=1, |
| 153 | + ) |
| 154 | + |
| 155 | + @pytest.mark.asyncio |
| 156 | + async def test_upload_no_cdn_header_raises(self): |
| 157 | + """upload_media should raise if CDN response lacks x-encrypted-param.""" |
| 158 | + |
| 159 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 160 | + return httpx.Response(200) # no x-encrypted-param header |
| 161 | + |
| 162 | + bot_client = MockBotClient() |
| 163 | + transport = httpx.MockTransport(mock_handler) |
| 164 | + async with httpx.AsyncClient(transport=transport) as http_client: |
| 165 | + with pytest.raises(RuntimeError, match="x-encrypted-param"): |
| 166 | + await upload_media( |
| 167 | + bot_client=bot_client, |
| 168 | + http_client=http_client, |
| 169 | + to_user_id="user_1", |
| 170 | + file_data=b"data", |
| 171 | + media_type=3, # FILE |
| 172 | + ) |
| 173 | + |
| 174 | + @pytest.mark.asyncio |
| 175 | + async def test_upload_roundtrip_decrypt(self): |
| 176 | + """Uploaded encrypted data should be decryptable with the returned key.""" |
| 177 | + file_data = b"PDF file content here " * 100 # ~2.2KB |
| 178 | + captured_body = {} |
| 179 | + |
| 180 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 181 | + captured_body["data"] = request.content |
| 182 | + return httpx.Response( |
| 183 | + 200, headers={"x-encrypted-param": "cdn_result"} |
| 184 | + ) |
| 185 | + |
| 186 | + bot_client = MockBotClient() |
| 187 | + transport = httpx.MockTransport(mock_handler) |
| 188 | + async with httpx.AsyncClient(transport=transport) as http_client: |
| 189 | + cdn_info = await upload_media( |
| 190 | + bot_client=bot_client, |
| 191 | + http_client=http_client, |
| 192 | + to_user_id="user_1", |
| 193 | + file_data=file_data, |
| 194 | + media_type=3, |
| 195 | + ) |
| 196 | + |
| 197 | + # Decrypt what was uploaded using the returned key |
| 198 | + key = decode_aes_key(cdn_info["aes_key"]) |
| 199 | + decrypted = decrypt(captured_body["data"], key) |
| 200 | + assert decrypted == file_data |
| 201 | + |
| 202 | + |
| 203 | +# ── _build_media_item tests ── |
| 204 | + |
| 205 | + |
| 206 | +class TestBuildMediaItem: |
| 207 | + def test_image_item(self): |
| 208 | + cdn_info = {"encrypt_query_param": "p1", "aes_key": "k1", "encrypt_type": 1} |
| 209 | + item = _build_media_item("image", cdn_info) |
| 210 | + assert item["type"] == 2 |
| 211 | + assert item["image_item"]["media"]["encrypt_query_param"] == "p1" |
| 212 | + |
| 213 | + def test_video_item(self): |
| 214 | + cdn_info = {"encrypt_query_param": "p2", "aes_key": "k2", "encrypt_type": 1} |
| 215 | + item = _build_media_item("video", cdn_info) |
| 216 | + assert item["type"] == 5 |
| 217 | + assert "video_item" in item |
| 218 | + |
| 219 | + def test_voice_item(self): |
| 220 | + cdn_info = {"encrypt_query_param": "p3", "aes_key": "k3", "encrypt_type": 1} |
| 221 | + item = _build_media_item("voice", cdn_info) |
| 222 | + assert item["type"] == 3 |
| 223 | + assert "voice_item" in item |
| 224 | + |
| 225 | + def test_file_item(self): |
| 226 | + cdn_info = {"encrypt_query_param": "p4", "aes_key": "k4", "encrypt_type": 1} |
| 227 | + item = _build_media_item("file", cdn_info, file_name="report.pdf") |
| 228 | + assert item["type"] == 4 |
| 229 | + assert item["file_item"]["media"]["encrypt_query_param"] == "p4" |
| 230 | + assert item["file_item"]["file_name"] == "report.pdf" |
| 231 | + |
| 232 | + def test_file_item_no_name(self): |
| 233 | + cdn_info = {"encrypt_query_param": "p5", "aes_key": "k5", "encrypt_type": 1} |
| 234 | + item = _build_media_item("file", cdn_info) |
| 235 | + assert item["type"] == 4 |
| 236 | + assert "file_name" not in item["file_item"] |
| 237 | + |
| 238 | + def test_unknown_type_defaults_to_file(self): |
| 239 | + cdn_info = {"encrypt_query_param": "p6", "aes_key": "k6", "encrypt_type": 1} |
| 240 | + item = _build_media_item("unknown", cdn_info) |
| 241 | + assert item["type"] == 4 # file type |
| 242 | + assert "file_item" in item |
| 243 | + |
| 244 | + |
| 245 | +# ── Transport send_media / download_media integration tests ── |
| 246 | + |
| 247 | + |
| 248 | +class TestTransportMedia: |
| 249 | + @pytest.mark.asyncio |
| 250 | + async def test_transport_download_media(self): |
| 251 | + """transport.download_media() should download and decrypt.""" |
| 252 | + from wechat_agent_sdk import WeChatTransport |
| 253 | + from wechat_agent_sdk.types import MediaInfo |
| 254 | + |
| 255 | + original = b"image bytes here" |
| 256 | + key = generate_aes_key() |
| 257 | + encrypted = encrypt(original, key) |
| 258 | + aes_key_b64 = base64.b64encode(key).decode() |
| 259 | + |
| 260 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 261 | + if "encrypted_query_param" in str(request.url): |
| 262 | + return httpx.Response(200, content=encrypted) |
| 263 | + return httpx.Response(200, json={}) # for any other calls |
| 264 | + |
| 265 | + transport = WeChatTransport(account_id="test", token="tok") |
| 266 | + transport._client._client = httpx.AsyncClient( |
| 267 | + transport=httpx.MockTransport(mock_handler) |
| 268 | + ) |
| 269 | + |
| 270 | + media = MediaInfo( |
| 271 | + type="image", |
| 272 | + cdn_param="test_cdn_param", |
| 273 | + aes_key=aes_key_b64, |
| 274 | + ) |
| 275 | + result = await transport.download_media(media) |
| 276 | + assert result == original |
| 277 | + |
| 278 | + @pytest.mark.asyncio |
| 279 | + async def test_transport_send_media(self): |
| 280 | + """transport.send_media() should encrypt, upload, and send.""" |
| 281 | + from wechat_agent_sdk import WeChatTransport |
| 282 | + |
| 283 | + file_data = b"video file content" |
| 284 | + sent_messages = [] |
| 285 | + |
| 286 | + async def mock_handler(request: httpx.Request) -> httpx.Response: |
| 287 | + url = str(request.url) |
| 288 | + |
| 289 | + # getuploadurl |
| 290 | + if "getuploadurl" in url: |
| 291 | + return httpx.Response(200, json={"upload_param": "up_param"}) |
| 292 | + # CDN upload |
| 293 | + if "/upload" in url: |
| 294 | + return httpx.Response( |
| 295 | + 200, headers={"x-encrypted-param": "cdn_result_param"} |
| 296 | + ) |
| 297 | + # sendmessage |
| 298 | + if "sendmessage" in url: |
| 299 | + body = request.content |
| 300 | + sent_messages.append(body) |
| 301 | + return httpx.Response(200, json={"ret": 0}) |
| 302 | + return httpx.Response(200, json={}) |
| 303 | + |
| 304 | + transport = WeChatTransport(account_id="test", token="tok") |
| 305 | + transport._client._client = httpx.AsyncClient( |
| 306 | + transport=httpx.MockTransport(mock_handler) |
| 307 | + ) |
| 308 | + |
| 309 | + await transport.send_media( |
| 310 | + chat_id="user_456", |
| 311 | + file_data=file_data, |
| 312 | + media_type="video", |
| 313 | + file_name="clip.mp4", |
| 314 | + context_token="ctx_123", |
| 315 | + ) |
| 316 | + |
| 317 | + # Verify sendmessage was called |
| 318 | + assert len(sent_messages) == 1 |
| 319 | + |
| 320 | + import json |
| 321 | + msg = json.loads(sent_messages[0]) |
| 322 | + assert msg["msg"]["to_user_id"] == "user_456" |
| 323 | + assert msg["msg"]["context_token"] == "ctx_123" |
| 324 | + item = msg["msg"]["item_list"][0] |
| 325 | + assert item["type"] == 5 # video |
| 326 | + assert item["video_item"]["media"]["encrypt_query_param"] == "cdn_result_param" |
0 commit comments