Skip to content

[Traffic Intersection Agent] Websockets implementation for pushing updates in real-time#1815

Open
krish918 wants to merge 9 commits intoopen-edge-platform:mainfrom
krish918:trafficagent/websocket
Open

[Traffic Intersection Agent] Websockets implementation for pushing updates in real-time#1815
krish918 wants to merge 9 commits intoopen-edge-platform:mainfrom
krish918:trafficagent/websocket

Conversation

@krish918
Copy link
Contributor

Description

This PR introduces Websockets in Smart Traffic Intersection Agent, for pushing updates to clients in real-time (as soon as they are available).

Following changes have been introduced to implement this feature:

API Server Updates

  • New /api/v1/traffic/current/ws endpoint based on websocket protocol
  • DataAggregator service updates for creating and emitting an asyncio Event
    • Helping emit event as soon as data is available
  • Minor refactoring in DataAggregator, VLMService for handling types and async calls
  • Updated max size of websocket connection to allow transport of large base64 images

UI Updates

  • Websocket client implementations with max size increased to same value as server - for allowing large base64 images
  • Added gradios interface queue for concurrent requests handling
  • Removed UI refresh and related components
  • Added two new core methods for handling Websocket push messages:
    • fetch_intersection_data to connect to websocket endpoint
    • update_components to update UI based on data response from websocket
  • Refactored data_loader module to implement above two functions
  • Refactored UIComponents to make all methods async
  • Other Minor refactoring

Fixes # (issue)

Any Newly Introduced Dependencies

websockets 15.0.1 https://github.com/python-websockets/websockets/blob/main/LICENSE

How Has This Been Tested?

end-to-end functional validation done.

Checklist:

  • I agree to use the APACHE-2.0 license for my code changes.
  • I have not introduced any 3rd party components incompatible with APACHE-2.0.
  • I have not included any company confidential information, trade secret, password or security token.
  • I have performed a self-review of my code.

- Added more type hints.
- Removed Optional NoneType from several fields.
	- For type compatibility at several places in code
- Minor refactoring/formatting in some file
- Update WeatherService for new logic to use cached data and forcing cache invalidation
	- Was done to avoid type incompatibility

Signed-off-by: Krishna Murti <krishna.murti@intel.com>
- New /api/v1/traffic/current/ws endpoint based on websocket protocol
	- Mimics the functionality of /api/v1/traffic/current REST API
- Updated DataAggregator service for creating and emitting an asyncio Event
	- emits events once vlm analysis is available.
- A event loop in websocket endpoint listens for the event and sends response once event notif is receieved.
- Minor refactoring in DataAggregator for handling types
- Minor refactoring in VLMService for remove non-existent WeatherTypes

Signed-off-by: Krishna Murti <krishna.murti@intel.com>
- websocket endpoint's images query param to have default value True
- Updated REST API and websocket's images query param to use Annotated
- Updated max size of websocket connection to allow transport of large base64 images
- Protected type casting of env var's value with 'or' operator
	- for cases when env var value is empty '' and int('') will fail

Signed-off-by: Krishna Murti <krishna.murti@intel.com>
… scaling requests

- Added gradios queues for handling multiple concurrent requests
- Removed UI refresh and related components  in favor of websocket connection
- added two new async methods for fetching and handling traffic updates
	- fetch_intersection_data to connect to websocket endpoint
	- update_components to update UI based on data response from websocket
	- Both functions are run concurrently at UI load using interface.load()
- Refactored data loader to inmplement new functions
	- Websocket conn uses large max_size to handle large base64 images
	- Asyncio queue used to put websocket responses in it
	- update_components picks from the queue once available and updates the UI
		- by yielding as soon as new data in queue is available
- Added new websocket dependency in requirements.txt

Signed-off-by: Krishna Murti <krishna.murti@intel.com>
- Minor refactoring in UIComponents apart from amking all methods async
- Removed time format conversion method - 2 lines single use function
- removed int type casting - already int type from API response
- removed other unused functions

Signed-off-by: Krishna Murti <krishna.murti@intel.com>
@krish918 krish918 added the GenAI label Feb 18, 2026
return request.app.state.weather_service


def _build_response_dict(traffic_response: Any, weather_data: Any, include_images: bool) -> Dict[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactoring: Common method used for use with both REST API and Websocket endpoint.

weather_data = await weather_service.get_current_weather()

# Convert to dict for JSON response
response_dict = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed reusable code - to shift to common method.

@@ -129,7 +129,7 @@ async def get_current_weather(self, force_refresh: bool = False) -> Optional[Wea
"""
logger.info("Getting current weather data", force_refresh=force_refresh, has_cached=self._cached_weather is not None, use_mock=self.use_mock)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change here: to avoid type incompatibility warning. slight change in cache validity check based on that.

except Exception as e:
logger.error(f"Error parsing API response: {str(e)}")
return None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removal of un-necessary indirection.

continue

return image_list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

un-necessary unused method.

@krish918 krish918 marked this pull request as ready for review February 19, 2026 00:41
krish918 added 3 commits March 3, 2026 14:19
Signed-off-by: Krishna Murti <krishna.murti@intel.com>
Signed-off-by: Krishna Murti <krishna.murti@intel.com>
Signed-off-by: Krishna Murti <krishna.murti@intel.com>
markdown>=3.4.0
requests>=2.25.0 No newline at end of file
requests>=2.25.0
websockets>=15.0.0 No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can use the latest versions of these.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants