Skip to content

Commit 1e17d51

Browse files
authored
[Route Planner Agent] New websocket connection to smart traffic agent (open-edge-platform#2007)
Signed-off-by: Krishna Murti <krishna.murti@intel.com>
1 parent 98aa6d5 commit 1e17d51

File tree

15 files changed

+845
-768
lines changed

15 files changed

+845
-768
lines changed

metro-ai-suite/smart-route-planning-agent/setup.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ if [ "$#" -eq 2 ]; then
7373
fi
7474

7575

76-
7776
# Base configuration
7877
HOST_IP=$(ip route get 1 2>/dev/null | awk '{print $7}') # Fetch the host IP
7978

metro-ai-suite/smart-route-planning-agent/src/agents/route_planner.py

Lines changed: 114 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
StaticRouteOptimizerFactory,
2222
ThresholdController,
2323
)
24-
from schema import LiveTrafficData, RouteCondition
2524
from utils.gpx_parser import MapDataParser
2625
from utils.helper import get_all_available_route_files as route_files
2726
from utils.logging_config import get_logger
27+
from schema import LiveTrafficData
28+
2829

2930
logger = get_logger(__name__)
3031

@@ -46,7 +47,7 @@ def __init__(self):
4647

4748
self.live_traffic_status_list: list[LiveTrafficState] = []
4849

49-
def _find_new_shortest_available_route(
50+
async def _find_new_shortest_available_route(
5051
self, source: str, destination: str, no_fly_list: list[str]
5152
) -> tuple[str, float]:
5253
"""
@@ -57,38 +58,52 @@ def _find_new_shortest_available_route(
5758
shortest_distance: float = 0.0
5859
shortest_route: str = ""
5960

60-
# Iterate over all available route files not present in no_fly_list
61-
for route_file in list(set(self.all_routes) - set(no_fly_list or [])):
62-
# Parse GPX file for current route
63-
temp_parser = MapDataParser(GPX_DIR / route_file)
64-
waypoints = temp_parser.get_waypoints()
65-
66-
# Get source and destination waypoints
67-
source_wpt = waypoints[0] if waypoints else None
68-
destination_wpt = waypoints[-1] if waypoints else None
69-
70-
# Check if waypoints match the source and destination in graph state
71-
if source_wpt and destination_wpt:
72-
if (
73-
source_wpt["name"] == source or source_wpt["description"] == source
74-
) and (
75-
destination_wpt["name"] == destination
76-
or destination_wpt["description"] == destination
77-
):
78-
# Get the route with shortest distance for given source and destination
79-
route_distance = temp_parser.get_total_distance()
80-
if route_distance < shortest_distance or shortest_distance == 0.0:
81-
shortest_distance = route_distance
82-
shortest_route = route_file
61+
try:
62+
# Iterate over all available route files not present in no_fly_list
63+
for route_file in list(set(self.all_routes) - set(no_fly_list or [])):
64+
# Parse GPX file for current route
65+
logger.debug(
66+
f"Checking route file: {route_file} for source: {source} and destination: {destination}"
67+
)
68+
temp_parser = await MapDataParser.create(GPX_DIR / route_file)
69+
waypoints = temp_parser.get_waypoints()
70+
71+
# Get source and destination waypoints
72+
source_wpt = waypoints[0] if waypoints else None
73+
destination_wpt = waypoints[-1] if waypoints else None
74+
75+
# Check if waypoints match the source and destination in graph state
76+
if source_wpt and destination_wpt:
77+
if (
78+
source_wpt["name"] == source
79+
or source_wpt["description"] == source
80+
) and (
81+
destination_wpt["name"] == destination
82+
or destination_wpt["description"] == destination
83+
):
84+
# Get the route with shortest distance for given source and destination
85+
route_distance = temp_parser.get_total_distance()
86+
if (
87+
route_distance < shortest_distance
88+
or shortest_distance == 0.0
89+
):
90+
shortest_distance = route_distance
91+
shortest_route = route_file
92+
finally:
93+
if "temp_parser" in locals():
94+
temp_parser.clean() # Clean up parser instance to free memory
8395

8496
return shortest_route, shortest_distance
8597

86-
def find_direct_route(self, state: State) -> State:
98+
async def find_direct_route(self, state: State) -> State:
8799
"""Finds the direct route based on the available routes and provided source/destination."""
88100

89101
logger.info("Finding direct shortest route ...")
90-
logger.debug(f"============= State of the state : {state} =============")
91-
shortest_route, shortest_distance = self._find_new_shortest_available_route(
102+
logger.debug(f"State of the state : {state}")
103+
(
104+
shortest_route,
105+
shortest_distance,
106+
) = await self._find_new_shortest_available_route(
92107
state.get("source", ""),
93108
state.get("destination", ""),
94109
state.get("no_fly_list", IGNORED_ROUTES),
@@ -113,13 +128,14 @@ def find_direct_route(self, state: State) -> State:
113128
"no_fly_list": [*IGNORED_ROUTES],
114129
}
115130

116-
def find_optimal_route(self, state: State) -> State:
131+
# NOTE: This tool is NOT being utilized right now.
132+
async def find_optimal_route(self, state: State) -> State:
117133
"""
118134
Finds the optimal route based on the available route status and information.
119135
#TODO Uses Brute Force Search - Need to be Improved.
120136
"""
121137
logger.info("Finding optimal routes based on static data ...")
122-
route_status: RouteCondition | None = None
138+
route_status = None
123139

124140
static_optimizers = state.get("static_optimizers")
125141
if static_optimizers:
@@ -148,8 +164,11 @@ def find_optimal_route(self, state: State) -> State:
148164
"distance": optimal_distance,
149165
}
150166

151-
temp_parser = MapDataParser(GPX_DIR / optimal_route_name)
152-
route_data = temp_parser.get_route_data()
167+
try:
168+
temp_parser = await MapDataParser.create(GPX_DIR / optimal_route_name)
169+
route_data = temp_parser.get_route_data()
170+
finally:
171+
temp_parser.clean() # Clean up parser instance to free memory
153172

154173
for track in route_data["tracks"]:
155174
for track_point in track["track_points"]:
@@ -167,12 +186,13 @@ def find_optimal_route(self, state: State) -> State:
167186
# check if route_status has a required attributes and proceed accordingly
168187
if hasattr(route_status, "weather_condition"):
169188
if route_status.weather_condition in ADVERSE_WEATHER_CONDITIONS:
170-
optimal_route_name, optimal_distance = (
171-
self._find_new_shortest_available_route(
172-
state.get("source", ""),
173-
state.get("destination", ""),
174-
state.get("no_fly_list", []),
175-
)
189+
(
190+
optimal_route_name,
191+
optimal_distance,
192+
) = await self._find_new_shortest_available_route(
193+
state.get("source", ""),
194+
state.get("destination", ""),
195+
state.get("no_fly_list", []),
176196
)
177197
optimal_route_state = {
178198
"route_name": optimal_route_name,
@@ -185,12 +205,13 @@ def find_optimal_route(self, state: State) -> State:
185205
CongestionLevel.HIGH,
186206
CongestionLevel.SEVERE,
187207
]:
188-
optimal_route_name, optimal_distance = (
189-
self._find_new_shortest_available_route(
190-
state.get("source", ""),
191-
state.get("destination", ""),
192-
state.get("no_fly_list", []),
193-
)
208+
(
209+
optimal_route_name,
210+
optimal_distance,
211+
) = await self._find_new_shortest_available_route(
212+
state.get("source", ""),
213+
state.get("destination", ""),
214+
state.get("no_fly_list", []),
194215
)
195216
optimal_route_state = {
196217
"route_name": optimal_route_name,
@@ -208,7 +229,7 @@ def find_optimal_route(self, state: State) -> State:
208229
"no_fly_list": [optimal_route_name] if optimal_route_name else [],
209230
}
210231

211-
def update_optimal_route_realtime(self, state: State) -> State:
232+
async def update_optimal_route_realtime(self, state: State) -> State:
212233
"""Updates the optimal route in real-time based on live traffic data."""
213234

214235
logger.info(
@@ -237,31 +258,45 @@ def update_optimal_route_realtime(self, state: State) -> State:
237258

238259
# fetch the available live traffic data
239260
live_traffic_controller = LiveTrafficController()
240-
all_routes_data: List[LiveTrafficData] = (
241-
live_traffic_controller.fetch_route_status()
242-
)
261+
all_routes_data: List[
262+
LiveTrafficData
263+
] = await live_traffic_controller.fetch_route_status()
264+
265+
logger.debug(f"Live traffic data received: {all_routes_data}")
243266

244267
# Iterate till no new routes are available
245268
while True:
246269
route_not_optimal: bool = False
247270
logger.debug(f"Roads not to be taken : {local_no_fly_list}")
248271

249272
# Get next available shortest route
250-
next_shortest_route_name, next_shortest_distance = (
251-
self._find_new_shortest_available_route(
252-
state.get("source", ""),
253-
state.get("destination", ""),
254-
local_no_fly_list,
255-
)
273+
(
274+
next_shortest_route_name,
275+
next_shortest_distance,
276+
) = await self._find_new_shortest_available_route(
277+
state.get("source", ""),
278+
state.get("destination", ""),
279+
local_no_fly_list,
280+
)
281+
282+
route_found_in_live_traffic: bool = False # Simple flag to check if current route being iterated is present in live traffic data
283+
logger.debug(
284+
f"Next shortest route: {next_shortest_route_name} with distance: {next_shortest_distance}"
256285
)
257286

258287
if not next_shortest_route_name or not next_shortest_distance:
259288
logger.info("No more alternate routes available.")
260289
break
261290

262291
# Parse the next available shortest route
263-
map_parser = MapDataParser(GPX_DIR / next_shortest_route_name)
264-
route_data = map_parser.get_route_data()
292+
try:
293+
# TODO: _find_next_shortest_avaialable_route already parses the GPX file, can be utilized here instead of parsing again.
294+
map_parser = await MapDataParser.create(
295+
GPX_DIR / next_shortest_route_name
296+
)
297+
route_data = map_parser.get_route_data()
298+
finally:
299+
map_parser.clean() # Clean up parser instance to free memory
265300

266301
# Get the waypoints and first track and collect all trackpoints for the track
267302
trackpoints = route_data.get("waypoints", [])
@@ -290,6 +325,7 @@ def update_optimal_route_realtime(self, state: State) -> State:
290325
)
291326
<= live_traffic_controller.proximity_factor
292327
):
328+
route_found_in_live_traffic = True
293329
if (
294330
traffic_status.traffic_density
295331
> ThresholdController.TRAFFIC_DENSITY_THRESHOLD
@@ -337,7 +373,11 @@ def update_optimal_route_realtime(self, state: State) -> State:
337373
)
338374
break
339375

340-
if i == len(trackpoints) - 1 and not route_not_optimal:
376+
if (
377+
route_found_in_live_traffic
378+
and i == len(trackpoints) - 1
379+
and not route_not_optimal
380+
):
341381
# If we reached the last trackpoint without finding high traffic, consider route to be optimal
342382
logger.info(f"Route {next_shortest_route_name} is optimal.")
343383

@@ -425,19 +465,18 @@ def _build_graph(self) -> CompiledStateGraph:
425465

426466
# Add final edges from all three nodes to END node
427467
self.graph.add_edge(PlannerNode.DIRECT.value, END)
428-
# Add conditional edge between optimal_route and END node, as we need to re-run this node until
429-
# the static route optimizer stack exhausts.
468+
# Add conditional edge between optimal_route and END node, as we need to re-run this node for all available static optimizers.
430469
self.graph.add_conditional_edges(
431470
PlannerNode.OPTIMAL.value,
432471
self._should_rerun_static_route_optimizers,
433-
{PlannerNode.OPTIMAL.value, END},
472+
{True: PlannerNode.OPTIMAL.value, False: END},
434473
)
435474
self.graph.add_edge(PlannerNode.REALTIME.value, END)
436475

437476
# Compile the graph to be able to execute it
438477
return self.graph.compile()
439478

440-
def plan_route(
479+
async def plan_route(
441480
self, source: str, destination: str, previous_state: Optional[State] = None
442481
) -> State:
443482
"""
@@ -460,6 +499,19 @@ def plan_route(
460499
current_state = {**current_state, **previous_state}
461500

462501
# Execute the graph to find the best route
463-
route_detail = self.compiled_graph.invoke(current_state)
502+
route_detail = await self.compiled_graph.ainvoke(current_state)
503+
504+
# Parse route_detail as RoutePlannerState to be returned
505+
route_detail = State(
506+
source=route_detail.get("source", ""),
507+
destination=route_detail.get("destination", ""),
508+
no_fly_list=route_detail.get("no_fly_list", []),
509+
direct_route=route_detail.get("direct_route", {}),
510+
optimal_route=route_detail.get("optimal_route", {}),
511+
static_optimizers=route_detail.get("static_optimizers", []),
512+
live_traffic=route_detail.get("live_traffic", {}),
513+
is_sub_optimal=route_detail.get("is_sub_optimal", False),
514+
all_routes_data=route_detail.get("all_routes_data", []),
515+
)
464516

465517
return route_detail

metro-ai-suite/smart-route-planning-agent/src/config.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@
77
# Configs and Constants for the Route Planner Application
88

99
# Default locations and coordinates
10-
DEFAULT_LOCATIONS = [
11-
"Berkeley, California",
12-
"Santa Clara, California",
10+
LOCATION_PAIRS = [
11+
["Berkeley, California", "Santa Clara, California"],
1312
]
1413

1514
# Coordinates mapping for locations with default values for lats and long
@@ -144,4 +143,6 @@ class PlannerNode(Enum):
144143
3. Continously monitor the live traffic conditions on the optimized route and update the route as needed in real-time.
145144
"""
146145

147-
INITIAL_MAP_HTML = "<div style='text-align: center; padding: 50px; font-size: 18px; color: #666;'>Select locations and click 'Find Route' to see the route map</div>"
146+
INITIAL_MAP_HTML = (
147+
"<div class='no-map-style'>Select locations to see the route map</div>"
148+
)

0 commit comments

Comments
 (0)