diff --git a/modules/hitl/camera_emulator.py b/modules/hitl/camera_emulator.py index 07e82a8..1f42dca 100644 --- a/modules/hitl/camera_emulator.py +++ b/modules/hitl/camera_emulator.py @@ -91,31 +91,10 @@ def __init__( self.__image_index = 0 self.__next_image_time = time.time() + time_between_images self.__time_between_images = time_between_images - + self.__get_images() self.update_current_image() - def periodic(self) -> None: - """ - Periodic function. - """ - try: - # Send frame and pace to target FPS - self.send_frame() - self.sleep_until_next_frame() - - now = time.time() - if now >= self.__next_image_time: - # Cycle image once per second - try: - self.next_image() - self.update_current_image() - except Exception as exc: # pylint: disable=broad-except - print(f"HITL camera image update error: {exc}") - self.__next_image_time = now + self.__time_between_images - except Exception as exc: # pylint: disable=broad-except - print(f"HITL camera periodic error: {exc}") - def send_frame(self) -> None: """ sends a new frame to virtual camera, should be called in a loop diff --git a/modules/hitl/example_usage.py b/modules/hitl/example_usage.py new file mode 100644 index 0000000..57b8cae --- /dev/null +++ b/modules/hitl/example_usage.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +""" +Example usage of the JSON Position Emulator in HITL mode. +This demonstrates how the position emulator cycles through JSON coordinates. +""" + +import time +from modules.hitl.hitl_base import HITL +from modules.mavlink import dronekit + +def example_json_position_emulator(): + """ + Example showing how the JSON position emulator works. + """ + + # Example 1: Basic usage with JSON file + print("=== Example 1: Basic JSON Position Emulation ===") + + # Create a mock drone (in real usage, this would be a real dronekit.Vehicle) + # For this example, we'll assume you have a real drone connection + # drone = dronekit.connect("tcp:127.0.0.1:5760") # SITL connection + + # Create HITL with JSON coordinates + success, hitl = HITL.create( + drone=None, # Replace with actual drone connection + hitl_enabled=True, + position_module=True, + camera_module=False, + json_file_path="test_coordinates.json", # Our test file + position_update_interval=1.0 # 1 second intervals + ) + + if success and hitl: + print("āœ… HITL created successfully with JSON position emulation") + print("šŸ“ JSON file: test_coordinates.json") + print("ā±ļø Update interval: 1.0 seconds") + print("šŸ”„ Will cycle through coordinates every second") + + # Start the HITL system + hitl.start() + print("šŸš€ HITL started - position emulation running...") + + # Simulate running for 5 seconds to show coordinate cycling + print("\nšŸ“Š Coordinate cycling simulation:") + for i in range(5): + time.sleep(1) + print(f"ā° Time: {i+1}s - Position emulator is running...") + + # Stop the system + hitl.shutdown() + print("šŸ›‘ HITL stopped") + + print("\n" + "="*60) + + # Example 2: Custom update interval + print("=== Example 2: Custom Update Interval (2 seconds) ===") + + success, hitl2 = HITL.create( + drone=None, # Replace with actual drone connection + hitl_enabled=True, + position_module=True, + camera_module=False, + json_file_path="test_coordinates.json", + position_update_interval=2.0 # 2 second intervals + ) + + if success and hitl2: + print("āœ… HITL created with 2-second update interval") + print("ā±ļø Coordinates will change every 2 seconds") + + print("\n" + "="*60) + + # Example 3: No JSON file (uses Ardupilot) + print("=== Example 3: No JSON File (Ardupilot Mode) ===") + + success, hitl3 = HITL.create( + drone=None, # Replace with actual drone connection + hitl_enabled=True, + position_module=True, + camera_module=False + # No json_file_path - will use Ardupilot pathing + ) + + if success and hitl3: + print("āœ… HITL created without JSON file") + print("šŸŽÆ Will use Ardupilot's internal pathing") + print("āŒ No 1-second coordinate shifting") + +def explain_coordinate_cycling(): + """ + Explains how the coordinate cycling works internally. + """ + print("\n" + "="*60) + print("šŸ” HOW COORDINATE CYCLING WORKS INTERNALLY") + print("="*60) + + print(""" +1. šŸ“ JSON Loading: + - Loads coordinates from test_coordinates.json + - Validates format: [[lat, lon, alt], [lat, lon, alt], ...] + - Stores in self.json_coordinates list + +2. ā° Timing Control: + - Sets next_coordinate_time = current_time + update_interval + - Checks every periodic() call if it's time to update + +3. šŸ”„ Coordinate Cycling: + - Gets current coordinate: json_coordinates[current_index] + - Calls set_target_position(lat, lon, alt) + - Increments current_index (cycles back to 0 at end) + - Updates next_coordinate_time for next cycle + +4. šŸ“” Position Injection: + - inject_position() sends GPS data to flight controller + - Uses MAVLink GPS_INPUT message + - Simulates GPS coordinates for the drone + +5. šŸŽÆ Priority System: + - JSON coordinates have priority over Ardupilot + - If JSON available: use JSON cycling + - If no JSON: fallback to Ardupilot pathing +""") + +def show_json_format(): + """ + Shows the expected JSON format and example data. + """ + print("\n" + "="*60) + print("šŸ“„ JSON FILE FORMAT") + print("="*60) + + print(""" +Expected JSON format: +[ + [latitude, longitude, altitude], + [latitude, longitude, altitude], + ... +] + +Example (test_coordinates.json): +[ + [43.43405014107003, -80.57898027451816, 373.0], + [40.0, -40.0, 200.0], + [41.29129039399329, -81.78471782918818, 373.0] +] + +This will cycle through: +1. First coordinate for 1 second +2. Second coordinate for 1 second +3. Third coordinate for 1 second +4. Back to first coordinate (loops forever) +""") + +if __name__ == "__main__": + print("🚁 HITL JSON Position Emulator Example") + print("="*60) + + # Show JSON format + show_json_format() + + # Explain how it works + explain_coordinate_cycling() + + # Run examples (commented out since we don't have real drone) + print("\n" + "="*60) + print("šŸ’” TO USE IN REAL CODE:") + print("="*60) + print(""" +# In your flight_controller.py: +from modules.hitl.hitl_base import HITL + +# Create HITL with JSON coordinates +success, hitl = HITL.create( + drone=your_drone_connection, + hitl_enabled=True, + position_module=True, + camera_module=False, + json_file_path="path/to/your/coordinates.json", + position_update_interval=1.0 # seconds +) + +if success: + hitl.start() # Start position emulation + # Your main flight loop here + hitl.shutdown() # Clean shutdown +""") + + # Uncomment to run actual examples (requires drone connection): + # example_json_position_emulator() diff --git a/modules/hitl/execution_flow.txt b/modules/hitl/execution_flow.txt new file mode 100644 index 0000000..c4e3ec2 --- /dev/null +++ b/modules/hitl/execution_flow.txt @@ -0,0 +1,142 @@ +HITL JSON Position Emulator - Execution Flow +============================================= + +INITIALIZATION PHASE: +--------------------- +1. HITL.create() called with json_file_path parameter +2. PositionEmulator.create() called with json_file_path and update_interval +3. PositionEmulator.__init__() runs: + - Sets up json_coordinates = [] + - Sets current_coordinate_index = 0 + - Sets update_interval (default 1.0 seconds) + - Sets next_coordinate_time = current_time + update_interval + - Calls _load_json_coordinates() if json_file_path provided + +4. _load_json_coordinates() runs: + - Opens and parses JSON file + - Validates format: list of [lat, lon, alt] lists + - Stores in self.json_coordinates + - Prints confirmation message + +RUNTIME PHASE (periodic() called every loop): +--------------------------------------------- +1. periodic() called by HITL thread +2. Check if json_coordinates exist: + + IF JSON COORDINATES EXIST: + --------------------------- + 3a. Check if current_time >= next_coordinate_time: + - IF YES: Call _next_json_coordinate() + * Get coordinate: json_coordinates[current_coordinate_index] + * Extract: latitude, longitude, altitude + * Call set_target_position(lat, lon, alt) + * Print progress message + * Increment current_coordinate_index (cycle at end) + * Set next_coordinate_time = current_time + update_interval + - IF NO: Do nothing (wait for next cycle) + + ELSE (NO JSON COORDINATES): + --------------------------- + 3b. Call get_target_position() to get Ardupilot position + - Try to get POSITION_TARGET_GLOBAL_INT from flight controller + - If available: use Ardupilot coordinates + - If not available: use fallback target_position + +4. ALWAYS: Call inject_position(*target_position) + - Create GPS_INPUT MAVLink message + - Send to flight controller + - Flush connection + +EXAMPLE EXECUTION WITH test_coordinates.json: +============================================= + +JSON File Content: +[ + [43.43405014107003, -80.57898027451816, 373.0], // Coordinate 0 + [40.0, -40.0, 200.0], // Coordinate 1 + [41.29129039399329, -81.78471782918818, 373.0] // Coordinate 2 +] + +Time 0.0s: INITIALIZATION +- Loads 3 coordinates from JSON +- Sets current_coordinate_index = 0 +- Sets next_coordinate_time = 0.0 + 1.0 = 1.0s +- Prints: "HITL loaded 3 coordinates from test_coordinates.json" + +Time 0.1s: periodic() call +- current_time = 0.1s +- 0.1s < 1.0s, so no coordinate change +- Uses current target_position (default: 43.434..., -80.578..., 373.0) +- Injects position to flight controller + +Time 0.2s: periodic() call +- current_time = 0.2s +- 0.2s < 1.0s, so no coordinate change +- Uses current target_position +- Injects position to flight controller + +... (continues every ~0.1s) ... + +Time 1.0s: periodic() call +- current_time = 1.0s +- 1.0s >= 1.0s, so TIME TO CHANGE COORDINATE! +- Calls _next_json_coordinate(): + * Gets coordinate[0]: [43.434..., -80.578..., 373.0] + * Sets target_position = (43.434..., -80.578..., 373.0) + * Prints: "HITL set JSON coordinate 1/3: (43.434..., -80.578..., 373.0)" + * Increments current_coordinate_index = 1 + * Sets next_coordinate_time = 1.0 + 1.0 = 2.0s +- Injects new position to flight controller + +Time 1.1s: periodic() call +- current_time = 1.1s +- 1.1s < 2.0s, so no coordinate change +- Uses current target_position (still coordinate 0) +- Injects position to flight controller + +... (continues every ~0.1s) ... + +Time 2.0s: periodic() call +- current_time = 2.0s +- 2.0s >= 2.0s, so TIME TO CHANGE COORDINATE! +- Calls _next_json_coordinate(): + * Gets coordinate[1]: [40.0, -40.0, 200.0] + * Sets target_position = (40.0, -40.0, 200.0) + * Prints: "HITL set JSON coordinate 2/3: (40.0, -40.0, 200.0)" + * Increments current_coordinate_index = 2 + * Sets next_coordinate_time = 2.0 + 1.0 = 3.0s +- Injects new position to flight controller + +Time 3.0s: periodic() call +- current_time = 3.0s +- 3.0s >= 3.0s, so TIME TO CHANGE COORDINATE! +- Calls _next_json_coordinate(): + * Gets coordinate[2]: [41.291..., -81.784..., 373.0] + * Sets target_position = (41.291..., -81.784..., 373.0) + * Prints: "HITL set JSON coordinate 3/3: (41.291..., -81.784..., 373.0)" + * Increments current_coordinate_index = 3, but 3 % 3 = 0 (cycles back!) + * Sets next_coordinate_time = 3.0 + 1.0 = 4.0s +- Injects new position to flight controller + +Time 4.0s: periodic() call +- current_time = 4.0s +- 4.0s >= 4.0s, so TIME TO CHANGE COORDINATE! +- Calls _next_json_coordinate(): + * Gets coordinate[0]: [43.434..., -80.578..., 373.0] (back to first!) + * Sets target_position = (43.434..., -80.578..., 373.0) + * Prints: "HITL set JSON coordinate 1/3: (43.434..., -80.578..., 373.0)" + * Increments current_coordinate_index = 1 + * Sets next_coordinate_time = 4.0 + 1.0 = 5.0s +- Injects new position to flight controller + +... (continues cycling forever) ... + +KEY POINTS: +=========== +- Coordinates change every 1 second (or custom interval) +- Cycles through all coordinates then repeats +- Each coordinate is held for the full interval duration +- Position is injected to flight controller every periodic() call +- JSON coordinates override Ardupilot when available +- If no JSON file, falls back to Ardupilot pathing +- Error handling prevents crashes if JSON loading fails diff --git a/modules/hitl/hitl_base.py b/modules/hitl/hitl_base.py index 5c40b3d..09daeb1 100644 --- a/modules/hitl/hitl_base.py +++ b/modules/hitl/hitl_base.py @@ -26,6 +26,8 @@ def create( position_module: bool, camera_module: bool, images_path: str | None = None, + json_file_path: str | None = None, + position_update_interval: float = 1.0, ) -> "tuple[True, HITL] | tuple[False, None]": """ Factory method to create a HITL instance. @@ -36,6 +38,8 @@ def create( position_module: Boolean indicating if the position module is enabled. camera_module: Boolean indicating if the camera module is enabled. images_path: Optional path to the images directory for the camera emulator. + json_file_path: Optional path to JSON file containing coordinates for position emulator. + position_update_interval: Interval (seconds) between switching JSON coordinates. Returns: Success, HITL instance | None. @@ -47,7 +51,7 @@ def create( return True, HITL(cls.__create_key, drone, None, None) if position_module: - result, position_emulator = PositionEmulator.create(drone) + result, position_emulator = PositionEmulator.create(drone, json_file_path, position_update_interval) if not result: return False, None @@ -81,10 +85,7 @@ def __init__( self.position_emulator = position_emulator self.camera_emulator = camera_emulator - self._stop_event: Event | None = None - self._threads: list[Thread] = [] - - def start(self) -> None: + def set_inject_position(self) -> None: """ Start HITL module threads. """ @@ -140,7 +141,6 @@ def run_position(self) -> None: self.position_emulator.periodic() except Exception as exc: # pylint: disable=broad-except print(f"HITL position thread error: {exc}") - time.sleep(0.1) def run_camera(self) -> None: """ @@ -152,4 +152,3 @@ def run_camera(self) -> None: self.camera_emulator.periodic() except Exception as exc: # pylint: disable=broad-except print(f"HITL camera thread error: {exc}") - time.sleep(0.1) diff --git a/modules/hitl/position_emulator.py b/modules/hitl/position_emulator.py index eb3c3cb..ac69d47 100644 --- a/modules/hitl/position_emulator.py +++ b/modules/hitl/position_emulator.py @@ -2,6 +2,7 @@ Emulates position and attitude to Pixhawk. """ +import json import time from ..mavlink import dronekit @@ -15,30 +16,85 @@ class PositionEmulator: @classmethod def create( - cls, drone: dronekit.Vehicle + cls, drone: dronekit.Vehicle, json_file_path: str | None = None, update_interval: float = 1.0 ) -> "tuple[True, PositionEmulator] | tuple[False, None]": """ Setup position emulator. + Args: + drone: Dronekit vehicle instance. + json_file_path: Optional path to JSON file containing coordinates. + update_interval: Interval (seconds) between switching JSON coordinates. + Returns: Success, PositionEmulator instance. """ + return True, PositionEmulator(cls.__create_key, drone, json_file_path, update_interval) - return True, PositionEmulator(cls.__create_key, drone) - - def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) -> None: + def __init__( + self, + class_private_create_key: object, + drone: dronekit.Vehicle, + json_file_path: str | None = None, + update_interval: float = 1.0, + ) -> None: """ Private constructor, use create() method. """ assert class_private_create_key is PositionEmulator.__create_key, "Use create() method" self.target_position = (43.43405014107003, -80.57898027451816, 373.0) # lat, lon, alt - self.drone = drone + self.json_file_path = json_file_path + self.json_coordinates: list[list[float]] = [] + self.current_coordinate_index = 0 + self.update_interval = update_interval + self.next_coordinate_time = time.time() + self.update_interval + + if self.json_file_path: + self._load_json_coordinates() + + def _load_json_coordinates(self) -> None: + """ + Loads coordinates from the JSON file and validates them. + """ + try: + with open(self.json_file_path, 'r', encoding='utf-8') as file: + self.json_coordinates = json.load(file) + if not isinstance(self.json_coordinates, list) or not all( + isinstance(coord, list) and len(coord) >= 3 for coord in self.json_coordinates + ): + raise ValueError("JSON must be a list of [lat, lon, alt] lists") + + print(f"HITL loaded {len(self.json_coordinates)} coordinates from {self.json_file_path}") + except Exception as exc: # pylint: disable=broad-except + print(f"HITL JSON coordinate loading error: {exc}") + self.json_coordinates = [] + + def _next_json_coordinate(self) -> None: + """ + Cycles to the next JSON coordinate and sets it as target position. + """ + if not self.json_coordinates: + return + + coordinate = self.json_coordinates[self.current_coordinate_index] + latitude, longitude, altitude = coordinate[0], coordinate[1], coordinate[2] + self.set_target_position(latitude, longitude, altitude) + print( + f"HITL set JSON coordinate {self.current_coordinate_index + 1}/{len(self.json_coordinates)}: " + f"({latitude}, {longitude}, {altitude})" + ) + + # Cycle to next coordinate + self.current_coordinate_index = (self.current_coordinate_index + 1) % len(self.json_coordinates) + +<<<<<<< HEAD +======= def set_target_position(self, latitude: float, longitude: float, altitude: float) -> None: """ - Sets the target position manually (currently a fallback if Ardupilot target doesnt work). + Sets the target position manually. Args: latitude: Latitude in degrees. @@ -49,20 +105,19 @@ def set_target_position(self, latitude: float, longitude: float, altitude: float def get_target_position(self) -> tuple[float, float, float]: """ - Gets the target position from the Ardupilot target. + Gets the target position from the Ardupilot target if available. Returns: Target position as (latitude, longitude, altitude). """ # pylint: disable=protected-access - position_target = None try: position_target = self.drone._master.recv_match( type="POSITION_TARGET_GLOBAL_INT", blocking=False ) except Exception as exc: # pylint: disable=broad-except print(f"HITL get_target_position recv_match error: {exc}") - position_target = None + return self.target_position # pylint: enable=protected-access if position_target: @@ -71,17 +126,25 @@ def get_target_position(self) -> tuple[float, float, float]: altitude = position_target.alt return (latitude, longitude, altitude) - # Optionally log if no message received - # print("No POSITION_TARGET_GLOBAL_INT message received.") - return self.target_position def periodic(self) -> None: """ - Periodic function. + Periodic function. Updates target position either from JSON coordinates or Ardupilot. """ - - self.target_position = self.get_target_position() + now = time.time() + + if self.json_coordinates: + # JSON has priority + if now >= self.next_coordinate_time: + try: + self._next_json_coordinate() + except Exception as exc: # pylint: disable=broad-except + print(f"HITL JSON coordinate update error: {exc}") + self.next_coordinate_time = now + self.update_interval + else: + # Fallback: use Ardupilot position target + self.target_position = self.get_target_position() self.inject_position( self.target_position[0], self.target_position[1], self.target_position[2] @@ -123,4 +186,4 @@ def inject_position( ] gps_input_msg = self.drone.message_factory.gps_input_encode(*values) self.drone.send_mavlink(gps_input_msg) - self.drone.flush() + self.drone.flush() \ No newline at end of file diff --git a/modules/hitl/test_coordinates.json b/modules/hitl/test_coordinates.json new file mode 100644 index 0000000..842a224 --- /dev/null +++ b/modules/hitl/test_coordinates.json @@ -0,0 +1,5 @@ +[ + [43.43405014107003, -80.57898027451816, 373.0], + [40.0, -40.0, 200.0], + [41.29129039399329, -81.78471782918818, 373.0] +] diff --git a/tests/integration/hitl/test_threading.py b/tests/integration/hitl/test_threading.py index 1a17542..6d7dcb0 100644 --- a/tests/integration/hitl/test_threading.py +++ b/tests/integration/hitl/test_threading.py @@ -2,6 +2,7 @@ Tests the threading behavior of the HITL module (gps and camera modules). """ +<<<<<<< HEAD # Physical connection to Pixhawk: /dev/ttyAMA0 # Simulated connection to Pixhawk: tcp:localhost:5762 PIXHAWK_ADDRESS = "/dev/ttyAMA0" @@ -11,6 +12,15 @@ from modules.mavlink import flight_controller +======= +PIXHAWK_ADDRESS = "tcp:localhost:5762" + +import os +import time +from modules.mavlink import flight_controller + + +>>>>>>> 2bf98af (Adds proper position listening for mission waypoints in hitl) def main() -> int: """ Main function. @@ -18,13 +28,17 @@ def main() -> int: images_folder_path = os.path.join("tests", "integration", "camera_emulator", "images") result, controller = flight_controller.FlightController.create( - PIXHAWK_ADDRESS, 57600, True, True, True, images_folder_path + PIXHAWK_ADDRESS, 57600, True, True # True, images_folder_path ) if not result: print("Failed to create flight controller") return -1 - time.sleep(10) + controller.set_flight_mode("AUTO") + + controller.insert_waypoint(0, 40, -40, 300) + + time.sleep(20) return 0