forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser_utils.py
More file actions
138 lines (112 loc) · 4.53 KB
/
user_utils.py
File metadata and controls
138 lines (112 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
import shlex
import tempfile
from dataclasses import dataclass
import requests
from kubernetes.dynamic import DynamicClient
from ocp_resources.user import User
from pyhelper_utils.shell import run_command
from timeout_sampler import retry
from utilities.exceptions import ExceptionUserLogin
from utilities.infra import login_with_user_password, get_cluster_authentication
import base64
from pathlib import Path
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# NOTE(review): not referenced by any definition visible in this file;
# presumably a polling interval in seconds used by importers - TODO confirm.
SLEEP_TIME = 5
@dataclass
class UserTestSession:
    """Credentials and context describing one test user's session.

    The identity-provider name, secret name, username and password must all be
    non-empty, and so must the original user and the API server URL;
    construction fails with ``ValueError`` otherwise.
    """

    # Tells pytest not to collect this class as a test.
    __test__ = False

    idp_name: str
    secret_name: str
    username: str
    password: str
    original_user: str
    api_server_url: str
    is_byoidc: bool = False

    def __post_init__(self) -> None:
        """Reject a session that is missing any required value."""
        credentials = (self.idp_name, self.secret_name, self.username, self.password)
        if any(not value for value in credentials):
            raise ValueError("All session fields must be non-empty")

        if not self.api_server_url or not self.original_user:
            raise ValueError("Original user information and api url must be set")

    def cleanup(self) -> None:
        """Delete the ``User`` resource named after this session's username, if it exists."""
        session_user = User(name=self.username)
        if not session_user.exists:
            return
        session_user.delete()
def create_htpasswd_file(username: str, password: str) -> tuple[Path, str]:
    """
    Create an htpasswd file for a user.

    Runs the ``htpasswd`` CLI against a fresh temporary file. The file is
    created with ``delete=False`` and this function never removes it, so it
    remains on disk at the returned path.

    Args:
        username: The username to add to the htpasswd file
        password: The password for the user

    Returns:
        Tuple of (temp file path, base64 encoded content)
    """
    # Only reserve a unique path here; the handle is closed before htpasswd
    # rewrites the file, and the content is read back from disk afterwards.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_path = Path(temp_file.name).resolve()  # Get absolute path

    # Pass argv as a list instead of formatting a string and re-splitting it:
    # credentials containing spaces, quotes or backslashes would otherwise be
    # mis-tokenized (or make shlex.split raise on an unbalanced quote).
    # NOTE(review): check=True is forwarded to run_command; presumably it
    # raises on a non-zero exit status - confirm against pyhelper_utils.
    run_command(
        command=["htpasswd", "-c", "-b", str(temp_path), username, password],
        check=True,
    )

    # Encode exactly the bytes htpasswd wrote to the file.
    htpasswd_b64 = base64.b64encode(temp_path.read_bytes()).decode()
    return temp_path, htpasswd_b64
@retry(
    wait_timeout=240,
    sleep=10,
    exceptions_dict={ExceptionUserLogin: []},
)
def wait_for_user_creation(username: str, password: str, cluster_url: str) -> bool:
    """
    Try to log in to OpenShift as the given user to confirm the user was created.

    One call performs one login attempt. Repetition comes from the ``retry``
    decorator, configured here with ``wait_timeout=240``, ``sleep=10`` and
    ``ExceptionUserLogin`` listed in ``exceptions_dict``.

    Args:
        username: The username to login with
        password: The password to login with
        cluster_url: The OpenShift cluster URL

    Returns:
        True if login is successful

    Raises:
        ExceptionUserLogin: If the login attempt reports failure
    """
    # not executed in byoidc mode
    LOGGER.info(f"Attempting to login as {username}")
    logged_in = login_with_user_password(api_address=cluster_url, user=username, password=password)
    if not logged_in:
        raise ExceptionUserLogin(f"Could not login as user {username}.")
    return True
def get_oidc_tokens(admin_client: DynamicClient, username: str, password: str) -> tuple[str, str]:
    """
    Request OIDC tokens for a user from the cluster's BYOIDC issuer.

    Sends a password-grant request (``grant_type=password``, client ``oc-cli``,
    scope ``openid``) to ``<issuer>/protocol/openid-connect/token``, where the
    issuer URL is obtained from ``get_byoidc_issuer_url``.

    Args:
        admin_client: Client used to look up the cluster's issuer URL
        username: The user to request tokens for
        password: The user's password

    Returns:
        Tuple of (id_token, refresh_token)

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status
        AssertionError: If the response lacks ``id_token`` or ``refresh_token``
    """
    issuer_url = get_byoidc_issuer_url(admin_client=admin_client)
    # Tolerate an issuer URL with a trailing slash so the path has no "//".
    url = f"{issuer_url.rstrip('/')}/protocol/openid-connect/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded", "User-Agent": "python-requests"}
    data = {
        "username": username,
        "password": password,
        "grant_type": "password",
        "client_id": "oc-cli",
        "scope": "openid",
    }

    # Request and HTTP errors propagate to the caller unchanged; wrapping this
    # in a try/except that only re-raises would add nothing.
    LOGGER.info(f"Requesting token for user {username} in byoidc environment")
    response = requests.post(
        url=url,
        headers=headers,
        data=data,
        allow_redirects=True,
        timeout=30,
        verify=True,  # Set to False if you need to skip SSL verification
    )
    response.raise_for_status()
    json_response = response.json()

    # Validate that both tokens the caller needs are present.
    if "id_token" not in json_response or "refresh_token" not in json_response:
        LOGGER.error("Warning: No id_token or refresh_token in response")
        # Report only the key names: the payload itself may carry other live
        # tokens that must not end up in logs or tracebacks.
        raise AssertionError(
            f"No id_token or refresh_token in response; keys present: {sorted(json_response)}"
        )

    return json_response["id_token"], json_response["refresh_token"]
def get_byoidc_issuer_url(admin_client: DynamicClient) -> str:
    """
    Return the issuer URL of the first configured OIDC provider.

    Reads ``spec.oidcProviders[0].issuer.issuerURL`` from the instance of the
    object returned by ``get_cluster_authentication``.

    Args:
        admin_client: Client passed through to ``get_cluster_authentication``

    Returns:
        The issuer URL of the first OIDC provider

    Raises:
        AssertionError: If no authentication object is returned or the issuer
            URL is None
    """
    cluster_auth = get_cluster_authentication(admin_client=admin_client)
    assert cluster_auth is not None
    first_provider = cluster_auth.instance.spec.oidcProviders[0]
    issuer_url = first_provider.issuer.issuerURL
    assert issuer_url is not None
    return issuer_url