Skip to content

Commit 95960cf

Browse files
paul-szczepanek-armSevenarth
authored andcommitted
dts: move packet handling and utilities to API directory
Split TestSuite methods between test run methods and packet related methods. Signed-off-by: Paul Szczepanek <[email protected]> Reviewed-by: Luca Vizzarro <[email protected]> Reviewed-by: Patrick Robb <[email protected]>
1 parent 770786e commit 95960cf

28 files changed

+645
-432
lines changed

doc/api/dts/api.packet.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.. SPDX-License-Identifier: BSD-3-Clause
2+
3+
packet - Sending and capturing packets
4+
======================================
5+
6+
.. automodule:: api.packet
7+
:members:
8+
:show-inheritance:

doc/api/dts/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ api - DTS API
1818
:maxdepth: 1
1919

2020
api.capabilities
21+
api.packet
22+
api.test

doc/api/dts/api.test.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.. SPDX-License-Identifier: BSD-3-Clause
2+
3+
test - Reporting results and logging
4+
====================================
5+
6+
.. automodule:: api.test
7+
:members:
8+
:show-inheritance:

doc/guides/tools/dts.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ Test Case Verification
480480

481481
Use the verify method to assert conditions and record test results.
482482
This should typically be called at the end of each test case.
483-
Example: ``self.verify(link_up, "Link should be up after configuration.")``
483+
Example: ``verify(link_up, "Link should be up after configuration.")``
484484

485485
Other Methods
486486
~~~~~~~~~~~~~

dts/api/packet.py

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright(c) 2025 Arm Limited
3+
4+
"""Packet utilities for test suites.
5+
6+
The module provides helpers for:
7+
* Packet sending and verification,
8+
* Packet adjustments and modification.
9+
10+
Example:
11+
.. code:: python
12+
13+
from scapy.layers.inet import IP
14+
from scapy.layers.l2 import Ether
15+
from api.packet import send_packet_and_capture, get_expected_packet, match_all_packets
16+
17+
pkt = Ether()/IP()/b"payload"
18+
received = send_packet_and_capture(pkt)
19+
expected = get_expected_packet(pkt)
20+
match_all_packets([expected], received)
21+
"""
22+
23+
from collections import Counter
24+
from typing import cast
25+
26+
from scapy.layers.inet import IP
27+
from scapy.layers.l2 import Ether
28+
from scapy.packet import Packet, Padding, raw
29+
30+
from api.test import fail, log_debug
31+
from framework.context import get_ctx
32+
from framework.exception import InternalError
33+
from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
34+
PacketFilteringConfig,
35+
)
36+
from framework.utils import get_packet_summaries
37+
38+
39+
def send_packet_and_capture(
40+
packet: Packet,
41+
filter_config: PacketFilteringConfig = PacketFilteringConfig(),
42+
duration: float = 1,
43+
) -> list[Packet]:
44+
"""Send and receive `packet` using the associated TG.
45+
46+
Send `packet` through the appropriate interface and receive on the appropriate interface.
47+
Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
48+
49+
Args:
50+
packet: The packet to send.
51+
filter_config: The filter to use when capturing packets.
52+
duration: Capture traffic for this amount of time after sending `packet`.
53+
54+
Returns:
55+
A list of received packets.
56+
"""
57+
return send_packets_and_capture(
58+
[packet],
59+
filter_config,
60+
duration,
61+
)
62+
63+
64+
def send_packets_and_capture(
65+
packets: list[Packet],
66+
filter_config: PacketFilteringConfig = PacketFilteringConfig(),
67+
duration: float = 1,
68+
) -> list[Packet]:
69+
"""Send and receive `packets` using the associated TG.
70+
71+
Send `packets` through the appropriate interface and receive on the appropriate interface.
72+
Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.
73+
74+
Args:
75+
packets: The packets to send.
76+
filter_config: The filter to use when capturing packets.
77+
duration: Capture traffic for this amount of time after sending `packet`.
78+
79+
Returns:
80+
A list of received packets.
81+
"""
82+
from framework.context import get_ctx
83+
from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
84+
CapturingTrafficGenerator,
85+
)
86+
87+
assert isinstance(
88+
get_ctx().tg, CapturingTrafficGenerator
89+
), "Cannot capture with a non-capturing traffic generator"
90+
tg: CapturingTrafficGenerator = cast(CapturingTrafficGenerator, get_ctx().tg)
91+
# TODO: implement @requires for types of traffic generator
92+
packets = adjust_addresses(packets)
93+
return tg.send_packets_and_capture(
94+
packets,
95+
get_ctx().topology.tg_port_egress,
96+
get_ctx().topology.tg_port_ingress,
97+
filter_config,
98+
duration,
99+
)
100+
101+
102+
def send_packets(
103+
packets: list[Packet],
104+
) -> None:
105+
"""Send packets using the traffic generator and do not capture received traffic.
106+
107+
Args:
108+
packets: Packets to send.
109+
"""
110+
packets = adjust_addresses(packets)
111+
get_ctx().tg.send_packets(packets, get_ctx().topology.tg_port_egress)
112+
113+
114+
def get_expected_packets(
115+
packets: list[Packet],
116+
sent_from_tg: bool = False,
117+
) -> list[Packet]:
118+
"""Inject the proper L2/L3 addresses into `packets`.
119+
120+
Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
121+
122+
Args:
123+
packets: The packets to modify.
124+
sent_from_tg: If :data:`True` packet was sent from the TG.
125+
126+
Returns:
127+
`packets` with injected L2/L3 addresses.
128+
"""
129+
return adjust_addresses(packets, not sent_from_tg)
130+
131+
132+
def get_expected_packet(
133+
packet: Packet,
134+
sent_from_tg: bool = False,
135+
) -> Packet:
136+
"""Inject the proper L2/L3 addresses into `packet`.
137+
138+
Inject the L2/L3 addresses expected at the receiving end of the traffic generator.
139+
140+
Args:
141+
packet: The packet to modify.
142+
sent_from_tg: If :data:`True` packet was sent from the TG.
143+
144+
Returns:
145+
`packet` with injected L2/L3 addresses.
146+
"""
147+
return get_expected_packets([packet], sent_from_tg)[0]
148+
149+
150+
def adjust_addresses(packets: list[Packet], expected: bool = False) -> list[Packet]:
151+
"""L2 and L3 address additions in both directions.
152+
153+
Copies of `packets` will be made, modified and returned in this method.
154+
155+
Only missing addresses are added to packets, existing addresses will not be overridden. If
156+
any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
157+
IP layer will have its addresses adjusted.
158+
159+
Assumptions:
160+
Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
161+
162+
Args:
163+
packets: The packets to modify.
164+
expected: If :data:`True`, the direction is SUT -> TG,
165+
otherwise the direction is TG -> SUT.
166+
167+
Returns:
168+
A list containing copies of all packets in `packets` after modification.
169+
170+
Raises:
171+
InternalError: If no tests are running.
172+
"""
173+
from framework.test_suite import TestSuite
174+
175+
if get_ctx().local.current_test_suite is None:
176+
raise InternalError("No current test suite, tests aren't running?")
177+
current_test_suite: TestSuite = cast(TestSuite, get_ctx().local.current_test_suite)
178+
return current_test_suite._adjust_addresses(packets, expected)
179+
180+
181+
def match_all_packets(
182+
expected_packets: list[Packet],
183+
received_packets: list[Packet],
184+
verify: bool = True,
185+
) -> bool:
186+
"""Matches all the expected packets against the received ones.
187+
188+
Matching is performed by counting down the occurrences in a dictionary which keys are the
189+
raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
190+
are automatically ignored.
191+
192+
Args:
193+
expected_packets: The packets we are expecting to receive.
194+
received_packets: All the packets that were received.
195+
verify: If :data:`True`, and there are missing packets an exception will be raised.
196+
197+
Raises:
198+
TestCaseVerifyError: if and not all the `expected_packets` were found in
199+
`received_packets`.
200+
201+
Returns:
202+
:data:`True` If there are no missing packets.
203+
"""
204+
expected_packets_counters = Counter(map(raw, expected_packets))
205+
received_packets_counters = Counter(map(raw, received_packets))
206+
# The number of expected packets is subtracted by the number of received packets, ignoring
207+
# any unexpected packets and capping at zero.
208+
missing_packets_counters = expected_packets_counters - received_packets_counters
209+
missing_packets_count = missing_packets_counters.total()
210+
log_debug(
211+
f"match_all_packets: expected {len(expected_packets)}, "
212+
f"received {len(received_packets)}, missing {missing_packets_count}"
213+
)
214+
215+
if missing_packets_count != 0:
216+
if verify:
217+
fail(
218+
f"Not all packets were received, expected {len(expected_packets)} "
219+
f"but {missing_packets_count} were missing."
220+
)
221+
return False
222+
223+
return True
224+
225+
226+
def verify_packets(expected_packet: Packet, received_packets: list[Packet]) -> None:
227+
"""Verify that `expected_packet` has been received.
228+
229+
Go through `received_packets` and check that `expected_packet` is among them.
230+
If not, raise an exception and log the last 10 commands
231+
executed on both the SUT and TG.
232+
233+
Args:
234+
expected_packet: The packet we're expecting to receive.
235+
received_packets: The packets where we're looking for `expected_packet`.
236+
237+
Raises:
238+
TestCaseVerifyError: `expected_packet` is not among `received_packets`.
239+
"""
240+
for received_packet in received_packets:
241+
if _compare_packets(expected_packet, received_packet):
242+
break
243+
else:
244+
log_debug(
245+
f"The expected packet {expected_packet.summary()} "
246+
f"not found among received {get_packet_summaries(received_packets)}"
247+
)
248+
fail("An expected packet not found among received packets.")
249+
250+
251+
def _compare_packets(expected_packet: Packet, received_packet: Packet) -> bool:
252+
log_debug(f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}")
253+
254+
l3 = IP in expected_packet.layers()
255+
log_debug("Found l3 layer")
256+
257+
received_payload = received_packet
258+
expected_payload = expected_packet
259+
while received_payload and expected_payload:
260+
log_debug("Comparing payloads:")
261+
log_debug(f"Received: {received_payload}")
262+
log_debug(f"Expected: {expected_payload}")
263+
if type(received_payload) is type(expected_payload):
264+
log_debug("The layers are the same.")
265+
if type(received_payload) is Ether:
266+
if not _verify_l2_frame(received_payload, l3):
267+
return False
268+
elif type(received_payload) is IP:
269+
assert type(expected_payload) is IP
270+
if not _verify_l3_packet(received_payload, expected_payload):
271+
return False
272+
else:
273+
# Different layers => different packets
274+
return False
275+
received_payload = received_payload.payload
276+
expected_payload = expected_payload.payload
277+
278+
if expected_payload:
279+
log_debug(f"The expected packet did not contain {expected_payload}.")
280+
return False
281+
if received_payload and received_payload.__class__ != Padding:
282+
log_debug("The received payload had extra layers which were not padding.")
283+
return False
284+
return True
285+
286+
287+
def _verify_l2_frame(received_packet: Ether, contains_l3: bool) -> bool:
288+
"""Verify the L2 frame of `received_packet`.
289+
290+
Args:
291+
received_packet: The received L2 frame to verify.
292+
contains_l3: If :data:`True`, the packet contains an L3 layer.
293+
"""
294+
log_debug("Looking at the Ether layer.")
295+
log_debug(
296+
f"Comparing received dst mac '{received_packet.dst}' "
297+
f"with expected '{get_ctx().topology.tg_port_ingress.mac_address}'."
298+
)
299+
if received_packet.dst != get_ctx().topology.tg_port_ingress.mac_address:
300+
return False
301+
302+
expected_src_mac = get_ctx().topology.tg_port_egress.mac_address
303+
if contains_l3:
304+
expected_src_mac = get_ctx().topology.sut_port_egress.mac_address
305+
log_debug(
306+
f"Comparing received src mac '{received_packet.src}' "
307+
f"with expected '{expected_src_mac}'."
308+
)
309+
if received_packet.src != expected_src_mac:
310+
return False
311+
312+
return True
313+
314+
315+
def _verify_l3_packet(received_packet: IP, expected_packet: IP) -> bool:
316+
log_debug("Looking at the IP layer.")
317+
if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
318+
return False
319+
return True

0 commit comments

Comments
 (0)