|
| 1 | +import json |
| 2 | +import os |
| 3 | +import time |
| 4 | +import subprocess |
| 5 | +import urllib.request |
| 6 | +import urllib.error |
| 7 | +import shutil |
| 8 | +import glob as glob_module |
| 9 | + |
| 10 | +from flask import jsonify |
| 11 | + |
| 12 | + |
| 13 | +class FirmwareManager: |
| 14 | + """Handles Nucleo firmware check, download, and flash operations.""" |
| 15 | + |
| 16 | + FIRMWARE_REPO = 'ECC-BFMC/Embedded_Platform' |
| 17 | + FIRMWARE_FILE_PATH = 'cmake_build/NUCLEO_F401RE/develop/GCC_ARM/robot_car.bin' |
| 18 | + FIRMWARE_API_URL = f'https://api.github.com/repos/{FIRMWARE_REPO}/commits' |
| 19 | + FIRMWARE_RAW_URL = f'https://raw.githubusercontent.com/{FIRMWARE_REPO}/master/{FIRMWARE_FILE_PATH}' |
| 20 | + NUCLEO_MOUNT_PATTERN = '/media/pi/NOD_F401RE*' |
| 21 | + |
| 22 | + def __init__(self, repo_path): |
| 23 | + self.repo_path = repo_path |
| 24 | + |
| 25 | + def _get_firmware_dir(self): |
| 26 | + return os.path.join(self.repo_path, 'src', 'hardware', 'firmware') |
| 27 | + |
| 28 | + def _get_local_info(self): |
| 29 | + """Read locally stored firmware version metadata.""" |
| 30 | + info_path = os.path.join(self._get_firmware_dir(), 'firmware_version.json') |
| 31 | + if os.path.exists(info_path): |
| 32 | + with open(info_path, 'r') as f: |
| 33 | + return json.load(f) |
| 34 | + return None |
| 35 | + |
| 36 | + def _save_local_info(self, info): |
| 37 | + """Save firmware version metadata to disk.""" |
| 38 | + fw_dir = self._get_firmware_dir() |
| 39 | + os.makedirs(fw_dir, exist_ok=True) |
| 40 | + with open(os.path.join(fw_dir, 'firmware_version.json'), 'w') as f: |
| 41 | + json.dump(info, f, indent=2) |
| 42 | + |
| 43 | + def _find_nucleo_mount(self): |
| 44 | + """Find the Nucleo board's mass storage mount point.""" |
| 45 | + matches = glob_module.glob(self.NUCLEO_MOUNT_PATTERN) |
| 46 | + for path in matches: |
| 47 | + if os.path.ismount(path): |
| 48 | + return path |
| 49 | + return None |
| 50 | + |
| 51 | + def handle_check(self): |
| 52 | + """Check if a newer firmware binary is available on the Embedded_Platform repo.""" |
| 53 | + try: |
| 54 | + url = f'{self.FIRMWARE_API_URL}?path={self.FIRMWARE_FILE_PATH}&per_page=1' |
| 55 | + req = urllib.request.Request(url, headers={'User-Agent': 'BFMC-Brain'}) |
| 56 | + with urllib.request.urlopen(req, timeout=15) as resp: |
| 57 | + commits = json.loads(resp.read().decode()) |
| 58 | + |
| 59 | + if not commits: |
| 60 | + return jsonify({'success': False, 'error': 'No commits found for firmware file'}), 500 |
| 61 | + |
| 62 | + remote_sha = commits[0]['sha'] |
| 63 | + remote_date = commits[0]['commit']['committer']['date'] |
| 64 | + remote_message = commits[0]['commit']['message'].split('\n')[0] |
| 65 | + |
| 66 | + local_info = self._get_local_info() |
| 67 | + local_sha = local_info.get('commit_sha', '') if local_info else '' |
| 68 | + |
| 69 | + fw_path = os.path.join(self._get_firmware_dir(), 'robot_car.bin') |
| 70 | + has_local_file = os.path.exists(fw_path) |
| 71 | + |
| 72 | + update_available = (local_sha != remote_sha) or not has_local_file |
| 73 | + |
| 74 | + return jsonify({ |
| 75 | + 'success': True, |
| 76 | + 'update_available': update_available, |
| 77 | + 'has_local_file': has_local_file, |
| 78 | + 'remote_sha': remote_sha[:7], |
| 79 | + 'remote_date': remote_date, |
| 80 | + 'remote_message': remote_message, |
| 81 | + 'local_sha': local_sha[:7] if local_sha else '', |
| 82 | + 'local_date': local_info.get('downloaded_at', '') if local_info else '' |
| 83 | + }) |
| 84 | + except urllib.error.URLError as e: |
| 85 | + return jsonify({'success': False, 'error': f'Failed to reach GitHub: {e.reason}'}), 500 |
| 86 | + except Exception as e: |
| 87 | + return jsonify({'success': False, 'error': str(e)}), 500 |
| 88 | + |
| 89 | + def handle_download(self): |
| 90 | + """Download the latest robot_car.bin from the Embedded_Platform repo.""" |
| 91 | + try: |
| 92 | + url = f'{self.FIRMWARE_API_URL}?path={self.FIRMWARE_FILE_PATH}&per_page=1' |
| 93 | + req = urllib.request.Request(url, headers={'User-Agent': 'BFMC-Brain'}) |
| 94 | + with urllib.request.urlopen(req, timeout=15) as resp: |
| 95 | + commits = json.loads(resp.read().decode()) |
| 96 | + |
| 97 | + if not commits: |
| 98 | + return jsonify({'success': False, 'error': 'No commits found for firmware file'}), 500 |
| 99 | + |
| 100 | + remote_sha = commits[0]['sha'] |
| 101 | + |
| 102 | + dl_req = urllib.request.Request(self.FIRMWARE_RAW_URL, headers={'User-Agent': 'BFMC-Brain'}) |
| 103 | + with urllib.request.urlopen(dl_req, timeout=30) as resp: |
| 104 | + firmware_data = resp.read() |
| 105 | + |
| 106 | + fw_dir = self._get_firmware_dir() |
| 107 | + os.makedirs(fw_dir, exist_ok=True) |
| 108 | + fw_path = os.path.join(fw_dir, 'robot_car.bin') |
| 109 | + |
| 110 | + with open(fw_path, 'wb') as f: |
| 111 | + f.write(firmware_data) |
| 112 | + |
| 113 | + self._save_local_info({ |
| 114 | + 'commit_sha': remote_sha, |
| 115 | + 'downloaded_at': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), |
| 116 | + 'file_size': len(firmware_data) |
| 117 | + }) |
| 118 | + |
| 119 | + return jsonify({ |
| 120 | + 'success': True, |
| 121 | + 'message': f'Firmware downloaded successfully ({len(firmware_data)} bytes). File saved to src/hardware/firmware/robot_car.bin' |
| 122 | + }) |
| 123 | + except urllib.error.URLError as e: |
| 124 | + return jsonify({'success': False, 'error': f'Failed to download firmware: {e.reason}'}), 500 |
| 125 | + except Exception as e: |
| 126 | + return jsonify({'success': False, 'error': str(e)}), 500 |
| 127 | + |
| 128 | + def handle_flash(self): |
| 129 | + """Flash robot_car.bin to the Nucleo board via its USB mass storage.""" |
| 130 | + try: |
| 131 | + fw_path = os.path.join(self._get_firmware_dir(), 'robot_car.bin') |
| 132 | + if not os.path.exists(fw_path): |
| 133 | + return jsonify({ |
| 134 | + 'success': False, |
| 135 | + 'error': 'Firmware file not found. Download it first.' |
| 136 | + }), 400 |
| 137 | + |
| 138 | + fw_size = os.path.getsize(fw_path) |
| 139 | + if fw_size == 0: |
| 140 | + return jsonify({ |
| 141 | + 'success': False, |
| 142 | + 'error': 'Firmware file is empty. Try downloading again.' |
| 143 | + }), 400 |
| 144 | + |
| 145 | + nucleo_mount = self._find_nucleo_mount() |
| 146 | + if not nucleo_mount: |
| 147 | + return jsonify({ |
| 148 | + 'success': False, |
| 149 | + 'error': 'Nucleo board not detected. Make sure it is connected via USB and mounted.' |
| 150 | + }), 404 |
| 151 | + |
| 152 | + if not os.access(nucleo_mount, os.W_OK): |
| 153 | + return jsonify({ |
| 154 | + 'success': False, |
| 155 | + 'error': f'Cannot write to Nucleo mount point ({nucleo_mount}). Check permissions.' |
| 156 | + }), 403 |
| 157 | + |
| 158 | + dest_path = os.path.join(nucleo_mount, 'robot_car.bin') |
| 159 | + shutil.copy2(fw_path, dest_path) |
| 160 | + |
| 161 | + subprocess.run(['sync'], timeout=10) |
| 162 | + |
| 163 | + return jsonify({ |
| 164 | + 'success': True, |
| 165 | + 'message': f'Firmware flashed successfully to {nucleo_mount}. The Nucleo will reset automatically.' |
| 166 | + }) |
| 167 | + except PermissionError: |
| 168 | + return jsonify({'success': False, 'error': 'Permission denied writing to Nucleo. Try running with sudo.'}), 403 |
| 169 | + except OSError as e: |
| 170 | + return jsonify({'success': False, 'error': f'Failed to flash firmware: {e}'}), 500 |
| 171 | + except Exception as e: |
| 172 | + return jsonify({'success': False, 'error': str(e)}), 500 |
0 commit comments