22Emulates position and attitude to Pixhawk.
33"""
44
5+ import json
56import time
67from ..mavlink import dronekit
78
@@ -15,18 +16,32 @@ class PositionEmulator:
1516
1617 @classmethod
1718 def create (
18- cls , drone : dronekit .Vehicle
19+ cls ,
20+ drone : dronekit .Vehicle ,
21+ json_file_path : str | None = None ,
22+ update_interval : float = 1.0 ,
1923 ) -> "tuple[True, PositionEmulator] | tuple[False, None]" :
2024 """
2125 Setup position emulator.
2226
27+ Args:
28+ drone: Dronekit vehicle instance.
29+ json_file_path: Optional path to JSON file containing coordinates.
30+ update_interval: Interval (seconds) between switching JSON coordinates.
31+
2332 Returns:
2433 Success, PositionEmulator instance.
2534 """
2635
27- return True , PositionEmulator (cls .__create_key , drone )
36+ return True , PositionEmulator (cls .__create_key , drone , json_file_path , update_interval )
2837
29- def __init__ (self , class_private_create_key : object , drone : dronekit .Vehicle ) -> None :
38+ def __init__ (
39+ self ,
40+ class_private_create_key : object ,
41+ drone : dronekit .Vehicle ,
42+ json_file_path : str | None = None ,
43+ update_interval : float = 1.0 ,
44+ ) -> None :
3045 """
3146 Private constructor, use create() method.
3247 """
@@ -35,10 +50,58 @@ def __init__(self, class_private_create_key: object, drone: dronekit.Vehicle) ->
3550 self .target_position = (43.43405014107003 , - 80.57898027451816 , 373.0 ) # lat, lon, alt
3651
3752 self .drone = drone
53+ self .json_file_path = json_file_path
54+ self .json_coordinates : list [list [float ]] = []
55+ self .current_coordinate_index = 0
56+ self .update_interval = update_interval
57+ self .next_coordinate_time = time .time () + self .update_interval
58+
59+ if self .json_file_path :
60+ self ._load_json_coordinates ()
61+
62+ def _load_json_coordinates (self ) -> None :
63+ """
64+ Loads coordinates from the JSON file and validates them.
65+ """
66+ try :
67+ with open (self .json_file_path , "r" , encoding = "utf-8" ) as file :
68+ self .json_coordinates = json .load (file )
69+
70+ if not isinstance (self .json_coordinates , list ) or not all (
71+ isinstance (coord , list ) and len (coord ) >= 3 for coord in self .json_coordinates
72+ ):
73+ raise ValueError ("JSON must be a list of [lat, lon, alt] lists" )
74+
75+ print (
76+ f"HITL loaded { len (self .json_coordinates )} coordinates from { self .json_file_path } "
77+ )
78+ except Exception as exc : # pylint: disable=broad-except
79+ print (f"HITL JSON coordinate loading error: { exc } " )
80+ self .json_coordinates = []
81+
82+ def _next_json_coordinate (self ) -> None :
83+ """
84+ Cycles to the next JSON coordinate and sets it as target position.
85+ """
86+ if not self .json_coordinates :
87+ return
88+
89+ coordinate = self .json_coordinates [self .current_coordinate_index ]
90+ latitude , longitude , altitude = coordinate [0 ], coordinate [1 ], coordinate [2 ]
91+ self .set_target_position (latitude , longitude , altitude )
92+ print (
93+ f"HITL set JSON coordinate { self .current_coordinate_index + 1 } /{ len (self .json_coordinates )} : "
94+ f"({ latitude } , { longitude } , { altitude } )"
95+ )
96+
97+ # Cycle to next coordinate
98+ self .current_coordinate_index = (self .current_coordinate_index + 1 ) % len (
99+ self .json_coordinates
100+ )
38101
39102 def set_target_position (self , latitude : float , longitude : float , altitude : float ) -> None :
40103 """
41- Sets the target position manually (currently a fallback if Ardupilot target doesnt work).
104+ Sets the target position manually
42105
43106 Args:
44107 latitude: Latitude in degrees.
@@ -49,7 +112,7 @@ def set_target_position(self, latitude: float, longitude: float, altitude: float
49112
50113 def get_target_position (self ) -> tuple [float , float , float ]:
51114 """
52- Gets the target position from the Ardupilot target.
115+ Gets the target position from the Ardupilot target if available .
53116
54117 Returns:
55118 Target position as (latitude, longitude, altitude).
@@ -62,7 +125,7 @@ def get_target_position(self) -> tuple[float, float, float]:
62125 )
63126 except Exception as exc : # pylint: disable=broad-except
64127 print (f"HITL get_target_position recv_match error: { exc } " )
65- position_target = None
128+ return self . target_position
66129 # pylint: enable=protected-access
67130
68131 if position_target :
@@ -78,10 +141,21 @@ def get_target_position(self) -> tuple[float, float, float]:
78141
79142 def periodic (self ) -> None :
80143 """
81- Periodic function.
144+ Periodic function. Updates target position either from JSON coordinates or Ardupilot.
82145 """
83-
84- self .target_position = self .get_target_position ()
146+ now = time .time ()
147+
148+ if self .json_coordinates :
149+ # JSON has priority
150+ if now >= self .next_coordinate_time :
151+ try :
152+ self ._next_json_coordinate ()
153+ except Exception as exc : # pylint: disable=broad-except
154+ print (f"HITL JSON coordinate update error: { exc } " )
155+ self .next_coordinate_time = now + self .update_interval
156+ else :
157+ # Fallback: use Ardupilot position target
158+ self .target_position = self .get_target_position ()
85159
86160 self .inject_position (
87161 self .target_position [0 ], self .target_position [1 ], self .target_position [2 ]
0 commit comments