Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 60 additions & 55 deletions python/can/command_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# Cache directory for downloaded signal definitions
CACHE_DIR = Path(__file__).parent / ".cache"
SAEJ1979_URL = "https://raw.githubusercontent.com/OBDb/SAEJ1979/refs/heads/main/signalsets/v3/default.json"
NISSANINFINITI_URL = "https://raw.githubusercontent.com/OBDb/NissanInfiniti/refs/heads/main/signalsets/v3/default.json"

# Global registry cache by model year
MODEL_YEAR_REGISTRY_CACHE: Dict[int, 'CommandRegistry'] = {}
Expand All @@ -28,45 +29,71 @@ def _make_signalset_generic(json_data: str, header_override: str) -> str:
command['hdr'] = header_override
return json.dumps(data)

def _load_sae_commands(json_data: str) -> List['Command']:
def _load_standard_commands(json_data: str) -> List['Command']:
return (
SignalSet.from_json(_make_signalset_generic(json_data, header_override='7E0')).commands
.union(SignalSet.from_json(_make_signalset_generic(json_data, header_override='DB33')).commands)
.union(SignalSet.from_json(_make_signalset_generic(json_data, header_override='686A')).commands)
)

def get_cached_saej1979_signals() -> List['Command']:
"""Fetch and cache the SAEJ1979 signal definitions."""
def get_cached_signalset(url: str, cache_name: str, signal_type: str = "generic") -> List['Command']:
"""Generic function to fetch and cache signal definitions.

Args:
url: The URL to fetch the signalset from
cache_name: The name for the cache file (without .json extension)
signal_type: The type of signals ("standard" for standard commands, "generic" for others)

Returns:
List of Command objects from the signalset
"""
os.makedirs(CACHE_DIR, exist_ok=True)
cache_file = CACHE_DIR / "saej1979_signals.json"
cache_file = CACHE_DIR / f"{cache_name}.json"

try:
# If cache exists and is less than 24 hours old, use it
if cache_file.exists() and (time.time() - cache_file.stat().st_mtime) < 86400:
with open(cache_file) as f:
json_data = f.read()
return _load_sae_commands(json_data)
if signal_type == "standard":
return _load_standard_commands(json_data)
else:
return list(SignalSet.from_json(json_data).commands)
except Exception as e:
print(f"Warning: Error reading cache: {e}")
print(f"Warning: Error reading {cache_name} cache: {e}")

try:
# Fetch fresh data
with urllib.request.urlopen(SAEJ1979_URL) as response:
with urllib.request.urlopen(url) as response:
json_data = response.read().decode('utf-8')
# Cache the cleaned data
os.makedirs(CACHE_DIR, exist_ok=True)
# Cache the raw data
with open(cache_file, 'w') as f:
f.write(json_data)
return _load_sae_commands(json_data)

if signal_type == "standard":
return _load_standard_commands(json_data)
else:
return list(SignalSet.from_json(json_data).commands)
except Exception as e:
print(f"Warning: Could not fetch SAEJ1979 signals: {e}")
print(f"Warning: Could not fetch {cache_name} signals: {e}")
# If we have a cache file, use it even if it's old
if cache_file.exists():
with open(cache_file) as f:
json_data = f.read()
return _load_sae_commands(json_data)
if signal_type == "standard":
return _load_standard_commands(json_data)
else:
return list(SignalSet.from_json(json_data).commands)
return []

def get_cached_saej1979_signals() -> List['Command']:
"""Fetch and cache the SAEJ1979 signal definitions."""
return get_cached_signalset(SAEJ1979_URL, "saej1979_signals", "standard")

def get_cached_nissan_infiniti_signals() -> List['Command']:
"""Fetch and cache the Nissan/Infiniti signal definitions."""
return get_cached_signalset(NISSANINFINITI_URL, "nissan_infiniti_signals", "standard")

class ServiceType(Enum):
SERVICE_01 = 0x01
SERVICE_21 = 0x21
Expand Down Expand Up @@ -323,55 +350,33 @@ def get_model_year_command_registry(model_year: int) -> 'CommandRegistry':
signalset_json = get_signalset_from_model_year(model_year)
signalset = SignalSet.from_json(signalset_json)

# Get SAE J1979 base signals
# Get SAE J1979 base signals (always first)
saej1979_commands = get_cached_saej1979_signals()

# Check if the model-specific signalset is empty and we need to use the make repo
if not signalset.commands:
# Extract the make from the repo name using our utility function
make = extract_make_from_repo_name()
# Initialize the list with SAE J1979 commands
combined_commands = list(saej1979_commands)

if make:
# Try to fetch the make's signalset
make_url = f"https://raw.githubusercontent.com/OBDb/{make}/refs/heads/main/signalsets/v3/default.json"
# Extract the make from the repo name using our utility function
make = extract_make_from_repo_name()

try:
import urllib.request
import time
import json

# Create cache dir if not exists
os.makedirs(CACHE_DIR, exist_ok=True)
cache_file = CACHE_DIR / f"{make.lower()}_signals.json"

try:
# If cache exists and is less than 24 hours old, use it
if cache_file.exists() and (time.time() - cache_file.stat().st_mtime) < 86400:
with open(cache_file) as f:
json_data = f.read()
make_signalset = SignalSet.from_json(json_data)
print(f"Using cached {make} signals for model year {model_year}")
signalset = make_signalset
except Exception as e:
print(f"Warning: Error reading {make} cache: {e}")

# If we don't have valid cached data, fetch it
if not signalset.commands:
with urllib.request.urlopen(make_url) as response:
json_data = response.read().decode('utf-8')

with open(cache_file, 'w') as f:
f.write(json_data)

make_signalset = SignalSet.from_json(json_data)
signalset = make_signalset
print(f"Fetched and using {make} signals for model year {model_year}")
# Check if the make is Nissan or Infiniti and add those commands
if make and isinstance(make, str) and make.lower() in ['nissan', 'infiniti']:
nissan_infiniti_commands = get_cached_nissan_infiniti_signals()
combined_commands.extend(nissan_infiniti_commands)
print(f"Added Nissan/Infiniti signals for {make} vehicle")

except Exception as e:
print(f"Warning: Could not fetch {make} signals: {e}")
# Check if the model-specific signalset is empty and we need to use the make repo
if not signalset.commands:
if make and isinstance(make, str):
# Try to fetch the make's signalset as a fallback
make_url = f"https://raw.githubusercontent.com/OBDb/{make}/refs/heads/main/signalsets/v3/default.json"
make_commands = get_cached_signalset(make_url, f"{make.lower()}_signals")
if make_commands:
signalset = SignalSet(commands=set(make_commands))
print(f"Using {make} signals for model year {model_year}")

# Combine signals from SAE J1979 and model-specific or make-specific signals
combined_commands = list(saej1979_commands) + list(signalset.commands)
# Add model-specific or make-specific signals to the combined commands
combined_commands.extend(list(signalset.commands))

# Filter combined_commands by each command's optional .filter property.
if model_year is not None:
Expand Down