|
23 | 23 | from http import HTTPStatus |
24 | 24 | from typing import TYPE_CHECKING, Any |
25 | 25 |
|
26 | | -from canonicaljson import encode_canonical_json |
27 | | - |
28 | | -from synapse.api.constants import ( |
29 | | - MAX_EDU_SIZE, |
30 | | - EduTypes, |
31 | | - EventContentFields, |
32 | | - ToDeviceEventTypes, |
33 | | -) |
34 | | -from synapse.api.errors import Codes, EventSizeError, SynapseError |
| 26 | +from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes |
| 27 | +from synapse.api.errors import Codes, SynapseError |
35 | 28 | from synapse.api.ratelimiting import Ratelimiter |
36 | 29 | from synapse.logging.context import run_in_background |
37 | 30 | from synapse.logging.opentracing import ( |
|
42 | 35 | ) |
43 | 36 | from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id |
44 | 37 | from synapse.util.json import json_encoder |
45 | | -from synapse.util.stringutils import random_string_insecure_fast |
| 38 | +from synapse.util.stringutils import random_string |
46 | 39 |
|
47 | 40 | if TYPE_CHECKING: |
48 | 41 | from synapse.server import HomeServer |
@@ -229,7 +222,6 @@ async def send_device_message( |
229 | 222 | set_tag(SynapseTags.TO_DEVICE_TYPE, message_type) |
230 | 223 | set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id) |
231 | 224 | local_messages = {} |
232 | | - # Map from destination (server) -> recipient (user ID) -> device_id -> JSON message content |
233 | 225 | remote_messages: dict[str, dict[str, dict[str, JsonDict]]] = {} |
234 | 226 | for user_id, by_device in messages.items(): |
235 | 227 | if not UserID.is_valid(user_id): |
@@ -285,33 +277,28 @@ async def send_device_message( |
285 | 277 | destination = get_domain_from_id(user_id) |
286 | 278 | remote_messages.setdefault(destination, {})[user_id] = by_device |
287 | 279 |
|
288 | | - # Add local messages to the database. |
| 280 | + context = get_active_span_text_map() |
| 281 | + |
| 282 | + remote_edu_contents = {} |
| 283 | + for destination, messages in remote_messages.items(): |
| 284 | + # The EDU contains a "message_id" property which is used for |
| 285 | + # idempotence. Make up a random one. |
| 286 | + message_id = random_string(16) |
| 287 | + log_kv({"destination": destination, "message_id": message_id}) |
| 288 | + |
| 289 | + remote_edu_contents[destination] = { |
| 290 | + "messages": messages, |
| 291 | + "sender": sender_user_id, |
| 292 | + "type": message_type, |
| 293 | + "message_id": message_id, |
| 294 | + "org.matrix.opentracing_context": json_encoder.encode(context), |
| 295 | + } |
| 296 | + |
| 297 | + # Add messages to the database. |
289 | 298 | # Retrieve the stream id of the last-processed to-device message. |
290 | | - last_stream_id = ( |
291 | | - await self.store.add_local_messages_from_client_to_device_inbox( |
292 | | - local_messages |
293 | | - ) |
| 299 | + last_stream_id = await self.store.add_messages_to_device_inbox( |
| 300 | + local_messages, remote_edu_contents |
294 | 301 | ) |
295 | | - for destination, messages in remote_messages.items(): |
296 | | - split_edus = split_device_messages_into_edus( |
297 | | - sender_user_id, message_type, messages |
298 | | - ) |
299 | | - for edu in split_edus: |
300 | | - edu["org.matrix.opentracing_context"] = json_encoder.encode( |
301 | | - get_active_span_text_map() |
302 | | - ) |
303 | | - # Add remote messages to the database. |
304 | | - last_stream_id = ( |
305 | | - await self.store.add_remote_messages_from_client_to_device_inbox( |
306 | | - {destination: edu} |
307 | | - ) |
308 | | - ) |
309 | | - log_kv( |
310 | | - { |
311 | | - "destination": destination, |
312 | | - "message_id": edu["message_id"], |
313 | | - } |
314 | | - ) |
315 | 302 |
|
316 | 303 | # Notify listeners that there are new to-device messages to process, |
317 | 304 | # handing them the latest stream id. |
@@ -410,102 +397,3 @@ async def get_events_for_dehydrated_device( |
410 | 397 | "events": messages, |
411 | 398 | "next_batch": f"d{stream_id}", |
412 | 399 | } |
413 | | - |
414 | | - |
415 | | -def split_device_messages_into_edus( |
416 | | - sender_user_id: str, |
417 | | - message_type: str, |
418 | | - messages_by_user_then_device: dict[str, dict[str, JsonDict]], |
419 | | -) -> list[JsonDict]: |
420 | | - """ |
421 | | - This function takes many to-device messages and fits/splits them into several EDUs |
422 | | - as necessary. We split the messages up as the overall request can overrun the |
423 | | - `max_request_body_size` and prevent outbound federation traffic because of the size |
424 | | - of the transaction (cf. `MAX_EDU_SIZE`). |
425 | | -
|
426 | | - Args: |
427 | | - sender_user_id: The user that is sending the to-device messages. |
428 | | - message_type: The type of to-device messages that are being sent. |
429 | | - messages_by_user_then_device: Dictionary of recipient user_id to recipient device_id to message. |
430 | | -
|
431 | | - Returns: |
432 | | - Bin-packed list of EDU JSON content for the given to_device messages |
433 | | -
|
434 | | - Raises: |
435 | | - EventSizeError: If a single to-device message is too large to fit into an EDU. |
436 | | - """ |
437 | | - split_edus_content: list[JsonDict] = [] |
438 | | - |
439 | | - # Convert messages dict to a list of (recipient, messages_by_device) pairs |
440 | | - message_items = list(messages_by_user_then_device.items()) |
441 | | - |
442 | | - while message_items: |
443 | | - edu_messages = {} |
444 | | - # Start by trying to fit all remaining messages |
445 | | - target_count = len(message_items) |
446 | | - |
447 | | - while target_count > 0: |
448 | | - # Take the first target_count messages |
449 | | - edu_messages = dict(message_items[:target_count]) |
450 | | - edu_content = create_new_to_device_edu_content( |
451 | | - sender_user_id, message_type, edu_messages |
452 | | - ) |
453 | | - # Let's add the whole EDU structure before testing the size |
454 | | - edu = { |
455 | | - "content": edu_content, |
456 | | - "edu_type": EduTypes.DIRECT_TO_DEVICE, |
457 | | - } |
458 | | - |
459 | | - if len(encode_canonical_json(edu)) <= MAX_EDU_SIZE: |
460 | | - # It fits! Add this EDU and remove these messages from the list |
461 | | - split_edus_content.append(edu_content) |
462 | | - message_items = message_items[target_count:] |
463 | | - |
464 | | - logger.debug( |
465 | | - "Created EDU with %d recipients from %s (message_id=%s), (total EDUs so far: %d)", |
466 | | - target_count, |
467 | | - sender_user_id, |
468 | | - edu_content["message_id"], |
469 | | - len(split_edus_content), |
470 | | - ) |
471 | | - break |
472 | | - else: |
473 | | - if target_count == 1: |
474 | | - # Single recipient's messages are too large, let's reject the client |
475 | | - # call with 413/`M_TOO_LARGE`, we expect this error to reach the |
476 | | - # client in the case of the /sendToDevice endpoint. |
477 | | - # |
478 | | - # 413 is currently an unspecced response for `/sendToDevice` but is |
479 | | - # probably the best thing we can do. |
480 | | - # https://github.com/matrix-org/matrix-spec/pull/2340 tracks adding |
481 | | - # this to the spec |
482 | | - recipient = message_items[0][0] |
483 | | - raise EventSizeError( |
484 | | - f"device message to {recipient} too large to fit in a single EDU", |
485 | | - unpersistable=True, |
486 | | - ) |
487 | | - |
488 | | - # Halve the number of messages and try again |
489 | | - target_count = target_count // 2 |
490 | | - |
491 | | - return split_edus_content |
492 | | - |
493 | | - |
494 | | -def create_new_to_device_edu_content( |
495 | | - sender_user_id: str, |
496 | | - message_type: str, |
497 | | - messages_by_user_then_device: dict[str, dict[str, JsonDict]], |
498 | | -) -> JsonDict: |
499 | | - """ |
500 | | - Create a new `m.direct_to_device` EDU `content` object with a unique message ID. |
501 | | - """ |
502 | | - # The EDU contains a "message_id" property which is used for |
503 | | - # idempotence. Make up a random one. |
504 | | - message_id = random_string_insecure_fast(16) |
505 | | - content = { |
506 | | - "sender": sender_user_id, |
507 | | - "type": message_type, |
508 | | - "message_id": message_id, |
509 | | - "messages": messages_by_user_then_device, |
510 | | - } |
511 | | - return content |
0 commit comments