11# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
22# for details about the block below.
33#
4+ # These are separated into different runs because the logs for these tests are HUGE. The attribute one individually
5+ # reads every attribute on every cluster 4 times. If there's a failure, having these in separate runs makes it significantly
6+ # easier to navigate the logs
7+ #
48# === BEGIN CI TEST ARGUMENTS ===
59# test-runner-runs:
610# run1:
1519# --passcode 20202021
1620# --trace-to json:${TRACE_TEST_JSON}.json
1721# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
22+ # --tests test_TC_ACE_2_1
23+ # run2:
24+ # app: ${ALL_CLUSTERS_APP}
25+ # factory-reset: true
26+ # quiet: true
27+ # app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
28+ # script-args: >
29+ # --storage-path admin_storage.json
30+ # --commissioning-method on-network
31+ # --discriminator 1234
32+ # --passcode 20202021
33+ # --trace-to json:${TRACE_TEST_JSON}.json
34+ # --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
35+ # --tests test_TC_ACE_2_2
36+ # run3:
37+ # app: ${ALL_CLUSTERS_APP}
38+ # factory-reset: true
39+ # quiet: true
40+ # app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
41+ # script-args: >
42+ # --storage-path admin_storage.json
43+ # --commissioning-method on-network
44+ # --discriminator 1234
45+ # --passcode 20202021
46+ # --trace-to json:${TRACE_TEST_JSON}.json
47+ # --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
48+ # --bool-arg ci_only_linux_skip_ota_cluster_disallowed_for_certification:True
49+ # --tests test_TC_ACE_2_3
1850# === END CI TEST ARGUMENTS ===
1951
2052import logging
2355from typing import Optional
2456
2557import chip .clusters as Clusters
26- from chip .interaction_model import Status
58+ from chip .interaction_model import InteractionModelError , Status
2759from chip .testing .basic_composition import BasicCompositionTests
28- from chip .testing .global_attribute_ids import GlobalAttributeIds
29- from chip .testing .matter_testing import (AttributePathLocation , ClusterPathLocation , MatterBaseTest , TestStep , async_test_body ,
30- default_matter_test_main )
60+ from chip .testing .global_attribute_ids import (GlobalAttributeIds , is_standard_attribute_id , is_standard_cluster_id ,
61+ is_standard_command_id )
62+ from chip .testing .matter_testing import (AttributePathLocation , ClusterPathLocation , CommandPathLocation , MatterBaseTest , TestStep ,
63+ async_test_body , default_matter_test_main )
3164from chip .testing .spec_parsing import XmlCluster
3265from chip .tlv import uint
3366
3467
3568class AccessTestType (Enum ):
3669 READ = auto ()
3770 WRITE = auto ()
71+ INVOKE = auto ()
3872
3973
4074def step_number_with_privilege (step : int , substep : str , privilege : Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum ) -> str :
@@ -59,10 +93,19 @@ def operation_allowed(spec_requires: Clusters.AccessControl.Enums.AccessControlE
5993def checkable_attributes (cluster_id , cluster , xml_cluster ) -> list [uint ]:
6094 all_attrs = cluster [GlobalAttributeIds .ATTRIBUTE_LIST_ID ]
6195
62- def known_cluster_attribute (attribute_id ) -> bool :
96+ def is_known_cluster_attribute (attribute_id ) -> bool :
6397 ''' Returns true if this is a non-manufacturer specific attribute that has information in the XML and has python codegen data'''
64- return attribute_id <= 0xFFFF and attribute_id in xml_cluster .attributes and attribute_id in Clusters .ClusterObjects .ALL_ATTRIBUTES [cluster_id ]
65- return [x for x in all_attrs if known_cluster_attribute (x )]
98+ return is_standard_attribute_id (attribute_id ) and attribute_id in xml_cluster .attributes and attribute_id in Clusters .ClusterObjects .ALL_ATTRIBUTES [cluster_id ]
99+ return [attr_id for attr_id in all_attrs if is_known_cluster_attribute (attr_id )]
100+
101+
102+ def checkable_commands (cluster_id , cluster , xml_cluster ) -> list [uint ]:
103+ all_cmds = cluster [GlobalAttributeIds .ACCEPTED_COMMAND_LIST_ID ]
104+
105+ def is_known_cluster_cmd (command_id ) -> bool :
106+ ''' Returns true if this is a non-manufacturer specific command that has information in the XML and has python codegen data'''
107+ return is_standard_command_id (command_id ) and command_id in xml_cluster .accepted_commands and command_id in Clusters .ClusterObjects .ALL_ACCEPTED_COMMANDS [cluster_id ]
108+ return [cmd_id for cmd_id in all_cmds if is_known_cluster_cmd (cmd_id )]
66109
67110
68111class AccessChecker (MatterBaseTest , BasicCompositionTests ):
@@ -75,7 +118,7 @@ async def setup_class(self):
75118 self .build_spec_xmls ()
76119
77120 acl_attr = Clusters .AccessControl .Attributes .Acl
78- self .default_acl = await self .read_single_attribute_check_success (cluster = Clusters .AccessControl , attribute = acl_attr )
121+ self .default_acl = await self .read_single_attribute_check_success (endpoint = 0 , cluster = Clusters .AccessControl , attribute = acl_attr )
79122 self ._record_errors ()
80123 # We need to run this test from two controllers so we can test access to the ACL cluster while retaining access to the ACL cluster
81124 fabric_admin = self .certificate_authority_manager .activeCaList [0 ].adminList [0 ]
@@ -110,20 +153,25 @@ async def _setup_acl(self, privilege: Optional[Clusters.AccessControl.Enums.Acce
110153 def _record_errors (self ):
111154 ''' Checks through all the endpoints and records all the spec warnings in one go so we don't get repeats'''
112155 all_clusters = set ()
113- attrs : dict [uint , set ()] = {}
156+ attrs : dict [uint , set ] = {}
157+ cmds : dict [uint , set ] = {}
114158
115159 for endpoint_id , endpoint in self .endpoints_tlv .items ():
116160 all_clusters |= set (endpoint .keys ())
117161 for cluster_id , device_cluster_data in endpoint .items ():
118162 # Find all the attributes for this cluster across all endpoint
119163 if cluster_id not in attrs :
120164 attrs [cluster_id ] = set ()
165+ if cluster_id not in cmds :
166+ cmds [cluster_id ] = set ()
121167 # discard MEI attributes as we do not have access information for them.
122168 attrs [cluster_id ].update (
123- set ([id for id in device_cluster_data [GlobalAttributeIds .ATTRIBUTE_LIST_ID ] if id <= 0xFFFF ]))
169+ set ([id for id in device_cluster_data [GlobalAttributeIds .ATTRIBUTE_LIST_ID ] if is_standard_attribute_id (id )]))
170+ cmds [cluster_id ].update (
171+ set ([id for id in device_cluster_data [GlobalAttributeIds .ACCEPTED_COMMAND_LIST_ID ] if is_standard_command_id (id )]))
124172
125173 # Remove MEI clusters - we don't have information available to check these.
126- all_clusters = [id for id in all_clusters if id <= 0x7FFF ]
174+ all_clusters = [id for id in all_clusters if is_standard_cluster_id ( id ) ]
127175 for cluster_id in all_clusters :
128176 location = ClusterPathLocation (endpoint_id = 0 , cluster_id = cluster_id )
129177 if cluster_id not in self .xml_clusters :
@@ -139,7 +187,7 @@ def _record_errors(self):
139187 xml_cluster = self .xml_clusters [cluster_id ]
140188 for attribute_id in attrs [cluster_id ]:
141189 location = AttributePathLocation (endpoint_id = endpoint_id , cluster_id = cluster_id , attribute_id = attribute_id )
142- if attribute_id not in xml_cluster .attributes :
190+ if attribute_id not in xml_cluster .attributes . keys () :
143191 self .record_warning (test_name = "Access Checker" , location = location ,
144192 problem = "Cluster attribute not found in spec XML" )
145193 continue
@@ -148,6 +196,55 @@ def _record_errors(self):
148196 problem = "Unknown attribute" )
149197 self .success = False
150198 continue
199+ # Check that we have information for all the required commands
200+ for command_id in cmds [cluster_id ]:
201+ location = CommandPathLocation (endpoint_id = endpoint_id , cluster_id = cluster_id , command_id = command_id )
202+ if command_id not in xml_cluster .accepted_commands .keys ():
203+ self .record_warning (test_name = "Access Checker" , location = location ,
204+ problem = "Cluster command not found in spec XML" )
205+ continue
206+ if command_id not in Clusters .ClusterObjects .ALL_ACCEPTED_COMMANDS [cluster_id ]:
207+ self ._record_error (test_name = "Access Checker" , location = location ,
208+ problem = "Unknown command" )
209+ self .success = False
210+ continue
211+
212+ async def _maybe_run_command_access_test_for_cluster_privilege (self , endpoint_id , cluster_id , device_cluster_data , xml_cluster : XmlCluster , privilege : Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum ):
213+ """ Runs a command only if the required cluster privilege is HIGHER than the specified privilege. In this way,
214+ no commands are actually run on the device, which means there are no side effects. However, we can differentiate
215+ ACL rejections from commands being unsupported.
216+ """
217+ ota_exception = self .user_params .get ('ci_only_linux_skip_ota_cluster_disallowed_for_certification' , False )
218+ if cluster_id == Clusters .OtaSoftwareUpdateRequestor .id and ota_exception :
219+ logging .warn ('WARNING: Skipping OTA cluster check for CI. THIS IS DISALLOWED FOR CERTIFICATION' )
220+ return
221+
222+ for command_id in checkable_commands (cluster_id , device_cluster_data , xml_cluster ):
223+ spec_requires = xml_cluster .accepted_commands [command_id ].privilege
224+ command = Clusters .ClusterObjects .ALL_ACCEPTED_COMMANDS [cluster_id ][command_id ]
225+ location = CommandPathLocation (endpoint_id = endpoint_id , cluster_id = cluster_id , command_id = command_id )
226+ name = f"Command test - privilege { privilege } "
227+ if operation_allowed (spec_requires , privilege ):
228+ # In this test, we're only checking that the disallowed commands are rejected so that there are
229+ # no side effects. Commands are checked with admin privilege in their cluster tests. The error that
230+ # may be let through here is if the spec requires operate and the implementation requires admin.
231+ continue
232+ try :
233+ timed = None
234+ if command .must_use_timed_invoke :
235+ # This command requires a timedRequest. Setting the timed value to largest value (unsigned int).
236+ # We're sending the command right away, so this value doesn't matter, but we do need to set a value here to trigger the timed request message.
237+ timed = 65535
238+ await self .send_single_cmd (cmd = command (), dev_ctrl = self .TH2 , endpoint = endpoint_id , timedRequestTimeoutMs = timed )
239+ # If this was successful, that's an error
240+ self .record_error (test_name = name , location = location ,
241+ problem = f"Unexpected success sending command { command } with privilege { privilege } " )
242+ self .success = False
243+ except InteractionModelError as e :
244+ if e .status != Status .UnsupportedAccess :
245+ self .record_error (test_name = name , location = location ,
246+ problem = f'Unexpected error sending command { command } with privilege { privilege } - expected UNSUPPORTED_ACCESS, got { e .status } ' )
247+ self .success = False
151248
152249 async def _run_read_access_test_for_cluster_privilege (self , endpoint_id , cluster_id , device_cluster_data , xml_cluster : XmlCluster , privilege : Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum ):
153250 # TODO: This assumes all attributes are readable. Which they are currently. But we don't have a general way to mark otherwise.
@@ -241,14 +338,16 @@ async def run_access_test(self, test_type: AccessTestType):
241338 self .step (step_number_with_privilege (check_step , 'b' , privilege ))
242339 for endpoint_id , endpoint in self .endpoints_tlv .items ():
243340 for cluster_id , device_cluster_data in endpoint .items ():
244- if cluster_id > 0x7FFF or cluster_id not in self .xml_clusters or cluster_id not in Clusters .ClusterObjects .ALL_ATTRIBUTES :
341+ if not is_standard_cluster_id ( cluster_id ) or cluster_id not in self .xml_clusters or cluster_id not in Clusters .ClusterObjects .ALL_ATTRIBUTES :
245342 # These cases have already been recorded by the _record_errors function
246343 continue
247344 xml_cluster = self .xml_clusters [cluster_id ]
248345 if test_type == AccessTestType .READ :
249346 await self ._run_read_access_test_for_cluster_privilege (endpoint_id , cluster_id , device_cluster_data , xml_cluster , privilege )
250347 elif test_type == AccessTestType .WRITE :
251348 await self ._run_write_access_test_for_cluster_privilege (endpoint_id , cluster_id , device_cluster_data , xml_cluster , privilege , wildcard_read )
349+ elif test_type == AccessTestType .INVOKE :
350+ await self ._maybe_run_command_access_test_for_cluster_privilege (endpoint_id , cluster_id , device_cluster_data , xml_cluster , privilege )
252351 else :
253352 self .fail_current_test ("Unsupported test type" )
254353 if not self .success :
@@ -298,6 +397,33 @@ def desc_TC_ACE_2_2(self):
298397 async def test_TC_ACE_2_2 (self ):
299398 await self .run_access_test (AccessTestType .WRITE )
300399
400+ def steps_TC_ACE_2_3 (self ):
401+ steps = [TestStep ("precondition" , "DUT is commissioned" , is_commissioning = True ),
402+ TestStep (1 , "TH_commissioner performs a wildcard read" ),
403+ TestStep (2 , "TH_commissioner reads the ACL attribute" ),
404+ TestStep (3 , "Repeat steps 3a and 3b for each permission level" )]
405+ enum = Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum
406+ privilege_enum = [p for p in enum if p != enum .kUnknownEnumValue ]
407+ for p in privilege_enum :
408+ steps .append (TestStep (step_number_with_privilege (3 , 'a' , p ),
409+ "TH_commissioner gives TH_second_controller the specified privilege" ))
410+ steps .append (TestStep (step_number_with_privilege (3 , 'b' , p ),
411+ """For each standard command on each standard cluster on each endpoint,
412+ TH_second_controller checks the permission requirements for that command.
413+ If the permission required for the command is HIGHER than the permission level being tested,
414+ TH_second_controller sends the command to the DUT using default values.
415+ Regardless of the command contents, the DUT should return an access error since access must be checked
416+ before the command is processed. Receipt of an UNSUPPORTED_COMMAND error is a conformance failure.""" ,
417+ "DUT returns UNSUPPORTED_ACCESS error" ))
418+ return steps
419+
420+ def desc_TC_ACE_2_3 (self ):
421+ return "[TC-ACE-2.3] Command Privilege Enforcement - [DUT as Server]"
422+
423+ @async_test_body
424+ async def test_TC_ACE_2_3 (self ):
425+ await self .run_access_test (AccessTestType .INVOKE )
426+
301427
302428if __name__ == "__main__" :
303429 default_matter_test_main ()
0 commit comments