Skip to content

[Camera] Adds support for camera app yaml test cases through camera-controller #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: v2.13-develop
Choose a base branch
from
Open
16 changes: 13 additions & 3 deletions app/api/api_v1/sockets/web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from fastapi import APIRouter, WebSocket
from fastapi.websockets import WebSocketDisconnect

from app.constants.websockets_constants import WebSocketConnection, WebSocketTypeEnum
from app.socket_connection_manager import SocketConnectionManager

router = APIRouter()
Expand All @@ -26,18 +27,27 @@
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
socket_connection_manager = SocketConnectionManager()
await socket_connection_manager.connect(websocket)
connection = WebSocketConnection(websocket, WebSocketTypeEnum.MAIN)
await socket_connection_manager.connect(connection)
try:
while True:
# WebSocketDisconnect is not raised unless we poll
# https://github.com/tiangolo/fastapi/issues/3008
try:
message = await asyncio.wait_for(websocket.receive_text(), 0.1)
await socket_connection_manager.received_message(
socket=websocket, message=message
websocket=websocket, message=message
)
except asyncio.TimeoutError:
pass

except WebSocketDisconnect:
socket_connection_manager.disconnect(websocket)
socket_connection_manager.disconnect(connection)


@router.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket) -> None:
socket_connection_manager = SocketConnectionManager()
connection = WebSocketConnection(websocket, WebSocketTypeEnum.VIDEO)
await socket_connection_manager.connect(connection)
await socket_connection_manager.relay_video_frames(connection)
17 changes: 17 additions & 0 deletions app/constants/websockets_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
#
from enum import Enum

from fastapi import WebSocket

MESSAGE_ID_KEY = "message_id"

INVALID_JSON_ERROR_STR = "The message received is not a valid JSON object"
MISSING_TYPE_ERROR_STR = "The message is missing a type key"
NO_HANDLER_FOR_MSG_ERROR_STR = "There is no handler registered for this message type"

UDP_SOCKET_PORT = 5000
UDP_SOCKET_INTERFACE = "0.0.0.0"


# Enum Keys for different types of messages currently supported by the tool
class MessageTypeEnum(str, Enum):
Expand All @@ -33,6 +38,18 @@ class MessageTypeEnum(str, Enum):
TIME_OUT_NOTIFICATION = "time_out_notification"
TEST_LOG_RECORDS = "test_log_records"
INVALID_MESSAGE = "invalid_message"
STREAM_VERIFICATION_REQUEST = "stream_verification_request"


class WebSocketTypeEnum(str, Enum):
MAIN = "main"
VIDEO = "video"


class WebSocketConnection:
def __init__(self, websocket: WebSocket, socket_type: WebSocketTypeEnum) -> None:
self.websocket = websocket
self.type = socket_type


# Enum keys used with messages at the top level
Expand Down
108 changes: 75 additions & 33 deletions app/socket_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import json
import socket
from json import JSONDecodeError
from typing import Callable, Dict, List, Union

import pydantic
from fastapi import WebSocket
from fastapi.websockets import WebSocketDisconnect
from loguru import logger
from starlette.websockets import WebSocketState
from websockets.exceptions import ConnectionClosedOK
Expand All @@ -27,8 +30,12 @@
INVALID_JSON_ERROR_STR,
MISSING_TYPE_ERROR_STR,
NO_HANDLER_FOR_MSG_ERROR_STR,
UDP_SOCKET_INTERFACE,
UDP_SOCKET_PORT,
MessageKeysEnum,
MessageTypeEnum,
WebSocketConnection,
WebSocketTypeEnum,
)
from app.singleton import Singleton

Expand All @@ -43,21 +50,25 @@
# - Allows broadcasting as well sending personal messages to all or single client
class SocketConnectionManager(object, metaclass=Singleton):
def __init__(self) -> None:
self.active_connections: List[WebSocket] = []
self.active_connections: List[WebSocketConnection] = []
self.__message_handlers: Dict[MessageTypeEnum, SocketMessageHander] = {}

async def connect(self, websocket: WebSocket) -> None:
async def connect(self, connection: WebSocketConnection) -> None:
try:
websocket = connection.websocket
await websocket.accept()
logger.info(f'Websocket connected: "{websocket}".')
self.active_connections.append(websocket)
self.active_connections.append(connection)
except RuntimeError as e:
logger.info(f'Failed to connect with error: "{e}".')
raise e

def disconnect(self, websocket: WebSocket) -> None:
logger.info(f'Websocket disconnected: "{websocket}".')
self.active_connections.remove(websocket)
def disconnect(self, connection: WebSocketConnection) -> None:
logger.info(
f'Websocket disconnected: "{connection.websocket}"'
f' of type: "{connection.type}".'
)
self.active_connections.remove(connection)

async def send_personal_message(
self, message: Union[str, dict, list], websocket: WebSocket
Expand All @@ -72,31 +83,58 @@
if isinstance(message, dict) or isinstance(message, list):
message = json.dumps(message, default=pydantic.json.pydantic_encoder)
for connection in self.active_connections:
try:
await connection.send_text(message)
# Starlette raises websockets.exceptions.ConnectionClosedOK when trying to
# send to a closed socket. https://github.com/encode/starlette/issues/759
except ConnectionClosedOK:
if connection.application_state != WebSocketState.DISCONNECTED:
await connection.close()
logger.warning(
f'Failed to send message: "{message}" to socket: "{connection}",'
"connection closed."
)
except RuntimeError as e:
logger.warning(
f'Failed to send: "{message}" to socket: "{connection}."',
'Error:"{e}"',
)
raise e

async def received_message(self, socket: WebSocket, message: str) -> None:
if connection.type == WebSocketTypeEnum.MAIN:
websocket = connection.websocket
try:
await websocket.send_text(message)
# Starlette raises websockets.exceptions.ConnectionClosedOK
# when trying to send to a closed websocket.
# https://github.com/encode/starlette/issues/759
except ConnectionClosedOK:
if websocket.application_state != WebSocketState.DISCONNECTED:
await websocket.close()
logger.warning(
f'Failed to send message: "{message}"'
f' to websocket: "{websocket}", connection closed."')
except RuntimeError as e:
logger.warning(
f'Failed to send: "{message}" to websocket: "{websocket}."',
'Error:"{e}"',
)
raise e

async def received_message(self, websocket: WebSocket, message: str) -> None:
try:
json_dict = json.loads(message)
await self.__handle_received_json(socket, json_dict)
await self.__handle_received_json(websocket, json_dict)
except JSONDecodeError:
await self.__notify_invalid_message(
socket=socket, message=INVALID_JSON_ERROR_STR
websocket=websocket, message=INVALID_JSON_ERROR_STR
)

async def relay_video_frames(self, connection: WebSocketConnection):

Check failure on line 115 in app/socket_connection_manager.py

View workflow job for this annotation

GitHub Actions / Mypy

app/socket_connection_manager.py#L115

Function is missing a return type annotation [no-untyped-def]
if connection.type == WebSocketTypeEnum.VIDEO:
websocket = connection.websocket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
sock.bind((UDP_SOCKET_INTERFACE, UDP_SOCKET_PORT))
logger.info("UDP socket bound successfully")
loop = asyncio.get_event_loop()
while True:
data, _ = await loop.run_in_executor(None, sock.recvfrom, 65536)
for connection in self.active_connections:
if connection.type == WebSocketTypeEnum.VIDEO:
await connection.websocket.send_bytes(data)
except WebSocketDisconnect:
logger.error(f'Websocket for video stream disconnected: "{websocket}".')
except Exception as e:
logger.error(f"Failed with {e}")
finally:
await websocket.close()
self.active_connections.remove(connection)
else:
logger.error(
f'Excpected websocket connection of type {WebSocketTypeEnum.VIDEO}'
)

# Note: Currently we only support one message handler per type, registering the
Expand All @@ -106,31 +144,35 @@
) -> None:
self.__message_handlers[message_type] = callback

async def __handle_received_json(self, socket: WebSocket, json_dict: dict) -> None:
async def __handle_received_json(
self, websocket: WebSocket, json_dict: dict
) -> None:
message_type = json_dict[MessageKeysEnum.TYPE]
if message_type is None:
# Every message must have a type key for the tool to be able to route it
await self.__notify_invalid_message(
socket=socket, message=MISSING_TYPE_ERROR_STR
websocket=websocket, message=MISSING_TYPE_ERROR_STR
)
return

if message_type not in self.__message_handlers.keys():
# No handler registered for this type of message
await self.__notify_invalid_message(
socket=socket, message=NO_HANDLER_FOR_MSG_ERROR_STR
websocket=websocket, message=NO_HANDLER_FOR_MSG_ERROR_STR
)
return

message_handler = self.__message_handlers[message_type]
message_handler(json_dict[MessageKeysEnum.PAYLOAD], socket)
message_handler(json_dict[MessageKeysEnum.PAYLOAD], websocket)

async def __notify_invalid_message(self, socket: WebSocket, message: str) -> None:
async def __notify_invalid_message(
self, websocket: WebSocket, message: str
) -> None:
notify_message = {
MessageKeysEnum.TYPE: MessageTypeEnum.INVALID_MESSAGE,
MessageKeysEnum.PAYLOAD: message,
}
await self.send_personal_message(message=notify_message, websocket=socket)
await self.send_personal_message(message=notify_message, websocket=websocket)


socket_connection_manager = SocketConnectionManager()
47 changes: 47 additions & 0 deletions app/test_engine/models/manual_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from app.user_prompt_support import (
OptionsSelectPromptRequest,
PromptResponse,
StreamVerificationPromptRequest,
TextInputPromptRequest,
UploadFile,
UploadFilePromptRequest,
UserPromptSupport,
Expand Down Expand Up @@ -52,6 +54,51 @@ def __init__(self, name: str, verification: Optional[str] = None) -> None:
super().__init__(name=name)
self.verification = verification

async def prompt_verification_step_with_response(
self, textInputPrompt: TextInputPromptRequest
) -> str:
"""Prompt user to verify the video stream.

Returns:
str: string response received as user input
"""
prompt_response = await self.invoke_prompt_and_get_str_response(textInputPrompt)
return prompt_response

async def prompt_stream_verification_step(self) -> bool:
"""Prompt user to verify the video stream.

Raises:
ValueError: When receiving an unexpected response

Returns:
bool: False if user responds Failed
"""
prompt = self.name
if self.verification is not None:
prompt += f"\n\n{self.verification}"

options = {
"PASS": PromptOptions.PASS,
"FAIL": PromptOptions.FAIL,
}
prompt_request = StreamVerificationPromptRequest(
prompt=prompt, options=options, timeout=OUTCOME_TIMEOUT_S
)
prompt_response = await self.send_prompt_request(prompt_request)
self.__evaluate_user_response_for_errors(prompt_response)

if prompt_response.response == PromptOptions.FAIL:
self.append_failure("User stated manual step FAILED.")
return False
elif prompt_response.response == PromptOptions.PASS:
logger.info("User stated this manual step PASSED.")
return True
else:
raise ValueError(
f"Received unknown prompt option: {prompt_response.response}"
)

async def prompt_verification_step(self) -> bool:
"""Sends a prompt request to present instructions and get outcome from user.

Expand Down
Loading
Loading