3
3
import logging
4
4
import os
5
5
import tempfile
6
+ import time
6
7
from typing import Optional , TextIO , Union
7
8
8
9
import esptool
10
+ from pytest_embedded .log import MessageQueue
9
11
from pytest_embedded_serial_esp .serial import EspSerial
10
12
11
13
from .app import IdfApp
@@ -44,9 +46,56 @@ def __init__(
44
46
** kwargs ,
45
47
)
46
48
47
- def _post_init (self ):
49
+ def _before_init_port (self , q : MessageQueue ):
50
+ if not self .flash_port :
51
+ return
52
+
53
+ self .occupied_ports [self .flash_port ] = None
54
+ logging .debug (f'occupied { self .flash_port } ' )
55
+
48
56
if self .erase_all :
49
- self .skip_autoflash = False
57
+ self .skip_autoflash = False # do we really need it? may be a breaking change if removed
58
+
59
+ cmd = ['--port' , self .flash_port , 'erase_flash' ]
60
+ if self ._force_flag ():
61
+ cmd .append ('--force' )
62
+
63
+ with contextlib .redirect_stdout (q ):
64
+ esptool .main (cmd )
65
+
66
+ if self ._meta :
67
+ self ._meta .drop_port_app_cache (self .flash_port )
68
+
69
+ if self .skip_autoflash :
70
+ return
71
+
72
+ if self .erase_nvs :
73
+ _args , _kwargs = self ._get_erase_nvs_cli_args (port = self .flash_port )
74
+ with contextlib .redirect_stdout (q ):
75
+ esptool .main (_args , ** _kwargs )
76
+
77
+ # this is required to wait for the reset happened after erase the nvs partition
78
+ time .sleep (1 )
79
+
80
+ _args , _kwargs = self ._get_flash_cli_args (port = self .flash_port )
81
+ with contextlib .redirect_stdout (q ):
82
+ esptool .main (_args , ** _kwargs )
83
+
84
+ # after flash caches
85
+ # here instead of ``_finalize_init`` to avoid occupying the port wrongly while multi-DUT test case
86
+ self ._flashed_with_different_port = True
87
+ if self ._meta :
88
+ self ._meta .set_port_target_cache (self .flash_port , self .app .target )
89
+ self ._meta .set_port_app_cache (self .flash_port , self .app )
90
+
91
+ # this is required to wait for the reset happened after flashing
92
+ time .sleep (1 )
93
+
94
+ def _post_init (self ):
95
+ if self ._flashed_with_different_port :
96
+ pass
97
+ elif self .erase_all :
98
+ self .skip_autoflash = False # do we really need it? may be a breaking change if removed
50
99
elif self ._meta and self ._meta .hit_port_app_cache (self .port , self .app ):
51
100
if self .confirm_target_elf_sha256 :
52
101
if self .is_target_flashed_same_elf ():
@@ -66,6 +115,13 @@ def _post_init(self):
66
115
super ()._post_init ()
67
116
68
117
def _start (self ):
118
+ if self ._flashed_with_different_port :
119
+ if self ._meta :
120
+ self ._meta .set_port_app_cache (self .port , self .app )
121
+
122
+ super ()._start ()
123
+ return
124
+
69
125
if self .skip_autoflash :
70
126
logging .info ('Skipping auto flash...' )
71
127
super ()._start ()
@@ -75,6 +131,13 @@ def _start(self):
75
131
else :
76
132
self .flash ()
77
133
134
+ def close (self ):
135
+ if self ._flashed_with_different_port :
136
+ self .occupied_ports .pop (self .flash_port , None )
137
+ logging .debug (f'released { self .flash_port } ' )
138
+
139
+ super ().close ()
140
+
78
141
def load_ram (self ) -> None :
79
142
if not self .app .is_loadable_elf :
80
143
raise ValueError ('elf should be loadable elf' )
@@ -131,11 +194,12 @@ def erase_flash(self, force: bool = False):
131
194
else :
132
195
super ().erase_flash ()
133
196
134
- @EspSerial .use_esptool ()
135
- def flash (self , app : Optional [IdfApp ] = None ) -> None :
136
- """
137
- Flash the `app.flash_files` to the dut
138
- """
197
+ def _get_flash_cli_args (
198
+ self ,
199
+ * ,
200
+ app : Optional [IdfApp ] = None ,
201
+ port : Optional [str ] = None ,
202
+ ):
139
203
if not app :
140
204
app = self .app
141
205
@@ -148,6 +212,8 @@ def flash(self, app: Optional[IdfApp] = None) -> None:
148
212
return
149
213
150
214
_args = []
215
+ _kwargs = {}
216
+
151
217
for k , v in app .flash_args ['extra_esptool_args' ].items ():
152
218
if isinstance (v , bool ):
153
219
if k == 'stub' :
@@ -166,17 +232,6 @@ def flash(self, app: Optional[IdfApp] = None) -> None:
166
232
_args .extend (['--baud' , os .getenv ('ESPBAUD' , '921600' )])
167
233
_args .append ('write_flash' )
168
234
169
- if self .erase_nvs :
170
- esptool .main (
171
- [
172
- 'erase_region' ,
173
- str (app .partition_table ['nvs' ]['offset' ]),
174
- str (app .partition_table ['nvs' ]['size' ]),
175
- ],
176
- esp = self .esp ,
177
- )
178
- self .esp .connect ()
179
-
180
235
encrypt_files = []
181
236
flash_files = []
182
237
for file in app .flash_files :
@@ -195,7 +250,46 @@ def flash(self, app: Optional[IdfApp] = None) -> None:
195
250
196
251
_args .extend ([* app .flash_args ['write_flash_args' ], * self ._force_flag (app )])
197
252
198
- esptool .main (_args , esp = self .esp )
253
+ if port :
254
+ _args = ['--port' , port , * _args ]
255
+ else :
256
+ _kwargs .update ({'esp' : self .esp })
257
+
258
+ return _args , _kwargs
259
+
260
+ def _get_erase_nvs_cli_args (self , app : Optional [IdfApp ] = None , port : Optional [str ] = None ):
261
+ if not app :
262
+ app = self .app
263
+
264
+ _args = [
265
+ 'erase_region' ,
266
+ str (app .partition_table ['nvs' ]['offset' ]),
267
+ str (app .partition_table ['nvs' ]['size' ]),
268
+ ]
269
+ _kwargs = {}
270
+
271
+ if port :
272
+ _args = ['--port' , port , * _args ]
273
+ else :
274
+ _kwargs .update ({'esp' : self .esp })
275
+
276
+ return _args , _kwargs
277
+
278
+ @EspSerial .use_esptool ()
279
+ def flash (self , app : Optional [IdfApp ] = None ) -> None :
280
+ """
281
+ Flash the app to the target device, or the app provided.
282
+ """
283
+ if not app :
284
+ app = self .app
285
+
286
+ if self .erase_nvs :
287
+ _args , _kwargs = self ._get_erase_nvs_cli_args (app = app )
288
+ esptool .main (_args , ** _kwargs )
289
+ self .esp .connect ()
290
+
291
+ _args , _kwargs = self ._get_flash_cli_args (app = app )
292
+ esptool .main (_args , ** _kwargs )
199
293
200
294
if self ._meta :
201
295
self ._meta .set_port_app_cache (self .port , app )
0 commit comments