99import os
1010import sys
1111import logging
12- from pathlib import Path
1312from typing import List , Optional
1413
1514# Define paths and constants
1615PROC_INTERRUPTS = "/proc/interrupts"
17- GPIO_SYSFS_PATH = Path ("/sys/class/gpio" )
1816TEST_TIMEOUT = 30
1917
2018
@@ -48,6 +46,12 @@ def _get_irq_numbers(self) -> bool:
4846 self .irq_name ,
4947 PROC_INTERRUPTS ,
5048 )
49+ """
50+ The self.irq_numbers will be store the mapping of IRQs for target
51+ interrupt name as a dict and assigned IRQ number as key only.
52+ self.irq_numbers = { IRQ_number1: None,
53+ IRQ_number2: None }
54+ """
5155 try :
5256 with open (PROC_INTERRUPTS , "r" ) as f :
5357 for line in f :
@@ -105,7 +109,12 @@ def _get_smp_affinities(self) -> bool:
105109 for cpu in range (self .num_cpus )
106110 if (affinity_mask >> cpu ) & 1
107111 ]
108-
112+ """
113+ self.irq_numbers will be updated by assigning a list of
114+ affinity CPUs as the value for each IRQ accordingly.
115+ self.irq_numbers = { IRQ_number1: [0, 1],
116+ IRQ_number2: [3, 4] }
117+ """
109118 if not affected_cpus :
110119 logging .warning (
111120 "IRQ %d affinity mask '%s' targets no CPUs. "
@@ -140,6 +149,18 @@ def _get_interrupt_counts(self, irq_number: int) -> Optional[List[int]]:
140149 A list of integer counts, or None on error.
141150 """
142151 try :
152+ """
153+ counts will be a list of the numbers that from the output
154+ of /proc/interrupts. It will parse the line start with
155+ specific IRQ number and get the mulitple columes value
156+ which is depend on how many CPU cores for system.
157+ e.g. The output for /proc/interrupts as follows.
158+ count for IRQ number 1 will be count = [0, 0]
159+
160+ CPU0 CPU1
161+ 1: 0 0 GICv3 25 Level vgic
162+ 3: 4341111 1892740 GICv3 30 Level arch_timer
163+ """
143164 with open (PROC_INTERRUPTS , "r" ) as f :
144165 for line in f :
145166 # Match the line starting with the exact IRQ number
@@ -175,7 +196,11 @@ def run_test(self) -> bool:
175196 if not self ._get_smp_affinities ():
176197 raise RuntimeError ("Could not get CPU affinities for all IRQs." )
177198
178- # Store initial counts for all monitored IRQs
199+ """
200+ Store initial counts for all monitored IRQs
201+ initial_counts_map = { IRQ_number1: [cpu0_count, cpu1_count, ...],
202+ IRQ_number1: [cpu0_count, cpu1_count, ...] }
203+ """
179204 initial_counts_map = {}
180205 for irq in self .irq_numbers :
181206 counts = self ._get_interrupt_counts (irq )
@@ -185,6 +210,16 @@ def run_test(self) -> bool:
185210 initial_counts_map [irq ] = counts
186211
187212 logging .info ("Initial interrupt counts on target CPUs:" )
213+ """
214+ Following logic will mapping self.irq_numbers with initial_counts_map
215+ self.irq_numbers is a mapping of IRQ and affinity of CPUs.
216+ self.irq_numbers = { 132: [2,3],
217+ 144: [0,1] }
218+ initial_counts_map is a mapping of IRQ and the interrupt count of
219+ all CPUs
220+ initial_counts_map = { 132: [0, 0, 0, 0],
221+ 144: [2, 0, 4, 0] }
222+ """
188223 for irq , cpus in self .irq_numbers .items ():
189224 for cpu in cpus :
190225 logging .info (
@@ -229,194 +264,25 @@ def run_test(self) -> bool:
229264 return False
230265
231266
232- class GpioTest :
233- """
234- A class to test a GPIO button press using a context manager.
235-
236- This class checks if the GPIO pin is already exported. It will only
237- unexport the pin on exit if it was the one to export it initially,
238- preventing interference with other processes.
239- """
240-
241- def __init__ (self , name : str , gpio_pin : str ):
242- """
243- Initializes the GPIOTest.
244-
245- Args:
246- name: The name of the button.
247- gpio_pin: The GPIO pin number to test.
248- """
249- self .name = name
250- self .gpio_pin = gpio_pin
251- self .gpio_node = GPIO_SYSFS_PATH / "gpio{}" .format (self .gpio_pin )
252- self ._was_exported_before = False
253- logging .info ("Initialized test for '%s' on GPIO %s" , name , gpio_pin )
254-
255- def __enter__ (self ):
256- """
257- Enter the runtime context and set up the GPIO pin.
258-
259- This method checks the initial export state, then exports and
260- configures the GPIO pin for input if necessary.
261-
262- Returns:
263- The instance of the class (self).
264-
265- Raises:
266- IOError: If the GPIO setup fails.
267- """
268- logging .info ("--- Entering context: Setting up GPIO ---" )
269- # Monitor the initial state BEFORE any setup actions.
270- self ._was_exported_before = self .gpio_node .exists ()
271- if self ._was_exported_before :
272- logging .info (
273- "GPIO %s was already exported. Will not unexport on exit." ,
274- self .gpio_pin ,
275- )
276-
277- if not self ._setup_gpio ():
278- # Raise an error to prevent the 'with' block from executing
279- raise IOError ("Failed to set up GPIO {}" .format (self .gpio_pin ))
280- return self
281-
282- def __exit__ (self , exc_type , exc_value , traceback ):
283- """
284- Exit the runtime context and clean up the GPIO pin.
285-
286- This method unexports the GPIO pin only if it wasn't
287- exported before this context was entered.
288- """
289- logging .info ("--- Exiting context: Cleaning up GPIO ---" )
290- # Only unexport if the pin was not already exported.
291- if not self ._was_exported_before and self .gpio_node .exists ():
292- try :
293- logging .info (
294- "Unexporting GPIO %s (we exported it)..." , self .gpio_pin
295- )
296- (GPIO_SYSFS_PATH / "unexport" ).write_text (str (self .gpio_pin ))
297- except Exception as e :
298- logging .error (
299- "Failed to unexport GPIO %s: %s" , self .gpio_pin , e
300- )
301- else :
302- logging .info (
303- "Leaving GPIO %s exported as it was found." , self .gpio_pin
304- )
305-
306- def _setup_gpio (self ) -> bool :
307- """
308- Exports (if needed) and configures the GPIO pin.
309-
310- Returns:
311- True on success, False on failure.
312- """
313- # The export logic is now conditional on the node's existence.
314- if not self .gpio_node .exists ():
315- logging .info ("Exporting GPIO %s to system..." , self .gpio_pin )
316- try :
317- (GPIO_SYSFS_PATH / "export" ).write_text (str (self .gpio_pin ))
318- time .sleep (1 ) # Give sysfs time to create the node
319- except Exception as e :
320- logging .error ("Error exporting GPIO %s: %s" , self .gpio_pin , e )
321- return False
322-
323- if not self .gpio_node .is_dir ():
324- logging .error ("Unable to access GPIO %s." , self .gpio_pin )
325- return False
326-
327- try :
328- direction_file = self .gpio_node / "direction"
329- if direction_file .read_text (encoding = "utf-8" ).strip () != "in" :
330- direction_file .write_text ("in" )
331- logging .info ("Set GPIO %s direction to 'in'" , self .gpio_pin )
332- except Exception as e :
333- logging .error (
334- "Error setting direction for GPIO %s: %s" , self .gpio_pin , e
335- )
336- return False
337-
338- return True
339-
340- def run_test (self ) -> bool :
341- """
342- Prompts user to press/release button and checks GPIO state.
343-
344- This should be called inside the 'with' block after the
345- GPIO has been successfully set up.
346-
347- Returns:
348- True if press and release are detected, False otherwise.
349- """
350- try :
351- initial_value = (self .gpio_node / "value" ).read_text ().strip ()
352- logging .info (
353- "Initial value of %s is: %s" ,
354- self .gpio_node .name ,
355- "High" if initial_value == "1" else "Low" ,
356- )
357-
358- # Test Press
359- logging .info ("Please PRESS and HOLD the '%s' button..." , self .name )
360- for _ in range (TEST_TIMEOUT ):
361- val = (self .gpio_node / "value" ).read_text ().strip ()
362- if val != initial_value :
363- logging .info ("PASS: Button press detected!" )
364- break
365- time .sleep (1 )
366- else :
367- logging .error ("FAIL: No button press detected." )
368- return False
369-
370- # Test Release
371- logging .info ("Please RELEASE the '%s' button..." , self .name )
372- for _ in range (TEST_TIMEOUT ):
373- val = (self .gpio_node / "value" ).read_text ().strip ()
374- if val == initial_value :
375- logging .info ("PASS: Button release detected!" )
376- return True
377- time .sleep (1 )
378- else :
379- logging .error ("FAIL: No button release detected." )
380- return False
381-
382- except Exception as e :
383- logging .error ("An error occurred during GPIO test: %s" , e )
384- return False
385-
386-
387267def parse_arguments () -> argparse .Namespace :
388268 parser = argparse .ArgumentParser (
389269 description = (
390- "Monitor interrupts, verify IRQ CPU affinity, and "
391- "optionally test a GPIO button."
270+ "Monitor interrupts, verify IRQ CPU affinity "
271+ "for testing a interrupts button."
392272 ),
393273 formatter_class = argparse .RawTextHelpFormatter ,
394274 )
395- parser .add_argument (
396- "type" ,
397- choices = ["interrupts" , "gpio" ],
398- help = "The testing types in 'interrupts' and 'gpio' button." ,
399- )
400275 parser .add_argument (
401276 "--name" ,
402277 type = str ,
403278 required = True ,
404279 help = (
405280 "The name of the interrupt source (e.g., 'gpio-keys') "
406281 "to find in /proc/interrupts. "
407- "Or a GPIO button name"
408282 ),
409283 )
410- parser .add_argument (
411- "--gpio-pin" ,
412- type = str ,
413- help = "The GPIO pin number (required for 'gpio' type test)." ,
414- )
415284
416285 args = parser .parse_args ()
417- if args .type == "gpio" and args .gpio_pin is None :
418- parser .error ("--gpio-pin is required when type is 'gpio'" )
419-
420286 return args
421287
422288
@@ -434,14 +300,8 @@ def main():
434300
435301 args = parse_arguments ()
436302 test_result = None
437-
438- if args .type == "gpio" :
439- button_test = GpioTest (name = args .name , gpio_pin = args .gpio_pin )
440- with button_test as test :
441- test_result = test .run_test ()
442- elif args .type == "interrupts" :
443- button_test = InterruptsTest (irq_name = args .name )
444- test_result = button_test .run_test ()
303+ button_test = InterruptsTest (irq_name = args .name )
304+ test_result = button_test .run_test ()
445305
446306 if test_result :
447307 logging .info ("Button Test for '%s' PASSED!" , args .name )
0 commit comments