|
| 1 | +""" |
| 2 | +dijkstra_path_planning.py |
| 3 | +
|
| 4 | +Author: Ashish Varma |
| 5 | +""" |
| 6 | + |
| 7 | +import numpy as np |
| 8 | +import matplotlib.pyplot as plt |
| 9 | +import heapq |
| 10 | +import matplotlib.animation as anm |
| 11 | +import numpy as np |
| 12 | +import sys |
| 13 | +from pathlib import Path |
| 14 | +from matplotlib.colors import ListedColormap |
| 15 | + |
| 16 | +abs_dir_path = str(Path(__file__).absolute().parent) |
| 17 | +relative_path = "/../../../components/" |
| 18 | +relative_simulations = "/../../../simulations/" |
| 19 | + |
| 20 | + |
| 21 | +sys.path.append(abs_dir_path + relative_path + "visualization") |
| 22 | +sys.path.append(abs_dir_path + relative_path + "state") |
| 23 | +sys.path.append(abs_dir_path + relative_path + "obstacle") |
| 24 | +sys.path.append(abs_dir_path + relative_path + "plan/dijkstra") |
| 25 | +sys.path.append(abs_dir_path + relative_path + "mapping/grid") |
| 26 | + |
| 27 | + |
| 28 | + |
| 29 | + |
| 30 | +from state import State |
| 31 | +from obstacle import Obstacle |
| 32 | +from obstacle_list import ObstacleList |
| 33 | +from binary_occupancy_grid import BinaryOccupancyGrid |
| 34 | +from min_max import MinMax |
| 35 | +import json |
| 36 | + |
| 37 | + |
| 38 | + |
| 39 | + |
| 40 | + |
| 41 | +class DijkstraPathPlanner: |
| 42 | + def __init__(self, start, goal, map_file, x_lim=None, y_lim=None, path_filename=None, gif_name=None): |
| 43 | + """ |
| 44 | + Initialize the Dijkstra planner. |
| 45 | + Args: |
| 46 | + start: (x, y) tuple for start position. |
| 47 | + goal: (x, y) tuple for goal position. |
| 48 | + obstacle_parameters: List of obstacle dictionaries. |
| 49 | + resolution: Grid resolution in meters. |
| 50 | + visualize: Boolean to enable visualization during the search. |
| 51 | + x_lim: (min, max) tuple for x-axis range of the grid. |
| 52 | + y_lim: (min, max) tuple for y-axis range of the grid. |
| 53 | + """ |
| 54 | + self.start = start |
| 55 | + self.goal = goal |
| 56 | + self.explored_nodes = [] |
| 57 | + self.grid = self.load_grid_from_file(map_file) |
| 58 | + x_min, x_max = x_lim.min_value(), x_lim.max_value() |
| 59 | + y_min, y_max = y_lim.min_value(), y_lim.max_value() |
| 60 | + self.resolution = (x_max - x_min) / self.grid.shape[1] # Width of each cell |
| 61 | + self.x_range = np.arange(x_min, x_max, self.resolution) |
| 62 | + self.y_range = np.arange(y_min, y_max, self.resolution) |
| 63 | + self.path = [] |
| 64 | + self.path_filename = path_filename |
| 65 | + self.search() |
| 66 | + self.visualize_search(gif_name) |
| 67 | + |
| 68 | + def load_grid_from_file(self, file_path): |
| 69 | + """ |
| 70 | + Load a grid from a file and convert it to a numpy array. |
| 71 | + Args: |
| 72 | + file_path: Path to the file containing the grid data. |
| 73 | + Returns: |
| 74 | + grid: A numpy array representing the grid. |
| 75 | + """ |
| 76 | + file_extension = Path(file_path).suffix |
| 77 | + |
| 78 | + if file_extension == '.npy': |
| 79 | + grid = np.load(file_path) |
| 80 | + elif file_extension == '.png': |
| 81 | + grid = plt.imread(file_path) |
| 82 | + if grid.ndim == 3: # If the image has color channels, convert to grayscale |
| 83 | + grid = np.mean(grid, axis=2) |
| 84 | + grid = (grid > 0.5).astype(int) # Binarize the image |
| 85 | + elif file_extension == '.json': |
| 86 | + with open(file_path, 'r') as f: |
| 87 | + grid_data = json.load(f) |
| 88 | + grid = np.array(grid_data) |
| 89 | + else: |
| 90 | + raise ValueError(f"Unsupported file format: {file_extension}") |
| 91 | + |
| 92 | + return grid |
| 93 | + |
| 94 | + def is_valid(self, x, y): |
| 95 | + """ |
| 96 | + Check if a grid cell is within bounds and not an obstacle. |
| 97 | + Converts world coordinates to grid indices, accounting for negative min values. |
| 98 | + """ |
| 99 | + # Check if indices are within bounds and not an obstacle |
| 100 | + return (0 <= x < self.grid.shape[1] and |
| 101 | + 0 <= y < self.grid.shape[0] and |
| 102 | + self.grid[y, x] == 0) |
| 103 | + |
| 104 | + def search(self): |
| 105 | + start_idx = (int((self.start[0] - self.x_range[0]) /self.resolution), |
| 106 | + int((self.start[1] - self.y_range[0]) /self.resolution)) |
| 107 | + goal_idx = (int((self.goal[0] - self.x_range[0]) /self.resolution), |
| 108 | + int((self.goal[1] - self.y_range[0]) /self.resolution)) |
| 109 | + |
| 110 | + open_list = [] |
| 111 | + heapq.heappush(open_list, (0, start_idx)) |
| 112 | + came_from = {} |
| 113 | + cost_so_far = {start_idx: 0} |
| 114 | + |
| 115 | + print(f"Start: {start_idx}, Goal: {goal_idx}") |
| 116 | + while open_list: |
| 117 | + _, current = heapq.heappop(open_list) |
| 118 | + self.explored_nodes.append(current) |
| 119 | + if current == goal_idx: |
| 120 | + print(f"Goal found at: {current}") |
| 121 | + self.path = self.reconstruct_path(came_from, start_idx, goal_idx) |
| 122 | + sparse_path = self.make_sparse_path(self.path) |
| 123 | + self.save_path(sparse_path, self.path_filename) |
| 124 | + return |
| 125 | + |
| 126 | + for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1),(1, 1), (-1, -1), (1, -1), (-1, 1)]: |
| 127 | + neighbor = (current[0] + dx, current[1] + dy) |
| 128 | + # print(f"Neighbor: {neighbor}") |
| 129 | + if self.is_valid(neighbor[0], neighbor[1]): |
| 130 | + new_cost = cost_so_far[current] + ((abs(neighbor[0] - current[0]) **2) + (abs(neighbor[1] - current[1]) **2)) ** 0.5 |
| 131 | + if neighbor not in cost_so_far or new_cost < cost_so_far[neighbor]: |
| 132 | + cost_so_far[neighbor] = new_cost |
| 133 | + priority = new_cost |
| 134 | + heapq.heappush(open_list, (priority, neighbor)) |
| 135 | + came_from[neighbor] = current |
| 136 | + |
| 137 | + return [] |
| 138 | + |
| 139 | + def reconstruct_path(self, came_from, start, goal): |
| 140 | + """ |
| 141 | + Reconstruct the path from start to goal in world coordinates. |
| 142 | + Args: |
| 143 | + came_from: Dictionary containing the parent of each node. |
| 144 | + start: Start node in grid indices. |
| 145 | + goal: Goal node in grid indices. |
| 146 | + Returns: |
| 147 | + path: List of (x, y) tuples in world coordinates. |
| 148 | + """ |
| 149 | + current = goal |
| 150 | + path = [] |
| 151 | + while current != start: |
| 152 | + path.append(current) # Convert grid indices to world coordinates |
| 153 | + current = came_from[current] |
| 154 | + path.append(start) # Add the start node in world coordinates |
| 155 | + return path[::-1] # Reverse the path |
| 156 | + |
| 157 | + |
| 158 | + def _grid_to_world(self, grid_node): |
| 159 | + """ |
| 160 | + Convert grid indices to world coordinates. |
| 161 | + Args: |
| 162 | + grid_node: (grid_x, grid_y) tuple in grid indices. |
| 163 | + Returns: |
| 164 | + (world_x, world_y): Corresponding world coordinates. |
| 165 | + """ |
| 166 | + grid_x, grid_y = grid_node |
| 167 | + world_x = self.x_range[0] + grid_x *self.resolution |
| 168 | + world_y = self.y_range[0] + grid_y *self.resolution |
| 169 | + return (world_x, world_y) |
| 170 | + |
| 171 | + def make_sparse_path(self, path, num_points=20): |
| 172 | + """ |
| 173 | + Make the path sparse for use with CubicSplineCourse. |
| 174 | + Args: |
| 175 | + path: Full path as a list of (x, y) tuples in world coordinates. |
| 176 | + num_points: Number of points to include in the sparse path. |
| 177 | + Returns: |
| 178 | + sparse_path: A sparse path with evenly spaced world coordinates. |
| 179 | + """ |
| 180 | + if len(path) <= num_points: |
| 181 | + # If the path already has fewer points than num_points, return as-is |
| 182 | + return path |
| 183 | + |
| 184 | + # Use linear spacing to select points |
| 185 | + indices = np.linspace(0, len(path) - 1, num_points, dtype=int) |
| 186 | + sparse_path = [self._grid_to_world(path[i]) for i in indices] |
| 187 | + return sparse_path |
| 188 | + |
| 189 | + def save_path(self, path, filename): |
| 190 | + |
| 191 | + """Save path to a json file.""" |
| 192 | + if not Path(filename).exists(): |
| 193 | + Path(filename).touch() |
| 194 | + path = [node for node in path] |
| 195 | + with open(filename, "w") as f: |
| 196 | + json.dump(path, f) |
| 197 | + |
| 198 | + |
| 199 | + def visualize_search(self, gif_name=None): |
| 200 | + print(f"Exploring {len(self.explored_nodes)} nodes.") |
| 201 | + if not self.explored_nodes: |
| 202 | + print("Error: No explored nodes. Ensure search() is executed before visualize_search().") |
| 203 | + return |
| 204 | + |
| 205 | + |
| 206 | + figure = plt.figure(figsize=(10, 8)) |
| 207 | + axes = figure.add_subplot(111) |
| 208 | + axes.set_aspect("equal") |
| 209 | + axes.set_xlabel("X [m]", fontsize=15) |
| 210 | + axes.set_ylabel("Y [m]", fontsize=15) |
| 211 | + |
| 212 | + |
| 213 | + self.anime = anm.FuncAnimation( |
| 214 | + figure, |
| 215 | + self.update_frame, |
| 216 | + fargs=(axes, self.path), |
| 217 | + frames=len(self.explored_nodes) + len(self.path), # Include frames for the path |
| 218 | + interval=50, |
| 219 | + repeat=False, |
| 220 | + ) |
| 221 | + |
| 222 | + if gif_name is not None: |
| 223 | + try: |
| 224 | + print("Saving animation...") |
| 225 | + self.anime.save(gif_name, writer="pillow") |
| 226 | + print("Animation saved successfully.") |
| 227 | + except Exception as e: |
| 228 | + print(f"Error saving animation: {e}") |
| 229 | + else: |
| 230 | + plt.show() |
| 231 | + |
| 232 | + # clear existing plot and close existing figure |
| 233 | + plt.clf() |
| 234 | + plt.close() |
| 235 | + |
| 236 | + |
| 237 | + def update_frame(self, i, axes, path): |
| 238 | + """ |
| 239 | + Update frame for visualization using cell filling, including path reconstruction. |
| 240 | + Args: |
| 241 | + i: Current frame index. |
| 242 | + axes: Matplotlib axes to draw on. |
| 243 | + path: The reconstructed path to draw after exploration. |
| 244 | + """ |
| 245 | + # Exploration phase |
| 246 | + if i < len(self.explored_nodes): |
| 247 | + # Mark the current node as explored |
| 248 | + node = self.explored_nodes[i] |
| 249 | + grid_x = int(node[0]) |
| 250 | + grid_y = int(node[1]) |
| 251 | + self.grid[grid_y, grid_x] = 0.25 # Set a value to represent explored nodes |
| 252 | + |
| 253 | + # Path reconstruction phase |
| 254 | + else: |
| 255 | + path_index = i - len(self.explored_nodes) |
| 256 | + if path_index < len(path): |
| 257 | + node = path[path_index] |
| 258 | + grid_x = int(node[0]) |
| 259 | + grid_y = int(node[1]) |
| 260 | + self.grid[grid_y, grid_x] = 0.5 # Set a value to represent the path |
| 261 | + |
| 262 | + # Clear the axes and redraw the updated grid |
| 263 | + axes.clear() |
| 264 | + |
| 265 | + # Define RGB colors for each grid value |
| 266 | + # Colors in the format [R, G, B], where values are in the range [0, 1] |
| 267 | + colors = [ |
| 268 | + [1.0, 1.0, 1.0], # Free space (white) |
| 269 | + [0.4, 0.8, 1.0], # Explored nodes (light blue) |
| 270 | + [0.0, 1.0, 0.0], # Path (green) |
| 271 | + [0.5, 0.5, 0.5], # Clearance space (yellow-orange) |
| 272 | + [0.0, 0.0, 0.0], # Obstacles (red) |
| 273 | + ] |
| 274 | + |
| 275 | + # Create a colormap |
| 276 | + custom_cmap = ListedColormap(colors) |
| 277 | + |
| 278 | + |
| 279 | + axes.imshow(self.grid, extent=[self.x_range[0], self.x_range[-1], self.y_range[0], self.y_range[-1]], |
| 280 | + origin='lower', cmap=custom_cmap, alpha=0.8) |
| 281 | + axes.plot(self.start[0], self.start[1], 'go', label="Start") |
| 282 | + axes.plot(self.goal[0], self.goal[1], 'ro', label="Goal") |
| 283 | + axes.legend() |
| 284 | + |
| 285 | + |
| 286 | +if __name__ == "__main__": |
| 287 | + |
| 288 | + # The path to the map file where the planner will search for a path |
| 289 | + map_file = "map.json" |
| 290 | + # Define the path file to save the path that is generated by the planner |
| 291 | + path_file = "path.json" |
| 292 | + # Visualize the search process and save the gif |
| 293 | + gif_path = "dijkstra_search.gif" |
| 294 | + |
| 295 | + x_lim, y_lim = MinMax(-5, 55), MinMax(-20, 25) |
| 296 | + |
| 297 | + # Define the start and goal positions |
| 298 | + start = (0, 0) |
| 299 | + goal = (50, -10) |
| 300 | + |
| 301 | + # Create the Dijkstra planner |
| 302 | + planner = DijkstraPathPlanner(start, goal, map_file, x_lim=x_lim, y_lim=y_lim, path_filename=path_file, gif_name=gif_path) |
| 303 | + |
| 304 | + |
0 commit comments