Skip to content

Commit 3a96ce3

Browse files
Autopilot Manager: create metadata_preprocessor.py
1 parent 9c81bff commit 3a96ce3

File tree

1 file changed

+198
-0
lines changed

1 file changed

+198
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import gzip
2+
import io
3+
import json
4+
import sys
5+
from datetime import datetime
6+
from pathlib import Path
7+
from typing import Any, Dict
8+
9+
import requests
10+
from loguru import logger
11+
12+
13+
class ManifestHandler:
14+
"""
15+
Handles downloading and processing the ArduPilot firmware manifest to extract USB device information.
16+
"""
17+
18+
CACHE_MAX_AGE_DAYS = 10
19+
20+
def __init__(self) -> None:
21+
"""Initialize the manifest handler."""
22+
self._usb_devices: Dict[str, list[str]] = {}
23+
self.manifest_url = "https://firmware.ardupilot.org/manifest.json.gz"
24+
25+
def is_file_valid(self, file_path: Path) -> bool:
26+
"""
27+
Check if the file exists and is not too old.
28+
29+
Args:
30+
file_path: Path to the file to check.
31+
32+
Returns:
33+
bool: True if file exists and is valid, False otherwise.
34+
"""
35+
if not file_path.exists():
36+
return False
37+
38+
file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
39+
age = datetime.now() - file_time
40+
return age.days < self.CACHE_MAX_AGE_DAYS
41+
42+
def load_existing_data(self, file_path: Path) -> bool:
43+
"""
44+
Load USB devices data from an existing file if available and valid.
45+
46+
Args:
47+
file_path: Path to the file.
48+
49+
Returns:
50+
bool: True if file was loaded successfully, False otherwise.
51+
"""
52+
try:
53+
if self.is_file_valid(file_path):
54+
with open(file_path, "r", encoding="utf-8") as f:
55+
self._usb_devices = json.load(f)
56+
return True
57+
except (json.JSONDecodeError, OSError):
58+
pass
59+
return False
60+
61+
def _format_usb_id(self, usb_id: str) -> str:
62+
"""
63+
Convert USB ID to 'vid:pid' format.
64+
65+
Args:
66+
usb_id: USB ID string in '0xVID/0xPID' or 'VID:PID' format.
67+
68+
Returns:
69+
Formatted USB ID in 'vid:pid' format.
70+
"""
71+
if "/" in usb_id: # Handle '0xVID/0xPID' format
72+
vid, pid = usb_id.split("/")
73+
vid = vid.replace("0x", "").lower()
74+
pid = pid.replace("0x", "").lower()
75+
return f"{vid}:{pid}"
76+
return usb_id.lower() # Already in correct format
77+
78+
def download_manifest(self) -> Any:
79+
"""
80+
Download and decompress the manifest from the ArduPilot server.
81+
82+
Returns:
83+
The decompressed manifest as a dictionary.
84+
85+
Raises:
86+
RuntimeError: If there is an issue downloading or parsing the manifest.
87+
"""
88+
try:
89+
response = requests.get(self.manifest_url, timeout=10)
90+
response.raise_for_status()
91+
92+
with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as gz_file:
93+
return json.loads(gz_file.read().decode("utf-8"))
94+
except requests.RequestException as e:
95+
raise e
96+
except json.JSONDecodeError as e:
97+
raise e
98+
except Exception as e:
99+
raise e
100+
101+
def parse_manifest(self, manifest_data: dict[str, list[str]]) -> None:
102+
"""
103+
Extract USB device information from the manifest data.
104+
105+
Args:
106+
manifest_data: Raw manifest data as a dictionary.
107+
"""
108+
self._usb_devices.clear()
109+
110+
def process_node(node: Any) -> None:
111+
if isinstance(node, dict):
112+
if "USBID" in node and "platform" in node:
113+
usb_id = node["USBID"]
114+
platform = node["platform"]
115+
116+
# Handle both single USB ID and lists of USB IDs
117+
usb_ids = [usb_id] if isinstance(usb_id, str) else usb_id
118+
119+
for uid in usb_ids:
120+
formatted_uid = self._format_usb_id(uid)
121+
self._usb_devices.setdefault(formatted_uid, [])
122+
123+
if platform not in self._usb_devices[formatted_uid]:
124+
self._usb_devices[formatted_uid].append(platform)
125+
126+
# Recursively process all values in the dictionary
127+
for value in node.values():
128+
process_node(value)
129+
elif isinstance(node, list):
130+
# Recursively process all items in the list
131+
for item in node:
132+
process_node(item)
133+
134+
process_node(manifest_data)
135+
136+
def export_json(self, output_file: Path) -> None:
137+
"""
138+
Export USB device information to a JSON file.
139+
140+
Args:
141+
output_file: Path to the output JSON file.
142+
"""
143+
with open(output_file, "w", encoding="utf-8") as f:
144+
json.dump(self._usb_devices, f, indent=2)
145+
146+
def process_and_export(self, output_file: Path) -> None:
147+
"""
148+
Download manifest, process it, and export the USB device information to a file.
149+
If a valid file exists and is not too old, use it instead of downloading new data.
150+
151+
Args:
152+
output_file: Path to the output JSON file.
153+
154+
Raises:
155+
RuntimeError: If there is an issue during processing.
156+
"""
157+
# Try to load from existing file first
158+
if self.load_existing_data(output_file):
159+
logger.info(f"Using existing data from {output_file}")
160+
# Log summary of existing data
161+
device_count = len(self._usb_devices)
162+
platform_count = sum(len(platforms) for platforms in self._usb_devices.values())
163+
logger.info(f"Found {device_count} unique USB IDs")
164+
logger.info(f"Total platform mappings: {platform_count}")
165+
return
166+
167+
# If file is invalid or too old, download and process new data
168+
logger.info("Downloading manifest...")
169+
manifest_data = self.download_manifest()
170+
171+
logger.info("Processing manifest...")
172+
self.parse_manifest(manifest_data)
173+
self.export_json(output_file)
174+
175+
logger.success(f"Successfully exported USB device information to {output_file}")
176+
177+
# Log summary
178+
device_count = len(self._usb_devices)
179+
platform_count = sum(len(platforms) for platforms in self._usb_devices.values())
180+
logger.info(f"Found {device_count} unique USB IDs")
181+
logger.info(f"Total platform mappings: {platform_count}")
182+
183+
184+
def main() -> None:
185+
if len(sys.argv) != 2:
186+
logger.error("Usage: python metadata_preprocessor.py <output_file>")
187+
sys.exit(1)
188+
189+
try:
190+
handler = ManifestHandler()
191+
handler.process_and_export(Path(sys.argv[1]))
192+
except RuntimeError as e:
193+
logger.error(f"Error: {e}")
194+
sys.exit(1)
195+
196+
197+
if __name__ == "__main__":
198+
main()

0 commit comments

Comments
 (0)