diff --git a/modules/hitl/hitl_base.py b/modules/hitl/hitl_base.py index 5c40b3d..13539f2 100644 --- a/modules/hitl/hitl_base.py +++ b/modules/hitl/hitl_base.py @@ -26,6 +26,8 @@ def create( position_module: bool, camera_module: bool, images_path: str | None = None, + json_file_path: str | None = None, + position_update_interval: float = 1.0, ) -> "tuple[True, HITL] | tuple[False, None]": """ Factory method to create a HITL instance. @@ -36,6 +38,8 @@ def create( position_module: Boolean indicating if the position module is enabled. camera_module: Boolean indicating if the camera module is enabled. images_path: Optional path to the images directory for the camera emulator. + json_file_path: Optional path to JSON file containing coordinates for position emulator. + position_update_interval: Interval (seconds) between switching JSON coordinates. Returns: Success, HITL instance | None. @@ -47,7 +51,9 @@ def create( return True, HITL(cls.__create_key, drone, None, None) if position_module: - result, position_emulator = PositionEmulator.create(drone) + result, position_emulator = PositionEmulator.create( + drone, json_file_path, position_update_interval + ) if not result: return False, None diff --git a/modules/hitl/position_emulator.py b/modules/hitl/position_emulator.py index eb3c3cb..28aa857 100644 --- a/modules/hitl/position_emulator.py +++ b/modules/hitl/position_emulator.py @@ -2,6 +2,7 @@ Emulates position and attitude to Pixhawk. """ +import json import time from ..mavlink import dronekit @@ -15,18 +16,32 @@ class PositionEmulator: @classmethod def create( - cls, drone: dronekit.Vehicle + cls, + drone: dronekit.Vehicle, + json_file_path: str | None = None, + update_interval: float = 1.0, ) -> "tuple[True, PositionEmulator] | tuple[False, None]": """ Setup position emulator. + Args: + drone: Dronekit vehicle instance. + json_file_path: Optional path to JSON file containing coordinates. + update_interval: Interval (seconds) between switching JSON coordinates. + Returns: Success, PositionEmulator instance. """ - return True, PositionEmulator(cls.__create_key, drone) + return True, PositionEmulator(cls.__create_key, drone, json_file_path, update_interval) - def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) -> None: + def __init__( + self, + class_private_create_key: object, + drone: dronekit.Vehicle, + json_file_path: str | None = None, + update_interval: float = 1.0, + ) -> None: """ Private constructor, use create() method. """ @@ -35,10 +50,58 @@ def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) -> self.target_position = (43.43405014107003, -80.57898027451816, 373.0) # lat, lon, alt self.drone = drone + self.json_file_path = json_file_path + self.json_coordinates: list[list[float]] = [] + self.current_coordinate_index = 0 + self.update_interval = update_interval + self.next_coordinate_time = time.time() + self.update_interval + + if self.json_file_path: + self._load_json_coordinates() + + def _load_json_coordinates(self) -> None: + """ + Loads coordinates from the JSON file and validates them. + """ + try: + with open(self.json_file_path, "r", encoding="utf-8") as file: + self.json_coordinates = json.load(file) + + if not isinstance(self.json_coordinates, list) or not all( + isinstance(coord, list) and len(coord) >= 3 for coord in self.json_coordinates + ): + raise ValueError("JSON must be a list of [lat, lon, alt] lists") + + print( + f"HITL loaded {len(self.json_coordinates)} coordinates from {self.json_file_path}" + ) + except Exception as exc: # pylint: disable=broad-except + print(f"HITL JSON coordinate loading error: {exc}") + self.json_coordinates = [] + + def _next_json_coordinate(self) -> None: + """ + Cycles to the next JSON coordinate and sets it as target position. + """ + if not self.json_coordinates: + return + + coordinate = self.json_coordinates[self.current_coordinate_index] + latitude, longitude, altitude = coordinate[0], coordinate[1], coordinate[2] + self.set_target_position(latitude, longitude, altitude) + print( + f"HITL set JSON coordinate {self.current_coordinate_index + 1}/{len(self.json_coordinates)}: " + f"({latitude}, {longitude}, {altitude})" + ) + + # Cycle to next coordinate + self.current_coordinate_index = (self.current_coordinate_index + 1) % len( + self.json_coordinates + ) def set_target_position(self, latitude: float, longitude: float, altitude: float) -> None: """ - Sets the target position manually (currently a fallback if Ardupilot target doesnt work). + Sets the target position manually Args: latitude: Latitude in degrees. @@ -49,7 +112,7 @@ def set_target_position(self, latitude: float, longitude: float, altitude: float def get_target_position(self) -> tuple[float, float, float]: """ - Gets the target position from the Ardupilot target. + Gets the target position from the Ardupilot target if available. Returns: Target position as (latitude, longitude, altitude). @@ -62,7 +125,7 @@ def get_target_position(self) -> tuple[float, float, float]: ) except Exception as exc: # pylint: disable=broad-except print(f"HITL get_target_position recv_match error: {exc}") - position_target = None + return self.target_position # pylint: enable=protected-access if position_target: @@ -78,10 +141,21 @@ def get_target_position(self) -> tuple[float, float, float]: def periodic(self) -> None: """ - Periodic function. + Periodic function. Updates target position either from JSON coordinates or Ardupilot. """ - - self.target_position = self.get_target_position() + now = time.time() + + if self.json_coordinates: + # JSON has priority + if now >= self.next_coordinate_time: + try: + self._next_json_coordinate() + except Exception as exc: # pylint: disable=broad-except + print(f"HITL JSON coordinate update error: {exc}") + self.next_coordinate_time = now + self.update_interval + else: + # Fallback: use Ardupilot position target + self.target_position = self.get_target_position() self.inject_position( self.target_position[0], self.target_position[1], self.target_position[2] diff --git a/modules/mavlink/flight_controller.py b/modules/mavlink/flight_controller.py index 7d46d70..c049938 100644 --- a/modules/mavlink/flight_controller.py +++ b/modules/mavlink/flight_controller.py @@ -211,6 +211,8 @@ def create( position_module: bool = False, camera_module: bool = False, images_path: str | None = None, + position_json_path: str | None = None, + position_update_interval: float = 1.0, ) -> "tuple[bool, FlightController | None]": """ address: TCP address or serial port of the drone (e.g. "tcp:127.0.0.1:14550"). @@ -227,7 +229,13 @@ def create( ) # Enable/disable HITL based on mode success, hitl_instance = hitl_base.HITL.create( - drone, hitl_enabled, position_module, camera_module, images_path + drone, + hitl_enabled, + position_module, + camera_module, + images_path, + position_json_path, + position_update_interval, ) if success: if hitl_enabled and hitl_instance is not None: diff --git a/tests/integration/hitl/test_coordinates.json b/tests/integration/hitl/test_coordinates.json new file mode 100644 index 0000000..b7f1167 --- /dev/null +++ b/tests/integration/hitl/test_coordinates.json @@ -0,0 +1,5 @@ +[ + [43.43405014107003, -80.57898027451816, 373.0], + [40.0, -40.0, 200.0], + [41.29129039399329, -81.78471782918818, 373.0] +] \ No newline at end of file diff --git a/tests/integration/hitl/test_hitl_json_parser.py b/tests/integration/hitl/test_hitl_json_parser.py new file mode 100644 index 0000000..1fd0f5b --- /dev/null +++ b/tests/integration/hitl/test_hitl_json_parser.py @@ -0,0 +1,204 @@ +""" +Example usage of the JSON Position Emulator in HITL mode. +This demonstrates how the position emulator cycles through JSON coordinates. +""" + +import os +import time +from modules.mavlink.flight_controller import FlightController + +# Physical connection to Pixhawk: /dev/ttyAMA0 +# Simulated connection to Pixhawk: tcp:localhost:5762 +PIXHAWK_ADDRESS = "/dev/ttyAMA0" + + +def example_json_position_emulator() -> None: + """ + Example showing how the JSON position emulator works. + + + """ + + images_folder_path = os.path.join("tests", "integration", "camera_emulator", "images") + position_json_path = os.path.join("tests", "integration", "hitl", "test_coordinates.json") + + # Example 1: Basic usage with JSON file + print("=== Example 1: Basic JSON Position Emulation ===") + + # Create a mock drone (in real usage, this would be a real dronekit.Vehicle) + # For this example, we'll assume you have a real drone connection + # drone = dronekit.connect("tcp:127.0.0.1:5760") # SITL connection + + # Create HITL with JSON coordinates + success, flight_controller = FlightController.create( + PIXHAWK_ADDRESS, + 57600, + True, + True, + True, + images_folder_path, + position_json_path, # Our test file + ) + + if success and flight_controller: + print("āœ… HITL created successfully with JSON position emulation") + print("šŸ“ JSON file: test_coordinates.json") + print("ā±ļø Update interval: 1.0 seconds") + print("šŸ”„ Will cycle through coordinates every second") + + # Start the HITL system + print("šŸš€ HITL started - position emulation running...") + + # Simulate running for 5 seconds to show coordinate cycling + print("\nšŸ“Š Coordinate cycling simulation:") + for i in range(10): + time.sleep(1) + print(f"ā° Time: {i+1}s - Position emulator is running...") + + print("šŸ›‘ HITL stopped") + + print("\n" + "=" * 60) + + # Example 2: Custom update interval + """ + print("=== Example 2: Custom Update Interval (2 seconds) ===") + + success, flight_controller = FlightController.create( + PIXHAWK_ADDRESS, + 57600, + True, + True, + True, + images_folder_path, + position_json_path, # Our test file + position_update_interval=2.0, + ) + + if success and flight_controller: + print("āœ… HITL created with 2-second update interval") + print("ā±ļø Coordinates will change every 2 seconds") + + print("\n" + "=" * 60) + """ + + # Example 3: No JSON file (uses Ardupilot) + """ + print("=== Example 3: No JSON File (Ardupilot Mode) ===") + + success, flight_controller = FlightController.create( + PIXHAWK_ADDRESS, + 57600, + True, + True, + True, + images_folder_path, + ) + + if success and flight_controller: + print("āœ… HITL created without JSON file") + print("šŸŽÆ Will use Ardupilot's internal pathing") + print("āŒ No 1-second coordinate shifting") + """ + + +def explain_coordinate_cycling() -> None: + """ + Explains how the coordinate cycling works internally. + """ + print("\n" + "=" * 60) + print("šŸ” HOW COORDINATE CYCLING WORKS INTERNALLY") + print("=" * 60) + + print( + """ +1. šŸ“ JSON Loading: + - Loads coordinates from test_coordinates.json + - Validates format: [[lat, lon, alt], [lat, lon, alt], ...] + - Stores in self.json_coordinates list +2. ā° Timing Control: + - Sets next_coordinate_time = current_time + update_interval + - Checks every periodic() call if it's time to update +3. šŸ”„ Coordinate Cycling: + - Gets current coordinate: json_coordinates[current_index] + - Calls set_target_position(lat, lon, alt) + - Increments current_index (cycles back to 0 at end) + - Updates next_coordinate_time for next cycle +4. šŸ“” Position Injection: + - inject_position() sends GPS data to flight controller + - Uses MAVLink GPS_INPUT message + - Simulates GPS coordinates for the drone +5. šŸŽÆ Priority System: + - JSON coordinates have priority over Ardupilot + - If JSON available: use JSON cycling + - If no JSON: fallback to Ardupilot pathing +""" + ) + + +def show_json_format() -> None: + """ + Shows the expected JSON format and example data. + """ + print("\n" + "=" * 60) + print("šŸ“„ JSON FILE FORMAT") + print("=" * 60) + + print( + """ +Expected JSON format: +[ + [latitude, longitude, altitude], + [latitude, longitude, altitude], + ... +] +Example (test_coordinates.json): +[ + [43.43405014107003, -80.57898027451816, 373.0], + [40.0, -40.0, 200.0], + [41.29129039399329, -81.78471782918818, 373.0] +] +This will cycle through: +1. First coordinate for 1 second +2. Second coordinate for 1 second +3. Third coordinate for 1 second +4. Back to first coordinate (loops forever) +""" + ) + + +if __name__ == "__main__": + print("🚁 HITL JSON Position Emulator Example") + print("=" * 60) + + # Show JSON format + show_json_format() + + # Explain how it works + explain_coordinate_cycling() + + # Run examples (commented out since we don't have real drone) + print("\n" + "=" * 60) + print("šŸ’” TO USE IN REAL CODE:") + print("=" * 60) + print( + """ +# In your flight_controller.py: +from modules.hitl.hitl_base import HITL +# Create HITL with JSON coordinates +success, hitl = HITL.create( + drone=your_drone_connection, + hitl_enabled=True, + position_module=True, + camera_module=False, + json_file_path="path/to/your/coordinates.json", + position_update_interval=1.0 # seconds +) +if success: + hitl.start() # Start position emulation + # Your main flight loop here + hitl.shutdown() # Clean shutdown +""" + ) + + # Uncomment to run actual examples (requires drone connection): + example_json_position_emulator()