Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from utilities.minio import create_minio_data_connection_secret
from utilities.operator_utils import get_csv_related_images, get_cluster_service_version
from ocp_resources.authentication_config_openshift_io import Authentication
from utilities.user_utils import get_unprivileged_context
from utilities.user_utils import get_oidc_tokens, get_byoidc_issuer_url

LOGGER = get_logger(name=__name__)

Expand Down Expand Up @@ -342,28 +342,33 @@ def use_unprivileged_client(pytestconfig: pytest.Config) -> bool:


@pytest.fixture(scope="session")
def non_admin_user_password(admin_client: DynamicClient, use_unprivileged_client: bool) -> tuple[str, str] | None:
def non_admin_user_password(
admin_client: DynamicClient, use_unprivileged_client: bool, is_byoidc: bool
) -> tuple[str, str] | None:
def _decode_split_data(_data: str) -> list[str]:
return base64.b64decode(_data).decode().split(",")

if not use_unprivileged_client:
return None

if ldap_Secret := list(
secret_name = "byoidc-credentials" if is_byoidc else "openldap" # pragma: allowlist secret
secret_ns = "oidc" if is_byoidc else "openldap" # pragma: allowlist secret

if users_Secret := list(
Secret.get(
dyn_client=admin_client,
name="openldap",
namespace="openldap",
name=secret_name,
namespace=secret_ns,
)
):
data = ldap_Secret[0].instance.data
data = users_Secret[0].instance.data
users = _decode_split_data(_data=data.users)
passwords = _decode_split_data(_data=data.passwords)
first_user_index = next(index for index, user in enumerate(users) if "user" in user)

return users[first_user_index], passwords[first_user_index]

LOGGER.error("ldap secret not found")
LOGGER.error("user credentials secret not found")
return None


Expand Down Expand Up @@ -406,24 +411,49 @@ def unprivileged_client(
LOGGER.warning("Unprivileged client is not enabled, using admin client")
yield admin_client

elif is_byoidc:
# this requires a pre-existing context in $KUBECONFIG with a unprivileged user
try:
unprivileged_context, _ = get_unprivileged_context()
except ValueError as e:
raise ValueError(
f"Failed to get unprivileged context for BYOIDC mode. "
f"Ensure the context naming follows the convention: <context>-unprivileged. "
f"Error: {e}"
) from e
elif non_admin_user_password is None:
raise ValueError("Unprivileged user not provisioned")

unprivileged_client = get_client(config_file=kubconfig_filepath, context=unprivileged_context)
elif is_byoidc:
tokens = get_oidc_tokens(admin_client, non_admin_user_password[0], non_admin_user_password[1])
issuer = get_byoidc_issuer_url(admin_client)

with open(kubconfig_filepath) as fd:
kubeconfig_content = yaml.safe_load(fd)

# create the oidc user config
user = {
"name": non_admin_user_password[0],
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "oc-cli",
"client-secret": "",
"idp-issuer-url": issuer,
"id-token": tokens[0],
"refresh-token": tokens[1],
},
}
},
}

# replace the users - we only need this one user
kubeconfig_content["users"] = [user]

# get the current context and modify the referenced user in place
current_context_name = kubeconfig_content["current-context"]
current_context = [c for c in kubeconfig_content["contexts"] if c["name"] == current_context_name][0]
current_context["context"]["user"] = non_admin_user_password[0]

unprivileged_client = get_client(
config_dict=kubeconfig_content,
context=current_context_name,
persist_config=False, # keep the kubeconfig intact
)

yield unprivileged_client

elif non_admin_user_password is None:
raise ValueError("Unprivileged user not provisioned")

else:
current_user = run_command(command=["oc", "whoami"])[1].strip()
non_admin_user_name = non_admin_user_password[0]
Expand Down
10 changes: 1 addition & 9 deletions tests/model_registry/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from model_registry.types import RegisteredModel

from utilities.general import wait_for_pods_running
from utilities.infra import get_cluster_authentication
from utilities.user_utils import get_byoidc_issuer_url

ADDRESS_ANNOTATION_PREFIX: str = "routing.opendatahub.io/external-address-"
MARIA_DB_IMAGE = (
Expand Down Expand Up @@ -835,14 +835,6 @@ def get_mr_user_token(admin_client: DynamicClient, user_credentials_rbac: dict[s
raise e


def get_byoidc_issuer_url(admin_client: DynamicClient) -> str:
authentication = get_cluster_authentication(admin_client=admin_client)
assert authentication is not None
url = authentication.instance.spec.oidcProviders[0].issuer.issuerURL
assert url is not None
return url


def get_byoidc_user_credentials(username: str = None) -> Dict[str, str]:
"""
Get user credentials from byoidc-credentials secret.
Expand Down
60 changes: 43 additions & 17 deletions utilities/user_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import tempfile
from dataclasses import dataclass

import requests
from kubernetes.dynamic import DynamicClient
from ocp_resources.user import User
from pyhelper_utils.shell import run_command
from timeout_sampler import retry

from utilities.exceptions import ExceptionUserLogin
from utilities.infra import login_with_user_password
from utilities.infra import login_with_user_password, get_cluster_authentication
import base64
from pathlib import Path

Expand Down Expand Up @@ -94,19 +96,43 @@ def wait_for_user_creation(username: str, password: str, cluster_url: str) -> bo
raise ExceptionUserLogin(f"Could not login as user {username}.")


def get_unprivileged_context() -> tuple[str, str]:
"""
Get the unprivileged context from the current context.
This only works in BYOIDC mode, and it assumes that the unprivileged context is already created
with a precise naming convention: <current_context>-unprivileged.

Returns:
Tuple of (unprivileged context, current context).
"""
status, current_context, _ = run_command(command=["oc", "config", "current-context"])
current_context = current_context.strip()
if not status or not current_context:
raise ValueError("Could not get current context from oc config current-context")
if current_context.endswith("-unprivileged"):
raise ValueError("Current context is already called [...]-unprivileged")
return current_context + "-unprivileged", current_context
def get_oidc_tokens(admin_client: DynamicClient, username: str, password: str) -> tuple[str, str]:
url = f"{get_byoidc_issuer_url(admin_client=admin_client)}/protocol/openid-connect/token"
headers = {"Content-Type": "application/x-www-form-urlencoded", "User-Agent": "python-requests"}

data = {
"username": username,
"password": password,
"grant_type": "password",
"client_id": "oc-cli",
"scope": "openid",
}

try:
LOGGER.info(f"Requesting token for user {username} in byoidc environment")
response = requests.post(
url=url,
headers=headers,
data=data,
allow_redirects=True,
timeout=30,
verify=True, # Set to False if you need to skip SSL verification
)
response.raise_for_status()
json_response = response.json()

# Validate that we got an access token
if "id_token" not in json_response or "refresh_token" not in json_response:
LOGGER.error("Warning: No id_token or refresh_token in response")
raise AssertionError(f"No id_token or refresh_token in response: {json_response}")
return json_response["id_token"], json_response["refresh_token"]
except Exception as e:
raise e


def get_byoidc_issuer_url(admin_client: DynamicClient) -> str:
authentication = get_cluster_authentication(admin_client=admin_client)
assert authentication is not None
url = authentication.instance.spec.oidcProviders[0].issuer.issuerURL
assert url is not None
return url