Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion modules/hitl/hitl_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def create(
position_module: bool,
camera_module: bool,
images_path: str | None = None,
json_file_path: str | None = None,
position_update_interval: float = 1.0,
) -> "tuple[True, HITL] | tuple[False, None]":
"""
Factory method to create a HITL instance.
Expand All @@ -36,6 +38,8 @@ def create(
position_module: Boolean indicating if the position module is enabled.
camera_module: Boolean indicating if the camera module is enabled.
images_path: Optional path to the images directory for the camera emulator.
json_file_path: Optional path to JSON file containing coordinates for position emulator.
position_update_interval: Interval (seconds) between switching JSON coordinates.

Returns:
Success, HITL instance | None.
Expand All @@ -47,7 +51,9 @@ def create(
return True, HITL(cls.__create_key, drone, None, None)

if position_module:
result, position_emulator = PositionEmulator.create(drone)
result, position_emulator = PositionEmulator.create(
drone, json_file_path, position_update_interval
)
if not result:
return False, None

Expand Down
92 changes: 83 additions & 9 deletions modules/hitl/position_emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Emulates position and attitude to Pixhawk.
"""

import json
import time
from ..mavlink import dronekit

Expand All @@ -15,18 +16,32 @@ class PositionEmulator:

@classmethod
def create(
cls, drone: dronekit.Vehicle
cls,
drone: dronekit.Vehicle,
json_file_path: str | None = None,
update_interval: float = 1.0,
) -> "tuple[True, PositionEmulator] | tuple[False, None]":
"""
Setup position emulator.

Args:
drone: Dronekit vehicle instance.
json_file_path: Optional path to JSON file containing coordinates.
update_interval: Interval (seconds) between switching JSON coordinates.

Returns:
Success, PositionEmulator instance.
"""

return True, PositionEmulator(cls.__create_key, drone)
return True, PositionEmulator(cls.__create_key, drone, json_file_path, update_interval)

def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) -> None:
def __init__(
self,
class_private_create_key: object,
drone: dronekit.Vehicle,
json_file_path: str | None = None,
update_interval: float = 1.0,
) -> None:
"""
Private constructor, use create() method.
"""
Expand All @@ -35,10 +50,58 @@ def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) ->
self.target_position = (43.43405014107003, -80.57898027451816, 373.0) # lat, lon, alt

self.drone = drone
self.json_file_path = json_file_path
self.json_coordinates: list[list[float]] = []
self.current_coordinate_index = 0
self.update_interval = update_interval
self.next_coordinate_time = time.time() + self.update_interval

if self.json_file_path:
self._load_json_coordinates()

def _load_json_coordinates(self) -> None:
"""
Loads coordinates from the JSON file and validates them.
"""
try:
with open(self.json_file_path, "r", encoding="utf-8") as file:
self.json_coordinates = json.load(file)

if not isinstance(self.json_coordinates, list) or not all(
isinstance(coord, list) and len(coord) >= 3 for coord in self.json_coordinates
):
raise ValueError("JSON must be a list of [lat, lon, alt] lists")

print(
f"HITL loaded {len(self.json_coordinates)} coordinates from {self.json_file_path}"
)
except Exception as exc: # pylint: disable=broad-except
print(f"HITL JSON coordinate loading error: {exc}")
self.json_coordinates = []

def _next_json_coordinate(self) -> None:
"""
Cycles to the next JSON coordinate and sets it as target position.
"""
if not self.json_coordinates:
return

coordinate = self.json_coordinates[self.current_coordinate_index]
latitude, longitude, altitude = coordinate[0], coordinate[1], coordinate[2]
self.set_target_position(latitude, longitude, altitude)
print(
f"HITL set JSON coordinate {self.current_coordinate_index + 1}/{len(self.json_coordinates)}: "
f"({latitude}, {longitude}, {altitude})"
)

# Cycle to next coordinate
self.current_coordinate_index = (self.current_coordinate_index + 1) % len(
self.json_coordinates
)

def set_target_position(self, latitude: float, longitude: float, altitude: float) -> None:
"""
Sets the target position manually (currently a fallback if Ardupilot target doesnt work).
Sets the target position manually

Args:
latitude: Latitude in degrees.
Expand All @@ -49,7 +112,7 @@ def set_target_position(self, latitude: float, longitude: float, altitude: float

def get_target_position(self) -> tuple[float, float, float]:
"""
Gets the target position from the Ardupilot target.
Gets the target position from the Ardupilot target if available.

Returns:
Target position as (latitude, longitude, altitude).
Expand All @@ -62,7 +125,7 @@ def get_target_position(self) -> tuple[float, float, float]:
)
except Exception as exc: # pylint: disable=broad-except
print(f"HITL get_target_position recv_match error: {exc}")
position_target = None
return self.target_position
# pylint: enable=protected-access

if position_target:
Expand All @@ -78,10 +141,21 @@ def get_target_position(self) -> tuple[float, float, float]:

def periodic(self) -> None:
"""
Periodic function.
Periodic function. Updates target position either from JSON coordinates or Ardupilot.
"""

self.target_position = self.get_target_position()
now = time.time()

if self.json_coordinates:
# JSON has priority
if now >= self.next_coordinate_time:
try:
self._next_json_coordinate()
except Exception as exc: # pylint: disable=broad-except
print(f"HITL JSON coordinate update error: {exc}")
self.next_coordinate_time = now + self.update_interval
else:
# Fallback: use Ardupilot position target
self.target_position = self.get_target_position()

self.inject_position(
self.target_position[0], self.target_position[1], self.target_position[2]
Expand Down
10 changes: 9 additions & 1 deletion modules/mavlink/flight_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ def create(
position_module: bool = False,
camera_module: bool = False,
images_path: str | None = None,
position_json_path: str | None = None,
position_update_interval: float = 1.0,
) -> "tuple[bool, FlightController | None]":
"""
address: TCP address or serial port of the drone (e.g. "tcp:127.0.0.1:14550").
Expand All @@ -227,7 +229,13 @@ def create(
)
# Enable/disable HITL based on mode
success, hitl_instance = hitl_base.HITL.create(
drone, hitl_enabled, position_module, camera_module, images_path
drone,
hitl_enabled,
position_module,
camera_module,
images_path,
position_json_path,
position_update_interval,
)
if success:
if hitl_enabled and hitl_instance is not None:
Expand Down
5 changes: 5 additions & 0 deletions tests/integration/hitl/test_coordinates.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[
[43.43405014107003, -80.57898027451816, 373.0],
[40.0, -40.0, 200.0],
[41.29129039399329, -81.78471782918818, 373.0]
]
Loading