55# (C) Ircama 2021 - CC-BY-NC-SA-4.0
66###########################################################################
77
8+ from typing import Any , Dict , List
9+
810# List of known ECUs:
911ECU_ADDR_J = "747"
1012ECU_R_ADDR_J = "74F"
@@ -1786,13 +1788,10 @@ def NA(neg_answer):
17861788 'Request' : '^01A6' + ELM_FOOTER ,
17871789 'Descr' : 'Total Distance Traveled' ,
17881790 'Header' : ECU_ADDR_E ,
1789- 'ResponseFooter' : lambda self , cmd , pid , uc_val : (
1790- HD (ECU_ADDR_H ) + SZ ('05' ) + DT ('61 28 00 EA 5C' )
1791- ),
17921791 'Response' : [
1793- HD (ECU_R_ADDR_H ) + SZ ('05 ' ) + DT ('61 28 00 EA 5C' ),
1794- HD (ECU_R_ADDR_H ) + SZ ('05 ' ) + DT ('61 27 00 EA 5C' ),
1795- HD (ECU_R_ADDR_H ) + SZ ('05 ' ) + DT ('61 28 00 EA 5D' ),
1792+ HD (ECU_R_ADDR_H ) + SZ ('06 ' ) + DT ('61 A6 00 00 EA 5C' ),
1793+ HD (ECU_R_ADDR_H ) + SZ ('06 ' ) + DT ('61 A6 00 00 EA 5C' ),
1794+ HD (ECU_R_ADDR_H ) + SZ ('06 ' ) + DT ('61 A6 00 00 EA 5D' ),
17961795 ]
17971796 },
17981797 # ------------------------------------------------------------
@@ -2084,11 +2083,7 @@ def NA(neg_answer):
20842083 'Request' : '^03' + ELM_FOOTER ,
20852084 'Descr' : 'Get DTCs (Diagnostic Trouble Codes)' ,
20862085 'Header' : ECU_ADDR_E ,
2087- 'Response' : [
2088- HD (ECU_R_ADDR_E ) + SZ ('02' ) + DT ('43 00' ),
2089- HD (ECU_R_ADDR_E ) + SZ ('02' ) + DT ('42 00' ),
2090- HD (ECU_R_ADDR_E ) + SZ ('02' ) + DT ('41 00' ),
2091- ]
2086+ 'Response' : HD (ECU_R_ADDR_E ) + SZ ('02' ) + DT ('43 00' )
20922087 },
20932088 # -------------------------------------------------------------------
20942089 # Mode 04 Clearing/resetting emission-related malfunction information
@@ -4445,16 +4440,16 @@ def NA(neg_answer):
44454440
44464441# Helper functions for dynamic PID support generation
44474442# see: https://en.wikipedia.org/wiki/OBD-II_PIDs#Service_01_PID_00
4448- def generate_pids_bitmap (supported_pids ) :
4443+ def generate_pids_bitmap (supported_pids : List [ int ]) -> str :
44494444 """
44504445 Generate a 4-byte hex bitmap representing which PIDs are supported.
4451-
4446+
44524447 Args:
44534448 supported_pids: List of PID numbers (as integers, e.g., [1, 3, 4, 5, ...])
4454-
4449+
44554450 Returns:
44564451 String with 4 bytes in hex format like "BE 1F A8 13"
4457-
4452+
44584453 Example:
44594454 For PIDs [1, 3, 4, 5, 6, 7, 12, 13, 14, 15, 16, 17, 19, 21, 28, 31, 32]:
44604455 Returns "BE 1F A8 13"
@@ -4470,53 +4465,59 @@ def generate_pids_bitmap(supported_pids):
44704465 return ' ' .join (f"{ byte :02X} " for byte in bitmap )
44714466
44724467
4473- def extract_supported_pids (scenario_dict , pids_supported , service = "01" , start_pid = 0x01 , end_pid = 0x20 ):
4468+ def extract_supported_pids (
4469+ scenario_dict : Dict [str , Dict [str , Any ]],
4470+ pids_supported ,
4471+ service : str = "01" ,
4472+ start_pid : int = 0x01 ,
4473+ end_pid : int = 0x20 ,
4474+ ) -> List [int ]:
44744475 """
44754476 Extract all supported PIDs from a scenario dictionary for a given service.
4476-
4477+
44774478 Args:
44784479 scenario_dict: Dictionary containing PID definitions
44794480 service: Service ID as string (e.g., '01' for Mode 01)
44804481 start_pid: Starting PID number (inclusive)
44814482 end_pid: Ending PID number (inclusive)
4482-
4483+
44834484 Returns:
44844485 List of supported PID numbers (as integers)
44854486 """
44864487 import re
44874488
4488- supported = []
4489+ supported : List [ int ] = []
44894490
44904491 for _pid_name , pid_info in scenario_dict .items ():
44914492 if "Request" not in pid_info :
44924493 continue
44934494
44944495 pattern = rf"\^{ service } ([0-9A-F]{{2}})"
44954496 match = re .search (pattern , pid_info ["Request" ])
4496-
4497+
44974498 if match :
44984499 pid_hex = match .group (1 )
44994500 pid_num = int (pid_hex , 16 )
4500-
4501+
45014502 # Check if PID is in the desired range and not a PIDS_X query itself
45024503 if start_pid <= pid_num <= end_pid and pid_hex not in pids_supported :
45034504 supported .append (pid_num )
4504-
4505+
45054506 return sorted (supported )
45064507
45074508
4508- def generate_dynamic_pids_entries (scenario_dict , service = "01" ):
4509+ def generate_dynamic_pids_entries (scenario_dict : Dict [ str , Dict [ str , Any ]], service : str = "01" ):
45094510 """
45104511 Generate dynamic ELM_PIDS_X entries based on supported PIDs in the scenario.
4511-
4512+
45124513 Args:
45134514 scenario_dict: Dictionary containing PID definitions
45144515 service: Service ID as string (e.g., '01' for Mode 01)
4515-
4516+
45164517 Returns:
45174518 Dictionary with ELM_PIDS_X entries to be merged into the scenario
45184519 """
4519- pids_entries = {}
4520+ pids_entries : Dict [ str , Dict [ str , Any ]] = {}
45204521
45214522 pid_ranges = [
45224523 ('A' , 0x00 , 0x01 , 0x20 ),
@@ -4530,24 +4531,34 @@ def generate_dynamic_pids_entries(scenario_dict, service="01"):
45304531
45314532 pids_supported = [f"{ p [3 ]:02X} " for p in pid_ranges ]
45324533
4534+ # This is the highest PID we know of based on the defined ranges
4535+ last_known_pid = max (end for _ , _ , _ , end in pid_ranges )
4536+
4537+ # Extract all supported PIDs across the entire range to determine the last supported PID
4538+ all_supported_pids = extract_supported_pids (scenario_dict , pids_supported , service , 0x01 , last_known_pid )
4539+
4540+ # Determine the last supported PID to then add intermediate "Show PIDs supported" entries if
4541+ # there are gaps
4542+ last_supported_pid = all_supported_pids [- 1 ] if all_supported_pids else 0
4543+
45334544 for suffix , request_pid , start_pid , end_pid in pid_ranges :
4534- supported = extract_supported_pids (scenario_dict , pids_supported ,service , start_pid , end_pid )
4545+ # Get the supported PIDs for this specific range
4546+ supported : List [int ] = [
4547+ value for value in all_supported_pids if start_pid <= value <= end_pid
4548+ ]
45354549
4536- if not supported and request_pid > 0x00 :
4550+ if not supported and request_pid > 0x00 and last_supported_pid < start_pid :
45374551 continue
45384552
4539- next_range_has_pids = False
4540- if end_pid < 0xE0 :
4541- next_supported = extract_supported_pids (scenario_dict , pids_supported , service , end_pid , end_pid + 0x20 )
4542- next_range_has_pids = len (next_supported ) > 0
4553+ next_ranges_have_pids = last_supported_pid > end_pid
45434554
45444555 pids_to_encode = supported .copy ()
4545- if next_range_has_pids :
4556+ if next_ranges_have_pids :
45464557 pids_to_encode .append (end_pid - start_pid + 1 )
4547-
4558+
45484559 relative_pids = [(pid - start_pid + 1 ) for pid in supported if start_pid <= pid <= end_pid ]
4549-
4550- if next_range_has_pids :
4560+
4561+ if next_ranges_have_pids :
45514562 relative_pids .append (32 )
45524563
45534564 bitmap = generate_pids_bitmap (relative_pids )
@@ -4558,19 +4569,19 @@ def generate_dynamic_pids_entries(scenario_dict, service="01"):
45584569 "Descr" : f"Supported PIDS_{ suffix } [{ start_pid :02X} -{ end_pid :02X} ]" ,
45594570 "Response" : HD (ECU_R_ADDR_E ) + SZ ("06" ) + DT (f"41 { request_pid :02X} { bitmap } " )
45604571 }
4561-
4572+
45624573 # Special handling for PIDS_A to simulate "SEARCHING..."
45634574 if suffix == 'A' :
45644575 pids_entries [entry_name ]["ResponseHeader" ] = \
45654576 lambda self , cmd , pid , uc_val : \
45664577 "<string>SEARCHING...</string>" \
45674578 "<exec>time.sleep(1.5)</exec>" + ST ('' ) \
45684579 if self .counters [pid ] == 1 else ''
4569-
4580+
45704581 return pids_entries
45714582
45724583
4573- def update_scenario_with_dynamic_pids (scenario_dict , service = "01" ):
4584+ def update_scenario_with_dynamic_pids (scenario_dict : Dict [ str , Dict [ str , Any ]], service : str = "01" ):
45744585 """
45754586 Update a scenario dictionary with dynamically generated ELM_PIDS_X entries.
45764587 This will replace existing static PIDS_ & ELM_PIDS_ entries with dynamic ones.
@@ -4583,7 +4594,7 @@ def update_scenario_with_dynamic_pids(scenario_dict, service="01"):
45834594 pids_keys_to_remove = [k for k in scenario_dict .keys () if k .startswith (("ELM_PIDS_" , "PIDS_" ))]
45844595 for key in pids_keys_to_remove :
45854596 del scenario_dict [key ]
4586-
4597+
45874598 # add new dynamic
45884599 dynamic_pids = generate_dynamic_pids_entries (scenario_dict , service )
45894600 scenario_dict .update (dynamic_pids )
0 commit comments