Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6dde381
initial implementation for api test automation
sbelhaik Jan 23, 2026
ec7ff32
return raw response object for compatibility with API testing
sbelhaik Feb 16, 2026
f89c506
align rest_client with API tests
sbelhaik Feb 17, 2026
b5a34f6
json based scene test scenarios
sbelhaik Feb 17, 2026
d64390c
add test config
sbelhaik Feb 17, 2026
a09bb0d
test runner for api scenarios
sbelhaik Feb 17, 2026
7bc61c2
add readme
sbelhaik Feb 17, 2026
cd71cb7
add license
sbelhaik Feb 17, 2026
01c90ae
verify expected json body
sbelhaik Feb 18, 2026
710fec5
remove unused files
sbelhaik Feb 18, 2026
3682beb
fix readme.md syntax
sbelhaik Feb 18, 2026
6a3073c
fix indentation
sbelhaik Feb 18, 2026
000f89c
Merge branch 'main' into sbel/automate-api-tests
sbelhaik Feb 18, 2026
bfa88ea
update scene scenario
sbelhaik Feb 18, 2026
5cc6103
rename back parameter base_url to url
sbelhaik Feb 18, 2026
3529048
remove parsing redundancy
sbelhaik Feb 18, 2026
3755a8f
correct url logic
sbelhaik Feb 18, 2026
30ddeb1
include newline between erors
sbelhaik Feb 18, 2026
43fa128
remove uid saving from failed request
sbelhaik Feb 18, 2026
d899fdd
fix typo
sbelhaik Feb 18, 2026
7db7da4
Merge branch 'main' into sbel/automate-api-tests
sbelhaik Feb 18, 2026
b016b66
Merge branch 'main' into sbel/automate-api-tests
sbelhaik Feb 19, 2026
d13d26a
fix prettier check
sbelhaik Feb 19, 2026
ff5a901
Merge branch 'main' into sbel/automate-api-tests
sbelhaik Feb 19, 2026
c856719
Merge branch 'main' into sbel/automate-api-tests
sbelhaik Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 85 additions & 20 deletions scene_common/src/scene_common/rest_client.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,53 @@
# SPDX-FileCopyrightText: (C) 2023 - 2025 Intel Corporation
# SPDX-FileCopyrightText: (C) 2023 - 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import json
import re
import requests
import sys
from http import HTTPStatus
from urllib.parse import urljoin


class RESTResult(dict):
def __init__(self, statusCode, errors=None):
super().__init__()
self.statusCode = statusCode
self.errors = errors
return

@property
def status_code(self):
return self.statusCode

def json(self):
return dict(self)

@property
def text(self):
return json.dumps(dict(self))


class RESTClient:
def __init__(self, url, rootcert=None, auth=None):
def __init__(self, url=None, token=None, auth=None,
rootcert=None, verify_ssl=False, timeout=10):
self.url = url
self.rootcert = rootcert
if not self.url.endswith("/"):

if self.url and not self.url.endswith("/"):
self.url = self.url + "/"

# Handle SSL verification (support both bool and path)
self.verify_ssl = verify_ssl if verify_ssl is not False else False
if rootcert:
self.verify_ssl = rootcert

self.timeout = timeout
self.session = requests.session()
if auth:

# If token provided directly, use it (skip authentication)
if token:
self.token = token
elif auth:
self._parseAuth(auth)
return

Expand All @@ -46,10 +70,10 @@ def _parseAuth(self, auth):
res = self.authenticate(user, pw)
if not res:
error_message = (
f"Failed to authenticate\n"
f" URL: {self.url}\n"
f" status: {res.statusCode}\n"
f" errors: {res.errors}"
f"Failed to authenticate\n"
f" URL: {self.url}\n"
f" status: {res.statusCode}\n"
f" errors: {res.errors}"
)
raise RuntimeError(error_message)
return
Expand All @@ -58,6 +82,38 @@ def _parseAuth(self, auth):
def isAuthenticated(self):
return hasattr(self, 'token') and self.token is not None

def _headers(self):
headers = {
"Content-Type": "application/json"
}
if hasattr(self, 'token') and self.token:
headers["Authorization"] = f"Token {self.token}"
return headers

def request(self, method, path, **kwargs):
"""
Returns raw requests.Response object for compatibility with API tests
"""
# Ensure path starts with /
if not path.startswith('/'):
path = '/' + path

url = urljoin(self.url, path.lstrip('/'))

# Merge headers
headers = self._headers()
if 'headers' in kwargs:
headers.update(kwargs.pop('headers'))

return self.session.request(
method=method,
url=url,
headers=headers,
verify=self.verify_ssl,
timeout=self.timeout,
**kwargs
)

def decodeReply(self, reply, expectedStatus, successContent=None):
result = RESTResult(statusCode=reply.status_code)
decoded = False
Expand All @@ -69,10 +125,11 @@ def decodeReply(self, reply, expectedStatus, successContent=None):
content = reply.content
else:
content = {
'data': reply.content,
'data': reply.content,
}
if 'Content-Disposition' in reply.headers:
fname = re.findall("filename=(.+)", reply.headers['Content-Disposition'])[0]
fname = re.findall("filename=(.+)",
reply.headers['Content-Disposition'])[0]
content['filename'] = fname
decoded = True

Expand All @@ -99,11 +156,15 @@ def authenticate(self, user, password):
auth_url = urljoin(self.url, "auth")
try:
reply = self.session.post(auth_url, data={'username': user, 'password': password},
verify=self.rootcert)
verify=self.verify_ssl)
except requests.exceptions.ConnectionError as err:
result = RESTResult("ConnectionError", errors=("Connection error", str(err)))
result = RESTResult(
"ConnectionError", errors=(
"Connection error", str(err)))
else:
result = self.decodeReply(reply, HTTPStatus.OK, successContent={'authenticated': True})
result = self.decodeReply(
reply, HTTPStatus.OK, successContent={
'authenticated': True})
if reply.status_code == HTTPStatus.OK:
data = json.loads(reply.content)
self.token = data['token']
Expand All @@ -120,7 +181,8 @@ def prepareDataArgs(self, data, files):
if not files:
data_args = {'json': data}
elif self.dataIsNested(data):
raise ValueError("requests library can't combine files and nested dictionaries")
raise ValueError(
"requests library can't combine files and nested dictionaries")
return data_args

def _create(self, endpoint, data, files=None):
Expand All @@ -137,7 +199,7 @@ def _create(self, endpoint, data, files=None):
headers = {'Authorization': f"Token {self.token}"}
data_args = self.prepareDataArgs(data, files)
reply = self.session.post(full_path, **data_args, files=files,
headers=headers, verify=self.rootcert)
headers=headers, verify=self.verify_ssl)
return self.decodeReply(reply, HTTPStatus.CREATED)

def _get(self, endpoint, parameters):
Expand All @@ -152,7 +214,7 @@ def _get(self, endpoint, parameters):
full_path = urljoin(self.url, endpoint)
headers = {'Authorization': f"Token {self.token}"}
reply = self.session.get(full_path, params=parameters, headers=headers,
verify=self.rootcert)
verify=self.verify_ssl)
return self.decodeReply(reply, HTTPStatus.OK)

def _update(self, endpoint, data, files=None):
Expand All @@ -169,7 +231,7 @@ def _update(self, endpoint, data, files=None):
headers = {'Authorization': f"Token {self.token}"}
data_args = self.prepareDataArgs(data, files)
reply = self.session.post(full_path, **data_args, files=files,
headers=headers, verify=self.rootcert)
headers=headers, verify=self.verify_ssl)
return self.decodeReply(reply, HTTPStatus.OK)

def _delete(self, endpoint):
Expand All @@ -181,7 +243,10 @@ def _delete(self, endpoint):
"""
full_path = urljoin(self.url, endpoint)
headers = {'Authorization': f"Token {self.token}"}
reply = self.session.delete(full_path, headers=headers, verify=self.rootcert)
reply = self.session.delete(
full_path,
headers=headers,
verify=self.verify_ssl)
return self.decodeReply(reply, HTTPStatus.OK)

def _separateFiles(self, data, fields):
Expand Down
Loading
Loading