Skip to content

[Camera] Adds support for camera app yaml test cases through camera-controller #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: v2.13-develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions app/api/api_v1/sockets/web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
# limitations under the License.
#
import asyncio
import socket

from fastapi import APIRouter, WebSocket
from fastapi.websockets import WebSocketDisconnect
from loguru import logger

from app.constants.websockets_constants import UDP_SOCKET_INTERFACE, UDP_SOCKET_PORT
from app.socket_connection_manager import SocketConnectionManager

router = APIRouter()
Expand All @@ -41,3 +44,29 @@ async def websocket_endpoint(websocket: WebSocket) -> None:

except WebSocketDisconnect:
socket_connection_manager.disconnect(websocket)


@router.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket) -> None:
try:
await websocket.accept()
logger.info(f'Websocket connected: "{websocket}".')
except RuntimeError as e:
logger.info(f'Failed to connect with error: "{e}".')
raise e
Comment on lines +51 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to replace for something similar to the websocket_endpoint()?
Like:

Suggested change
try:
await websocket.accept()
logger.info(f'Websocket connected: "{websocket}".')
except RuntimeError as e:
logger.info(f'Failed to connect with error: "{e}".')
raise e
socket_connection_manager = SocketConnectionManager()
await socket_connection_manager.connect(websocket)


try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
sock.bind((UDP_SOCKET_INTERFACE, UDP_SOCKET_PORT))
logger.info("UDP socket bound successfully")
loop = asyncio.get_event_loop()
while True:
data, _ = await loop.run_in_executor(None, sock.recvfrom, 65536)
# send data to ws
await websocket.send_bytes(data)
Comment on lines +59 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use SocketConnectionManager() to handle those operations with a new method?

except WebSocketDisconnect:
logger.info(f'Websocket for video stream disconnected: "{websocket}".')
except Exception as e:
logger.info(f"Failed with {e}")
finally:
await websocket.close()
4 changes: 4 additions & 0 deletions app/constants/websockets_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
MISSING_TYPE_ERROR_STR = "The message is missing a type key"
NO_HANDLER_FOR_MSG_ERROR_STR = "There is no handler registered for this message type"

UDP_SOCKET_PORT = 5000
UDP_SOCKET_INTERFACE = "0.0.0.0"


# Enum Keys for different types of messages currently supported by the tool
class MessageTypeEnum(str, Enum):
Expand All @@ -33,6 +36,7 @@ class MessageTypeEnum(str, Enum):
TIME_OUT_NOTIFICATION = "time_out_notification"
TEST_LOG_RECORDS = "test_log_records"
INVALID_MESSAGE = "invalid_message"
STREAM_VERIFICATION_REQUEST = "stream_verification_request"


# Enum keys used with messages at the top level
Expand Down
47 changes: 47 additions & 0 deletions app/test_engine/models/manual_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from app.user_prompt_support import (
OptionsSelectPromptRequest,
PromptResponse,
StreamVerificationPromptRequest,
TextInputPromptRequest,
UploadFile,
UploadFilePromptRequest,
UserPromptSupport,
Expand Down Expand Up @@ -52,6 +54,51 @@ def __init__(self, name: str, verification: Optional[str] = None) -> None:
super().__init__(name=name)
self.verification = verification

async def prompt_verification_step_with_response(
self, textInputPrompt: TextInputPromptRequest
) -> str:
"""Prompt user to verify the video stream.

Returns:
str: string response received as user input
"""
prompt_response = await self.invoke_prompt_and_get_str_response(textInputPrompt)
return prompt_response

async def prompt_stream_verification_step(self) -> bool:
"""Prompt user to verify the video stream.

Raises:
ValueError: When receiving an unexpected response

Returns:
bool: False if user responds Failed
"""
prompt = self.name
if self.verification is not None:
prompt += f"\n\n{self.verification}"

options = {
"PASS": PromptOptions.PASS,
"FAIL": PromptOptions.FAIL,
}
prompt_request = StreamVerificationPromptRequest(
prompt=prompt, options=options, timeout=OUTCOME_TIMEOUT_S
)
prompt_response = await self.send_prompt_request(prompt_request)
self.__evaluate_user_response_for_errors(prompt_response)

if prompt_response.response == PromptOptions.FAIL:
self.append_failure("User stated manual step FAILED.")
return False
elif prompt_response.response == PromptOptions.PASS:
logger.info("User stated this manual step PASSED.")
return True
else:
raise ValueError(
f"Received unknown prompt option: {prompt_response.response}"
)

async def prompt_verification_step(self) -> bool:
"""Sends a prompt request to present instructions and get outcome from user.

Expand Down
1 change: 1 addition & 0 deletions app/user_prompt_support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .prompt_request import (
OptionsSelectPromptRequest,
PromptRequest,
StreamVerificationPromptRequest,
TextInputPromptRequest,
UploadFilePromptRequest,
)
Expand Down
6 changes: 6 additions & 0 deletions app/user_prompt_support/prompt_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ class MessagePromptRequest(PromptRequest):
@property
def messageType(self) -> MessageTypeEnum:
return MessageTypeEnum.MESSAGE_REQUEST


class StreamVerificationPromptRequest(OptionsSelectPromptRequest):
@property
def messageType(self) -> MessageTypeEnum:
return MessageTypeEnum.STREAM_VERIFICATION_REQUEST
2 changes: 2 additions & 0 deletions test_collections/matter/sdk_tests/support/chip/chip_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
CHIP_TOOL_EXE = "./chip-tool"
CHIP_TOOL_ARG_PAA_CERTS_PATH = "--paa-trust-store-path"

# TODO: Use chip-camera-controller for camera tests.

# Chip App Parameters
CHIP_APP_EXE = "./chip-app1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
ManualLogUploadStep,
ManualVerificationTestStep,
)
from app.user_prompt_support.prompt_request import MessagePromptRequest
from app.user_prompt_support.prompt_request import (
MessagePromptRequest,
TextInputPromptRequest,
)
from app.user_prompt_support.uploaded_file_support import UploadFile
from app.user_prompt_support.user_prompt_manager import user_prompt_manager
from app.user_prompt_support.user_prompt_support import UserPromptSupport
Expand All @@ -41,6 +44,7 @@

CHIP_TOOL_DEFAULT_PROMPT_TIMEOUT_S = 60 # seconds
OUTCOME_TIMEOUT_S = 60 * 10 # Seconds
EXTENDED_PROMPT_TIMEOUT_S = 300


class ChipPromptTypeEnum(str, Enum):
Expand Down Expand Up @@ -170,35 +174,77 @@ def step_failure(
def step_unknown(self) -> None:
self.__runned += 1

async def step_manual(self) -> None:
async def step_manual(self, request: TestStep) -> None:
step = self.current_test_step
if not isinstance(step, ManualVerificationTestStep):
raise TestError(f"Unexpected user prompt found in test step: {step.name}")

try:
await asyncio.wait_for(
self.__prompt_user_manual_step(step), OUTCOME_TIMEOUT_S
)
if request and request.command == "VerifyVideoStream":
await asyncio.wait_for(
self.__prompt_stream_verification_manual_step(step),
OUTCOME_TIMEOUT_S,
)
else:
await asyncio.wait_for(
self.__prompt_user_manual_step(step), OUTCOME_TIMEOUT_S
)
except asyncio.TimeoutError:
self.current_test_step.append_failure("Prompt timed out.")
self.next_step()

def show_prompt(
async def show_prompt(
self,
msg: str,
placeholder: Optional[str] = None,
default_value: Optional[str] = None,
) -> None:
pass
) -> str:
step = self.current_test_step
if not isinstance(step, ManualVerificationTestStep):
raise TestError(f"Unexpected user prompt found in test step: {step.name}")

if placeholder is None:
placeholder = "Enter value.."

userPrompt = TextInputPromptRequest(
prompt=msg,
timeout=EXTENDED_PROMPT_TIMEOUT_S,
default_value=default_value,
placeholder_text=placeholder,
regex_pattern=None,
)
response = await self.__prompt_user_manual_step_with_response(step, userPrompt)
if response is None:
raise ValueError("Failed to receive user input")
return response

# Other methods
async def __prompt_user_manual_step_with_response(
self, step: ManualVerificationTestStep, prompt: TextInputPromptRequest
) -> str:
result = await step.prompt_verification_step_with_response(prompt)

if not result:
self.current_test_step.append_failure("Manual Test Step Failure.")
result = "Failed getting response"
return result

async def __prompt_user_manual_step(self, step: ManualVerificationTestStep) -> None:
result = await step.prompt_verification_step()

if not result:
self.current_test_step.append_failure("Manual Test Step Failure.")

async def __prompt_stream_verification_manual_step(
self, step: ManualVerificationTestStep
) -> None:
result = await step.prompt_stream_verification_step()

if not result:
self.current_test_step.append_failure(
"Video Verification Test Step Failure."
)

def __report_failures(self, logger: Any, request: TestStep, received: Any) -> None:
"""
The logger from runner contains all logs entries for the test step, this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ def _append_automated_test_step(self, yaml_step: MatterTestStep) -> None:
Disabled steps are ignored.
(Such tests will be marked as 'Steps Disabled' elsewhere)

UserPrompt are special cases that will prompt test operator for input.
UserPrompt, PromptWithResponse or VerifyVideoStream are special cases that will
prompt test operator for input.
"""
if yaml_step.disabled:
test_engine_logger.info(
Expand All @@ -174,7 +175,11 @@ def _append_automated_test_step(self, yaml_step: MatterTestStep) -> None:
return

step = TestStep(yaml_step.label)
if yaml_step.command == "UserPrompt":
if yaml_step.command in [
"UserPrompt",
"PromptWithResponse",
"VerifyVideoStream",
]:
Comment on lines +178 to +182
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think someone mentioned before, but anyway I think that would be good to store this array in a variable to use whenever necessary.

step = ManualVerificationTestStep(
name=yaml_step.label,
verification=yaml_step.verification,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ def _test_type(test: YamlTest) -> MatterTestType:
if all(s.disabled is True for s in steps):
return MatterTestType.MANUAL

# if any step has a UserPrompt, categorize as semi-automated
if any(s.command == "UserPrompt" for s in steps):
# if any step has a UserPrompt, PromptWithResponse or VerifyVideoStream command,
# categorize as semi-automated
if any(
s.command in ["UserPrompt", "PromptWithResponse", "VerifyVideoStream"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before:
I think someone mentioned before, but anyway I think that would be good to store this array in a variable to use whenever necessary.

for s in steps
):
return MatterTestType.SEMI_AUTOMATED

# Otherwise Automated
Expand Down