Skip to content
98 changes: 98 additions & 0 deletions bellows/ezsp/fragmentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Implements APS fragmentation reassembly on the EZSP Host side,
mirroring the logic from fragmentation.c in the EmberZNet stack.
"""

import asyncio
import logging
from collections import defaultdict
from typing import Optional, Dict, Tuple

LOGGER = logging.getLogger(__name__)

# The maximum time (in seconds) we wait for all fragments of a given message.
# If not all fragments arrive within this time, we discard the partial data.
FRAGMENT_TIMEOUT = 10

# store partial data keyed by (sender, aps_sequence, profile_id, cluster_id)
FragmentKey = Tuple[int, int, int, int]

class _FragmentEntry:
def __init__(self, fragment_count: int):
self.fragment_count = fragment_count
self.fragments_received = 0
self.fragment_data = {}
self.start_time = asyncio.get_event_loop().time()

def add_fragment(self, index: int, data: bytes) -> None:
if index not in self.fragment_data:
self.fragment_data[index] = data
self.fragments_received += 1

def is_complete(self) -> bool:
return self.fragments_received == self.fragment_count

def assemble(self) -> bytes:
return b''.join(self.fragment_data[i] for i in sorted(self.fragment_data.keys()))

class FragmentManager:
def __init__(self):
self._partial: Dict[FragmentKey, _FragmentEntry] = {}

def handle_incoming_fragment(self, sender_nwk: int, aps_sequence: int, profile_id: int, cluster_id: int,
group_id: int, payload: bytes) -> Tuple[bool, Optional[bytes], int, int]:
"""
Handle a newly received fragment. The group_id field
encodes high byte = total fragment count, low byte = current fragment index.

:param sender_nwk: NWK address or the short ID of the sender.
:param aps_sequence: The APS sequence from the incoming APS frame.
:param profile_id: The APS frame's profileId.
:param cluster_id: The APS frame's clusterId.
:param group_id: The APS frame's groupId (used to store fragment # / total).
:param payload: The fragment of data for this message.
:return: (complete, reassembled_data, fragment_count, fragment_index)
complete = True if we have all fragments now, else False
reassembled_data = the final complete payload (bytes) if complete is True
fragment_coutn = the total number of fragments holding the complete packet
fragment_index = the index of the current received fragment
"""
fragment_count = (group_id >> 8) & 0xFF
fragment_index = group_id & 0xFF

key: FragmentKey = (sender_nwk, aps_sequence, profile_id, cluster_id)

# If we have never seen this message, create a reassembly entry.
if key not in self._partial:
entry = _FragmentEntry(fragment_count)
self._partial[key] = entry
else:
entry = self._partial[key]

LOGGER.debug("Received fragment %d/%d from %s (APS seq=%d, cluster=0x%04X)",
fragment_index, fragment_count, sender_nwk, aps_sequence, cluster_id)

entry.add_fragment(fragment_index, payload)

if entry.is_complete():
reassembled = entry.assemble()
del self._partial[key]
LOGGER.debug("Message reassembly complete. Total length=%d", len(reassembled))
return (True, reassembled, fragment_count, fragment_index)
else:
return (False, None, fragment_count, fragment_index)

def cleanup_expired(self) -> None:

now = asyncio.get_event_loop().time()
to_remove = []
for k, entry in self._partial.items():
if now - entry.start_time > FRAGMENT_TIMEOUT:
to_remove.append(k)
for k in to_remove:
del self._partial[k]
LOGGER.debug("Removed stale fragment reassembly for key=%s", k)

# Create a single global manager instance
fragment_manager = FragmentManager()

54 changes: 54 additions & 0 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

# Cached by `set_extended_timeout` so subsequent calls are a little faster
self._address_table_size: int | None = None
self._cleanup_fragments_periodically()

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.12

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.13

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.11.0

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.9.15

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

Check warning on line 56 in bellows/ezsp/protocol.py

View workflow job for this annotation

GitHub Actions / shared-ci / Run tests Python 3.10.8

coroutine 'ProtocolHandler._cleanup_fragments_periodically' was never awaited

def _ezsp_frame(self, name: str, *args: Any, **kwargs: Any) -> bytes:
"""Serialize the named frame and data."""
Expand Down Expand Up @@ -181,6 +182,38 @@
if data:
LOGGER.debug("Frame contains trailing data: %s", data)

if frame_name == "incomingMessageHandler" and result[1].options & 0x8000: # incoming message with APS_OPTION_FRAGMENT raised
from bellows.ezsp.fragmentation import fragment_manager

# Extract received APS frame and sender
aps_frame = result[1]
sender = result[4]

group_id = aps_frame.groupId
profile_id = aps_frame.profileId
cluster_id = aps_frame.clusterId
aps_seq = aps_frame.sequence

complete, reassembled, frag_count, frag_index = fragment_manager.handle_incoming_fragment(
sender_nwk=sender,
aps_sequence=aps_seq,
profile_id=profile_id,
cluster_id=cluster_id,
group_id=group_id,
payload=result[7]
)
asyncio.create_task(self._send_fragment_ack(sender, aps_frame, frag_count, frag_index)) # APS Ack

if not complete:
# Do not pass partial data up the stack
LOGGER.debug("Fragment reassembly not complete. waiting for more data.")
return
else:
# Replace partial data with fully reassembled data
result[7] = reassembled

LOGGER.debug("Reassembled fragmented message. Proceeding with normal handling.")

if sequence in self._awaiting:
expected_id, schema, future = self._awaiting.pop(sequence)
try:
Expand All @@ -205,6 +238,27 @@
else:
self._handle_callback(frame_name, result)

async def _send_fragment_ack(self, sender: int, incoming_aps: t.EmberApsFrame, fragment_count: int, fragment_index: int):

ackFrame = t.EmberApsFrame(
profileId=incoming_aps.profileId,
clusterId=incoming_aps.clusterId,
sourceEndpoint=incoming_aps.destinationEndpoint,
destinationEndpoint=incoming_aps.sourceEndpoint,
options=incoming_aps.options,
groupId=((0xFF00) | (fragment_index & 0xFF)),
sequence=incoming_aps.sequence
)

LOGGER.debug("Sending fragment ack to 0x%04X for fragment index=%d/%d", sender, fragment_index, fragment_count)
await self.sendReply(sender, ackFrame, b'')

async def _cleanup_fragments_periodically(self):
from bellows.ezsp.fragmentation import fragment_manager
while True:
await asyncio.sleep(5)
fragment_manager.cleanup_expired()

def __getattr__(self, name: str) -> Callable:
if name not in self.COMMANDS:
raise AttributeError(f"{name} not found in COMMANDS")
Expand Down
Loading