diff --git a/.gitignore b/.gitignore index 7f7adc5..eb72520 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,9 @@ /build -/.vscode \ No newline at end of file +/.vscode + +# Python WebSocket client files +*.csv +vex_telemetry_*.csv +__pycache__/ +*.pyc +*.pyo \ No newline at end of file diff --git a/WEBSOCKET_README.md b/WEBSOCKET_README.md new file mode 100644 index 0000000..c28d6eb --- /dev/null +++ b/WEBSOCKET_README.md @@ -0,0 +1,293 @@ +# VEX WebSocket Client Documentation + +This document describes the Python WebSocket client implementation for receiving telemetry data from VEX robots via the VEX Extension's WebSocket server. + +## Overview + +The WebSocket client connects to the VEX Extension server running on port 7071 and receives real-time telemetry data from the robot, including: + +- Motor temperatures +- Motor RPM (commanded and actual) +- Motor position and current +- Battery voltage +- Inertial sensor data (gyro axes) +- System logs and warnings + +## Installation + +### Prerequisites + +- Python 3.7 or higher +- VEX Extension installed in VS Code +- USB connection to VEX Brain or Controller + +### Dependencies + +Install the required Python packages: + +```bash +pip3 install -r requirements.txt +``` + +Or manually: + +```bash +pip3 install websocket-client +``` + +## Usage + +### Basic Usage + +```bash +# Connect to default localhost:7071 +python3 websocket_client.py + +# Connect to specific host/port +python3 websocket_client.py --host 192.168.1.100 --port 7071 + +# Save data to CSV file +python3 websocket_client.py --save-to-file + +# Enable verbose logging +python3 websocket_client.py --verbose +``` + +### Command Line Options + +- `--host HOST`: WebSocket server host (default: localhost) +- `--port PORT`: WebSocket server port (default: 7071) +- `--save-to-file`: Save received data to timestamped CSV file +- `--verbose`: Enable verbose logging + +## Data Format + +### Raw WebSocket Messages + +The VEX Extension forwards data from the robot's printf() statements. Example messages: + +``` +[Info] > Time: 15.234 > Module: motorMonitor > + | LeftTemp: 45° + | RightTemp: 48° + | RearLeftTemp: 42° + | RearRightTemp: 46° + | Battery Voltage: 12.5V + +[Info] > Time: 15.250 > Module: motorMonitor > +X Axis: 2.5 +Y Axis: -1.2 +Z Axis: 180.0 + +[Info] > Time: 15.265 > Module: motorMonitor > + | FLM: CmdRPM: 150 | ActRPM: 145 | Pos: 2.75rev | Current: 1.2A +``` + +### Parsed Data Structure + +The client parses raw messages into structured data: + +```python +{ + 'timestamp': '2024-01-01T12:00:00.123456', + 'raw_message': '[Info] > Time: 15.234 > Module: motorMonitor > ...', + 'module': 'motorMonitor', + 'log_level': 'Info', + 'time_sec': 15.234, + 'motor_temps': { + 'left': 45, + 'right': 48, + 'rearleft': 42, + 'rearright': 46 + }, + 'battery_voltage': 12.5, + 'gyro_axes': { + 'x': 2.5, + 'y': -1.2, + 'z': 180.0 + }, + 'motor_details': { + 'flm': { + 'commanded_rpm': 150, + 'actual_rpm': 145, + 'position_rev': 2.75, + 'current_amps': 1.2 + } + } +} +``` + +## CSV Output Format + +When `--save-to-file` is enabled, data is saved to a timestamped CSV file with columns: + +- `timestamp`: ISO format timestamp +- `module`: Source module (e.g., motorMonitor) +- `log_level`: Log level (Info, Warn, Error, etc.) +- `time_sec`: Robot time in seconds +- `left_temp`, `right_temp`, `rear_left_temp`, `rear_right_temp`: Motor temperatures +- `battery_voltage`: Battery voltage in volts +- `x_axis`, `y_axis`, `z_axis`: Gyro axes in degrees +- `motor_name`: Motor identifier for detailed data +- `commanded_rpm`: Commanded motor RPM +- `actual_rpm`: Actual motor RPM +- `position_rev`: Motor position in revolutions +- `current_amps`: Motor current in amperes +- `raw_message`: Original WebSocket message + +## VEX Robot Code Changes + +The C++ robot code has been enhanced to output more detailed telemetry data. The `motorMonitor()` function now includes: + +### Enhanced Motor Data Output + +```cpp +// Log detailed motor data including RPM, current, and position +std::array motors = {&frontLeftMotor, &frontRightMotor, &rearLeftMotor, &rearRightMotor}; +for (size_t i = 0; i < motors.size(); ++i) +{ + if (motors[i]->installed()) + { + int commandedRpm = vexMotorVelocityGet(motors[i]->index()); + int actualRpm = motors[i]->velocity(vex::velocityUnits::rpm); + double position = motors[i]->position(vex::rotationUnits::rev); + double current = motors[i]->current(); + + std::string motorDetailStr = std::format( + "\n | {}: CmdRPM: {} | ActRPM: {} | Pos: {:.2f}rev | Current: {:.2f}A", + motorNames[i], commandedRpm, actualRpm, position, current); + logHandler("motorMonitor", motorDetailStr, Log::Level::Info); + } +} +``` + +### Data Types Available + +- **Motor Temperature**: Temperature in Celsius +- **Commanded RPM**: Target motor speed from VEX C API +- **Actual RPM**: Real motor speed from encoders +- **Position**: Motor encoder position in revolutions +- **Current**: Motor current draw in amperes +- **Battery Voltage**: Brain battery voltage +- **Inertial Data**: Gyro pitch, roll, yaw in degrees + +## Testing + +Run the test script to validate parsing functionality: + +```bash +python3 test_websocket_client.py +``` + +This tests the parser with simulated VEX robot messages without requiring a live connection. + +## Error Handling + +The client includes robust error handling: + +- **Connection failures**: Automatic reconnection attempts every 5 seconds +- **Parse errors**: Logged but don't interrupt the connection +- **File I/O errors**: Graceful fallback when CSV writing fails +- **Keyboard interrupt**: Clean shutdown with statistics display + +## Example Output + +``` +VEX WebSocket Client for Motor Monitoring +================================================== +Connecting to: ws://localhost:7071/vexrobotics.vexcode/device +Save to file: Yes +================================================== +Press Ctrl+C to stop + +Connected to VEX Extension WebSocket server at ws://localhost:7071 +Waiting for telemetry data... + +================================================== +MOTOR TELEMETRY - Time: 15.234s +================================================== +Motor Temperatures: + LEFT: 45°C ✅ OK + RIGHT: 48°C ✅ OK + REARLEFT: 42°C ✅ OK + REARRIGHT: 46°C ✅ OK +Battery: 12.5V ✅ OK +Gyro Axes: + X: 2.50° + Y: -1.20° + Z: 180.00° +Motor Details: + FLM: + RPM: 150 cmd / 145 actual + Position: 2.75 rev + Current: 1.20A +================================================== +``` + +## Troubleshooting + +### Connection Issues + +1. Ensure VEX Extension is installed and running in VS Code +2. Verify robot is connected via USB +3. Check that robot is selected in VEX Extension +4. Confirm port 7071 is not blocked by firewall + +### No Data Received + +1. Verify robot code is running and using `printf()` or `logHandler()` +2. Check that robot is in enabled mode (competition or test) +3. Ensure motor monitoring is active + +### Parse Errors + +The client logs parse errors but continues operation. Check: +1. Message format matches expected patterns +2. Robot code outputs data in expected format +3. No special characters interfere with parsing + +## Integration Examples + +### Real-time Dashboard + +```python +from websocket_client import VEXWebSocketClient +import matplotlib.pyplot as plt +from collections import deque + +class RealTimeDashboard: + def __init__(self): + self.temps = deque(maxlen=100) + self.times = deque(maxlen=100) + + def update_plot(self, data): + if data['motor_temps']: + avg_temp = sum(data['motor_temps'].values()) / len(data['motor_temps']) + self.temps.append(avg_temp) + self.times.append(data.get('time_sec', 0)) + + plt.plot(self.times, self.temps) + plt.pause(0.01) +``` + +### Data Analysis + +```python +import pandas as pd + +# Load CSV data +df = pd.read_csv('vex_telemetry_20240101_120000.csv') + +# Analyze motor performance +motor_data = df[df['motor_name'].notna()] +print(motor_data.groupby('motor_name')[['actual_rpm', 'current_amps']].mean()) + +# Plot temperature trends +import matplotlib.pyplot as plt +df.plot(x='time_sec', y=['left_temp', 'right_temp'], kind='line') +plt.show() +``` + +## License + +This code is provided as-is for educational and development purposes with VEX robotics systems. \ No newline at end of file diff --git a/example_usage.py b/example_usage.py new file mode 100755 index 0000000..edbbb3b --- /dev/null +++ b/example_usage.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +""" +Example usage of the VEX WebSocket Client + +This script demonstrates how to use the WebSocket client to monitor +VEX robot telemetry data and perform basic analysis. +""" + +import time +import json +from websocket_client import VEXWebSocketClient + +class MotorAnalyzer: + """Example analyzer for motor telemetry data.""" + + def __init__(self): + self.temperature_alerts = [] + self.performance_data = [] + + def analyze_motor_data(self, data): + """Analyze incoming motor data and generate alerts.""" + + # Temperature monitoring + if data['motor_temps']: + for motor, temp in data['motor_temps'].items(): + if temp >= 55: + alert = { + 'timestamp': data['timestamp'], + 'motor': motor, + 'temperature': temp, + 'severity': 'high' if temp >= 60 else 'medium' + } + self.temperature_alerts.append(alert) + print(f"🔥 TEMPERATURE ALERT: {motor.upper()} at {temp}°C") + + # Performance monitoring + if data['motor_details']: + for motor, details in data['motor_details'].items(): + rpm_efficiency = details['actual_rpm'] / max(details['commanded_rpm'], 1) * 100 + performance = { + 'timestamp': data['timestamp'], + 'motor': motor, + 'efficiency': rpm_efficiency, + 'current': details['current_amps'] + } + self.performance_data.append(performance) + + if rpm_efficiency < 80: + print(f"⚠️ PERFORMANCE: {motor.upper()} efficiency at {rpm_efficiency:.1f}%") + + # Battery monitoring + if data.get('battery_voltage') and data['battery_voltage'] < 11.5: + print(f"🔋 BATTERY LOW: {data['battery_voltage']}V") + + def get_summary(self): + """Get analysis summary.""" + return { + 'temperature_alerts': len(self.temperature_alerts), + 'recent_alerts': self.temperature_alerts[-5:] if self.temperature_alerts else [], + 'avg_efficiency': sum(p['efficiency'] for p in self.performance_data[-10:]) / min(len(self.performance_data), 10) if self.performance_data else 0 + } + +class CustomWebSocketClient(VEXWebSocketClient): + """Extended WebSocket client with custom analysis.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.analyzer = MotorAnalyzer() + self.message_count = 0 + + def on_message(self, ws, message): + """Override to add custom analysis.""" + # Call parent method for standard processing + super().on_message(ws, message) + + # Perform custom analysis + self.message_count += 1 + if self.data_log: + latest_data = self.data_log[-1] + self.analyzer.analyze_motor_data(latest_data) + + # Print summary every 20 messages + if self.message_count % 20 == 0: + summary = self.analyzer.get_summary() + print(f"\n📊 ANALYSIS SUMMARY (after {self.message_count} messages):") + print(f" Temperature alerts: {summary['temperature_alerts']}") + print(f" Average efficiency: {summary['avg_efficiency']:.1f}%") + if summary['recent_alerts']: + print(f" Recent alerts: {len(summary['recent_alerts'])}") + print() + +def main(): + """Example main function.""" + print("VEX WebSocket Client - Advanced Example") + print("=====================================") + print("This example demonstrates:") + print("- Real-time temperature monitoring") + print("- Motor performance analysis") + print("- Battery voltage tracking") + print("- Custom alerts and summaries") + print("\nPress Ctrl+C to stop\n") + + # Create enhanced client + client = CustomWebSocketClient( + host="localhost", + port=7071, + save_to_file=True # Save data for later analysis + ) + + try: + client.connect() + except KeyboardInterrupt: + print("\n\n🛑 Stopping...") + + if client.data_log: + # Final analysis + summary = client.analyzer.get_summary() + stats = client.get_statistics() + + print(f"\n📊 FINAL ANALYSIS:") + print(f" Total messages received: {len(client.data_log)}") + print(f" Temperature alerts: {summary['temperature_alerts']}") + print(f" Average motor efficiency: {summary['avg_efficiency']:.1f}%") + + if stats.get('battery_readings'): + avg_battery = sum(stats['battery_readings']) / len(stats['battery_readings']) + print(f" Average battery voltage: {avg_battery:.2f}V") + + print(f"\n💾 Data saved to CSV file for further analysis") + print(" Use pandas or Excel to analyze the saved data") + else: + print("No data received. Check VEX Extension connection.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d53b3a4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +websocket-client>=1.8.0 \ No newline at end of file diff --git a/src/functions.cpp b/src/functions.cpp index 9705b77..ef1d67c 100644 --- a/src/functions.cpp +++ b/src/functions.cpp @@ -169,6 +169,24 @@ void motorMonitor() motorTemps[0], motorTemps[1], motorTemps[2], motorTemps[3], Brain.Battery.voltage()); logHandler("motorMonitor", motorTempsStr, Log::Level::Info); + // Log detailed motor data including RPM, current, and position + std::array motors = {&frontLeftMotor, &frontRightMotor, &rearLeftMotor, &rearRightMotor}; + for (size_t i = 0; i < motors.size(); ++i) + { + if (motors[i]->installed()) + { + int commandedRpm = vexMotorVelocityGet(motors[i]->index()); + int actualRpm = motors[i]->velocity(vex::velocityUnits::rpm); + double position = motors[i]->position(vex::rotationUnits::rev); + double current = motors[i]->current(); + + std::string motorDetailStr = std::format( + "\n | {}: CmdRPM: {} | ActRPM: {} | Pos: {:.2f}rev | Current: {:.2f}A", + motorNames[i], commandedRpm, actualRpm, position, current); + logHandler("motorMonitor", motorDetailStr, Log::Level::Info); + } + } + std::string dataBuffer = std::format("\nX Axis: {}\nY Axis: {}\nZ Axis: {}", InertialGyro.pitch(vex::rotationUnits::deg), InertialGyro.roll(vex::rotationUnits::deg), diff --git a/test_websocket_client.py b/test_websocket_client.py new file mode 100755 index 0000000..13ecc02 --- /dev/null +++ b/test_websocket_client.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Test script for the VEX WebSocket Client + +This script tests the parsing functionality of the WebSocket client +without requiring an actual WebSocket connection. +""" + +import sys +from websocket_client import VEXWebSocketClient + +def test_motor_data_parsing(): + """Test parsing of motor telemetry data.""" + + # Create a client instance for testing + client = VEXWebSocketClient(save_to_file=False) + + # Test messages similar to what the VEX robot would send + test_messages = [ + "[Info] > Time: 15.234 > Module: motorMonitor > \n | LeftTemp: 45°\n | RightTemp: 48°\n | RearLeftTemp: 42°\n | RearRightTemp: 46°\n | Battery Voltage: 12.5V\n", + "[Info] > Time: 15.250 > Module: motorMonitor > \nX Axis: 2.5\nY Axis: -1.2\nZ Axis: 180.0", + "[Info] > Time: 15.265 > Module: motorMonitor > \n | FLM: CmdRPM: 150 | ActRPM: 145 | Pos: 2.75rev | Current: 1.2A", + "[Info] > Time: 15.280 > Module: motorMonitor > \n | FRM: CmdRPM: 150 | ActRPM: 148 | Pos: 2.80rev | Current: 1.1A", + "[Warn] > Time: 25.500 > Module: motorMonitor > FLM overheat: 56°" + ] + + print("Testing VEX WebSocket Client Data Parsing") + print("=" * 50) + + for i, message in enumerate(test_messages, 1): + print(f"\nTest {i}: Parsing message") + print(f"Raw: {repr(message)}") + + parsed_data = client.parse_motor_data(message) + + print("Parsed data:") + print(f" Module: {parsed_data.get('module')}") + print(f" Log Level: {parsed_data.get('log_level')}") + print(f" Time: {parsed_data.get('time_sec')}s") + print(f" Motor Temps: {parsed_data['motor_temps']}") + print(f" Battery: {parsed_data.get('battery_voltage')}V") + print(f" Gyro: {parsed_data['gyro_axes']}") + print(f" Motor Details: {parsed_data['motor_details']}") + print("-" * 30) + + print("\nTest completed successfully!") + return True + +def test_statistics(): + """Test statistics generation.""" + client = VEXWebSocketClient(save_to_file=False) + + # Simulate receiving some data with proper structure + test_data = [ + { + "timestamp": "2024-01-01T12:00:00", + "module": "motorMonitor", + "motor_temps": {"left": 45, "right": 48}, + "battery_voltage": 12.5 + }, + { + "timestamp": "2024-01-01T12:00:01", + "module": "motorMonitor", + "motor_temps": {"left": 46, "right": 49}, + "battery_voltage": 12.4 + }, + { + "timestamp": "2024-01-01T12:00:02", + "module": "motorMonitor", + "motor_temps": {"left": 47, "right": 50}, + "battery_voltage": 12.3 + }, + ] + + client.data_log = test_data + stats = client.get_statistics() + + print(f"\nStatistics Test:") + print(f"Total messages: {stats['total_messages']}") + print(f"Modules: {stats['modules']}") + + return True + +if __name__ == "__main__": + try: + test_motor_data_parsing() + test_statistics() + print("\n✅ All tests passed!") + sys.exit(0) + except Exception as e: + print(f"\n❌ Test failed: {e}") + sys.exit(1) \ No newline at end of file diff --git a/websocket_client.py b/websocket_client.py new file mode 100755 index 0000000..849065c --- /dev/null +++ b/websocket_client.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python3 +""" +VEX WebSocket Client for Motor Monitoring + +This client connects to the VEX Extension's WebSocket server to receive +real-time data from the robot, including motor temperatures, RPM, battery +voltage, gyro data, and other telemetry information. + +Usage: + python3 websocket_client.py [--save-to-file] [--host localhost] [--port 7071] +""" + +import websocket +import json +import threading +import time +import argparse +import re +import csv +from datetime import datetime +from typing import Dict, List, Optional, Any +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +class VEXWebSocketClient: + """WebSocket client for receiving VEX robot telemetry data.""" + + def __init__(self, host: str = "localhost", port: int = 7071, save_to_file: bool = False): + """ + Initialize the VEX WebSocket client. + + Args: + host: WebSocket server host (default: localhost) + port: WebSocket server port (default: 7071) + save_to_file: Whether to save received data to file + """ + self.host = host + self.port = port + self.save_to_file = save_to_file + self.ws = None + self.is_connected = False + self.data_log = [] + self.csv_file = None + self.csv_writer = None + + # Data parsing patterns + self.motor_temp_pattern = re.compile(r'(\w+)Temp: (\d+)°') + self.battery_pattern = re.compile(r'Battery Voltage: ([\d.]+)V') + self.axis_pattern = re.compile(r'([XYZ]) Axis: ([-\d.]+)') + self.motor_display_pattern = re.compile(r'(\w+): (\d+)°') + self.motor_detail_pattern = re.compile(r'(\w+): CmdRPM: (-?\d+) \| ActRPM: (-?\d+) \| Pos: ([-\d.]+)rev \| Current: ([\d.]+)A') + + # Initialize CSV file if saving is enabled + if self.save_to_file: + self._init_csv_file() + + def _init_csv_file(self): + """Initialize CSV file for data logging.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"vex_telemetry_{timestamp}.csv" + + try: + self.csv_file = open(filename, 'w', newline='') + fieldnames = [ + 'timestamp', 'module', 'log_level', 'time_sec', + 'left_temp', 'right_temp', 'rear_left_temp', 'rear_right_temp', + 'battery_voltage', 'x_axis', 'y_axis', 'z_axis', + 'motor_name', 'commanded_rpm', 'actual_rpm', 'position_rev', 'current_amps', + 'raw_message' + ] + self.csv_writer = csv.DictWriter(self.csv_file, fieldnames=fieldnames) + self.csv_writer.writeheader() + logger.info(f"CSV logging initialized: {filename}") + except Exception as e: + logger.error(f"Failed to initialize CSV file: {e}") + self.save_to_file = False + + def parse_motor_data(self, message: str) -> Dict[str, Any]: + """ + Parse motor telemetry data from WebSocket message. + + Args: + message: Raw message from WebSocket + + Returns: + Dictionary containing parsed telemetry data + """ + data = { + 'timestamp': datetime.now().isoformat(), + 'raw_message': message, + 'motor_temps': {}, + 'battery_voltage': None, + 'gyro_axes': {}, + 'motor_details': {}, + 'module': None, + 'log_level': None, + 'time_sec': None + } + + # Parse log header (log level, time, module) + log_header_match = re.search(r'\[(\w+)\] > Time: ([\d.]+) > Module: (\w+)', message) + if log_header_match: + data['log_level'] = log_header_match.group(1) + data['time_sec'] = float(log_header_match.group(2)) + data['module'] = log_header_match.group(3) + + # Parse motor temperatures + for match in self.motor_temp_pattern.finditer(message): + motor_name = match.group(1).lower() + temperature = int(match.group(2)) + data['motor_temps'][motor_name] = temperature + + # Parse battery voltage + battery_match = self.battery_pattern.search(message) + if battery_match: + data['battery_voltage'] = float(battery_match.group(1)) + + # Parse gyro axes + for match in self.axis_pattern.finditer(message): + axis = match.group(1).lower() + value = float(match.group(2)) + data['gyro_axes'][axis] = value + + # Parse detailed motor data (RPM, position, current) + motor_detail_match = self.motor_detail_pattern.search(message) + if motor_detail_match: + motor_name = motor_detail_match.group(1).lower() + data['motor_details'][motor_name] = { + 'commanded_rpm': int(motor_detail_match.group(2)), + 'actual_rpm': int(motor_detail_match.group(3)), + 'position_rev': float(motor_detail_match.group(4)), + 'current_amps': float(motor_detail_match.group(5)) + } + + return data + + def on_message(self, ws, message): + """Handle incoming WebSocket messages.""" + try: + logger.info(f"Received: {message}") + + # Parse the message + parsed_data = self.parse_motor_data(message) + + # Add to data log + self.data_log.append(parsed_data) + + # Display parsed information + self._display_telemetry(parsed_data) + + # Save to CSV if enabled + if self.save_to_file and self.csv_writer: + self._save_to_csv(parsed_data) + + except Exception as e: + logger.error(f"Error processing message: {e}") + + def _display_telemetry(self, data: Dict[str, Any]): + """Display telemetry data in a formatted way.""" + if data['module'] == 'motorMonitor': + print("\n" + "="*50) + print(f"MOTOR TELEMETRY - Time: {data.get('time_sec', 'N/A')}s") + print("="*50) + + # Motor temperatures + if data['motor_temps']: + print("Motor Temperatures:") + for motor, temp in data['motor_temps'].items(): + status = "⚠️ HOT!" if temp >= 55 else "✅ OK" + print(f" {motor.upper()}: {temp}°C {status}") + + # Battery voltage + if data['battery_voltage']: + battery_status = "⚠️ LOW!" if data['battery_voltage'] < 12 else "✅ OK" + print(f"Battery: {data['battery_voltage']}V {battery_status}") + + # Gyro data + if data['gyro_axes']: + print("Gyro Axes:") + for axis, value in data['gyro_axes'].items(): + print(f" {axis.upper()}: {value:.2f}°") + + # Motor details (RPM, position, current) + if data['motor_details']: + print("Motor Details:") + for motor, details in data['motor_details'].items(): + print(f" {motor.upper()}:") + print(f" RPM: {details['commanded_rpm']} cmd / {details['actual_rpm']} actual") + print(f" Position: {details['position_rev']:.2f} rev") + print(f" Current: {details['current_amps']:.2f}A") + + print("="*50) + + def _save_to_csv(self, data: Dict[str, Any]): + """Save parsed data to CSV file.""" + try: + # Handle motor details - if present, save each motor as a separate row + if data['motor_details']: + for motor_name, details in data['motor_details'].items(): + row = { + 'timestamp': data['timestamp'], + 'module': data.get('module'), + 'log_level': data.get('log_level'), + 'time_sec': data.get('time_sec'), + 'left_temp': data['motor_temps'].get('left'), + 'right_temp': data['motor_temps'].get('right'), + 'rear_left_temp': data['motor_temps'].get('rearleft'), + 'rear_right_temp': data['motor_temps'].get('rearright'), + 'battery_voltage': data.get('battery_voltage'), + 'x_axis': data['gyro_axes'].get('x'), + 'y_axis': data['gyro_axes'].get('y'), + 'z_axis': data['gyro_axes'].get('z'), + 'motor_name': motor_name, + 'commanded_rpm': details['commanded_rpm'], + 'actual_rpm': details['actual_rpm'], + 'position_rev': details['position_rev'], + 'current_amps': details['current_amps'], + 'raw_message': data['raw_message'] + } + self.csv_writer.writerow(row) + else: + # Save regular data without motor details + row = { + 'timestamp': data['timestamp'], + 'module': data.get('module'), + 'log_level': data.get('log_level'), + 'time_sec': data.get('time_sec'), + 'left_temp': data['motor_temps'].get('left'), + 'right_temp': data['motor_temps'].get('right'), + 'rear_left_temp': data['motor_temps'].get('rearleft'), + 'rear_right_temp': data['motor_temps'].get('rearright'), + 'battery_voltage': data.get('battery_voltage'), + 'x_axis': data['gyro_axes'].get('x'), + 'y_axis': data['gyro_axes'].get('y'), + 'z_axis': data['gyro_axes'].get('z'), + 'motor_name': None, + 'commanded_rpm': None, + 'actual_rpm': None, + 'position_rev': None, + 'current_amps': None, + 'raw_message': data['raw_message'] + } + self.csv_writer.writerow(row) + + self.csv_file.flush() # Ensure data is written immediately + except Exception as e: + logger.error(f"Error saving to CSV: {e}") + + def on_error(self, ws, error): + """Handle WebSocket errors.""" + logger.error(f"WebSocket error: {error}") + + def on_close(self, ws, close_status_code, close_msg): + """Handle WebSocket connection close.""" + self.is_connected = False + logger.info("WebSocket connection closed") + + if self.csv_file: + self.csv_file.close() + logger.info("CSV file closed") + + def on_open(self, ws): + """Handle WebSocket connection open.""" + self.is_connected = True + logger.info("WebSocket connection established") + print(f"Connected to VEX Extension WebSocket server at ws://{self.host}:{self.port}") + print("Waiting for telemetry data...") + + def connect(self): + """Connect to the VEX WebSocket server.""" + url = f"ws://{self.host}:{self.port}/vexrobotics.vexcode/device" + logger.info(f"Connecting to {url}") + + self.ws = websocket.WebSocketApp( + url, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close, + on_open=self.on_open + ) + + # Run forever with reconnection logic + while True: + try: + self.ws.run_forever() + except KeyboardInterrupt: + logger.info("Interrupted by user") + break + except Exception as e: + logger.error(f"Connection error: {e}") + logger.info("Attempting to reconnect in 5 seconds...") + time.sleep(5) + + def get_statistics(self) -> Dict[str, Any]: + """Get statistics about received data.""" + if not self.data_log: + return {"message": "No data received yet"} + + stats = { + "total_messages": len(self.data_log), + "time_range": { + "start": self.data_log[0]['timestamp'], + "end": self.data_log[-1]['timestamp'] + }, + "modules": {}, + "temperature_stats": {}, + "battery_readings": [] + } + + # Analyze data + for entry in self.data_log: + # Module statistics + module = entry.get('module', 'unknown') + stats["modules"][module] = stats["modules"].get(module, 0) + 1 + + # Temperature statistics + for motor, temp in entry['motor_temps'].items(): + if motor not in stats["temperature_stats"]: + stats["temperature_stats"][motor] = {"min": temp, "max": temp, "readings": []} + stats["temperature_stats"][motor]["min"] = min(stats["temperature_stats"][motor]["min"], temp) + stats["temperature_stats"][motor]["max"] = max(stats["temperature_stats"][motor]["max"], temp) + stats["temperature_stats"][motor]["readings"].append(temp) + + # Battery readings + if entry.get('battery_voltage'): + stats["battery_readings"].append(entry['battery_voltage']) + + return stats + + +def main(): + """Main function to run the WebSocket client.""" + parser = argparse.ArgumentParser(description="VEX WebSocket Client for Motor Monitoring") + parser.add_argument("--host", default="localhost", help="WebSocket server host (default: localhost)") + parser.add_argument("--port", type=int, default=7071, help="WebSocket server port (default: 7071)") + parser.add_argument("--save-to-file", action="store_true", help="Save received data to CSV file") + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + print("VEX WebSocket Client for Motor Monitoring") + print("="*50) + print(f"Connecting to: ws://{args.host}:{args.port}/vexrobotics.vexcode/device") + print(f"Save to file: {'Yes' if args.save_to_file else 'No'}") + print("="*50) + print("Press Ctrl+C to stop") + print() + + # Create and start the client + client = VEXWebSocketClient(host=args.host, port=args.port, save_to_file=args.save_to_file) + + try: + client.connect() + except KeyboardInterrupt: + print("\nShutting down...") + if client.data_log: + print(f"Received {len(client.data_log)} messages total") + stats = client.get_statistics() + print("Session Statistics:") + print(json.dumps(stats, indent=2, default=str)) + + +if __name__ == "__main__": + main() \ No newline at end of file