1515import os
1616import signal
1717import tempfile
18+ from dataclasses import dataclass
19+ from typing import Optional , Union
1820
21+ import matter .clusters as Clusters
22+ from matter .ChipDeviceCtrl import ChipDeviceController
23+ from matter .clusters .Types import NullValue
1924from matter .testing .tasks import Subprocess
2025
2126
27+ @dataclass
28+ class OtaImagePath :
29+ """Represents a path to a single OTA image file."""
30+ path : str
31+
32+ @property
33+ def ota_args (self ) -> list [str ]:
34+ """Return the command line arguments for this OTA image path."""
35+ return ["--filepath" , self .path ]
36+
37+
38+ @dataclass
39+ class ImageListPath :
40+ """Represents a path to a file containing a list of OTA images."""
41+ path : str
42+
43+ @property
44+ def ota_args (self ) -> list [str ]:
45+ """Return the command line arguments for this image list path."""
46+ return ["--otaImageList" , self .path ]
47+
48+
2249class AppServerSubprocess (Subprocess ):
2350 """Wrapper class for starting an application server in a subprocess."""
2451
@@ -27,27 +54,36 @@ class AppServerSubprocess(Subprocess):
2754
2855 def __init__ (self , app : str , storage_dir : str , discriminator : int ,
2956 passcode : int , port : int = 5540 , extra_args : list [str ] = []):
57+ # Create a temporary KVS file and keep the descriptor to avoid leaks.
3058 self .kvs_fd , kvs_path = tempfile .mkstemp (dir = storage_dir , prefix = "kvs-app-" )
31-
32- # Build the command list
33- command = [app ]
34- if extra_args :
35- command .extend (extra_args )
36-
37- command .extend ([
38- "--KVS" , kvs_path ,
39- '--secured-device-port' , str (port ),
40- "--discriminator" , str (discriminator ),
41- "--passcode" , str (passcode )
42- ])
43-
44- # Start the server application
45- super ().__init__ (* command , # Pass the constructed command list
46- output_cb = lambda line , is_stderr : self .PREFIX + line )
59+ try :
60+ # Build the command list
61+ command = [app ]
62+ if extra_args :
63+ command .extend (extra_args )
64+
65+ command .extend ([
66+ "--KVS" , kvs_path ,
67+ '--secured-device-port' , str (port ),
68+ "--discriminator" , str (discriminator ),
69+ "--passcode" , str (passcode )
70+ ])
71+
72+ # Start the server application
73+ super ().__init__ (* command , # Pass the constructed command list
74+ output_cb = lambda line , is_stderr : self .PREFIX + line )
75+ except Exception :
76+ # Do not leak KVS file descriptor on failure
77+ os .close (self .kvs_fd )
78+ raise
4779
4880 def __del__ (self ):
4981 # Do not leak KVS file descriptor.
50- os .close (self .kvs_fd )
82+ if hasattr (self , "kvs_fd" ):
83+ try :
84+ os .close (self .kvs_fd )
85+ except OSError :
86+ pass
5187
5288
5389class IcdAppServerSubprocess (AppServerSubprocess ):
@@ -102,3 +138,81 @@ def __init__(self, app: str, rpc_server_port: int, storage_dir: str,
102138 # Start the server application
103139 super ().__init__ (* command , # Pass the constructed command list
104140 output_cb = lambda line , is_stderr : self .PREFIX + line )
141+
142+
143+ class OTAProviderSubprocess (AppServerSubprocess ):
144+ """Wrapper class for starting an OTA Provider application server in a subprocess."""
145+
146+ DEFAULT_ADMIN_NODE_ID = 112233
147+
148+ # Prefix for log messages from the OTA provider application.
149+ PREFIX = b"[OTA-PROVIDER]"
150+
151+ def __init__ (self , app : str , storage_dir : str , discriminator : int ,
152+ passcode : int , ota_source : Union [OtaImagePath , ImageListPath ],
153+ port : int = 5541 , extra_args : list [str ] = []):
154+ """Initialize the OTA Provider subprocess.
155+
156+ Args:
157+ app: Path to the chip-ota-provider-app executable
158+ storage_dir: Directory for persistent storage
159+ discriminator: Discriminator for commissioning
160+ passcode: Passcode for commissioning
161+ port: UDP port for secure connections (default: 5541)
162+ ota_source: Either OtaImagePath or ImageListPath specifying the OTA image source
163+ extra_args: Additional command line arguments
164+ """
165+
166+ # Build OTA-specific arguments using the ota_source property
167+ combined_extra_args = ota_source .ota_args + extra_args
168+
169+ # Initialize with the combined arguments
170+ super ().__init__ (app = app , storage_dir = storage_dir , discriminator = discriminator ,
171+ passcode = passcode , port = port , extra_args = combined_extra_args )
172+
173+ def create_acl_entry (self , dev_ctrl : ChipDeviceController , provider_node_id : int , requestor_node_id : Optional [int ] = None ):
174+ """Create ACL entries to allow OTA requestors to access the provider.
175+
176+ Args:
177+ dev_ctrl: Device controller for sending commands
178+ provider_node_id: Node ID of the OTA provider
179+ requestor_node_id: Optional specific requestor node ID for targeted access
180+
181+ Returns:
182+ Result of the ACL write operation
183+ """
184+ # Standard ACL entry for OTA Provider cluster
185+ admin_node_id = dev_ctrl .nodeId if hasattr (dev_ctrl , 'nodeId' ) else self .DEFAULT_ADMIN_NODE_ID
186+ requestor_subjects = [requestor_node_id ] if requestor_node_id else NullValue
187+
188+ # Create ACL entries using proper struct constructors
189+ acl_entries = [
190+ # Admin entry
191+ Clusters .AccessControl .Structs .AccessControlEntryStruct ( # type: ignore
192+ privilege = Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum .kAdminister , # type: ignore
193+ authMode = Clusters .AccessControl .Enums .AccessControlEntryAuthModeEnum .kCase , # type: ignore
194+ subjects = [admin_node_id ], # type: ignore
195+ targets = NullValue
196+ ),
197+ # Operate entry
198+ Clusters .AccessControl .Structs .AccessControlEntryStruct ( # type: ignore
199+ privilege = Clusters .AccessControl .Enums .AccessControlEntryPrivilegeEnum .kOperate , # type: ignore
200+ authMode = Clusters .AccessControl .Enums .AccessControlEntryAuthModeEnum .kCase , # type: ignore
201+ subjects = requestor_subjects , # type: ignore
202+ targets = [
203+ Clusters .AccessControl .Structs .AccessControlTargetStruct ( # type: ignore
204+ cluster = Clusters .OtaSoftwareUpdateProvider .id , # type: ignore
205+ endpoint = NullValue ,
206+ deviceType = NullValue
207+ )
208+ ],
209+ )
210+ ]
211+
212+ # Create the attribute descriptor for the ACL attribute
213+ acl_attribute = Clusters .AccessControl .Attributes .Acl (acl_entries )
214+
215+ return dev_ctrl .WriteAttribute (
216+ nodeid = provider_node_id ,
217+ attributes = [(0 , acl_attribute )]
218+ )
0 commit comments