From a0b8a1536806740d3b86ceef1c5d33f74fb501b5 Mon Sep 17 00:00:00 2001 From: Andrej Smigala Date: Fri, 28 Nov 2025 10:09:06 +0100 Subject: [PATCH] BYOIDC: unprivileged user login using secrets from cluster --- tests/conftest.py | 72 +++++++++++++++++++++++++---------- tests/model_registry/utils.py | 10 +---- utilities/user_utils.py | 60 ++++++++++++++++++++--------- 3 files changed, 95 insertions(+), 47 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 20b5512b9..8546654a9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,7 +62,7 @@ from utilities.minio import create_minio_data_connection_secret from utilities.operator_utils import get_csv_related_images, get_cluster_service_version from ocp_resources.authentication_config_openshift_io import Authentication -from utilities.user_utils import get_unprivileged_context +from utilities.user_utils import get_oidc_tokens, get_byoidc_issuer_url LOGGER = get_logger(name=__name__) @@ -342,28 +342,33 @@ def use_unprivileged_client(pytestconfig: pytest.Config) -> bool: @pytest.fixture(scope="session") -def non_admin_user_password(admin_client: DynamicClient, use_unprivileged_client: bool) -> tuple[str, str] | None: +def non_admin_user_password( + admin_client: DynamicClient, use_unprivileged_client: bool, is_byoidc: bool +) -> tuple[str, str] | None: def _decode_split_data(_data: str) -> list[str]: return base64.b64decode(_data).decode().split(",") if not use_unprivileged_client: return None - if ldap_Secret := list( + secret_name = "byoidc-credentials" if is_byoidc else "openldap" # pragma: allowlist secret + secret_ns = "oidc" if is_byoidc else "openldap" # pragma: allowlist secret + + if users_Secret := list( Secret.get( dyn_client=admin_client, - name="openldap", - namespace="openldap", + name=secret_name, + namespace=secret_ns, ) ): - data = ldap_Secret[0].instance.data + data = users_Secret[0].instance.data users = _decode_split_data(_data=data.users) passwords = _decode_split_data(_data=data.passwords) first_user_index = next(index for index, user in enumerate(users) if "user" in user) return users[first_user_index], passwords[first_user_index] - LOGGER.error("ldap secret not found") + LOGGER.error("user credentials secret not found") return None @@ -406,24 +411,49 @@ def unprivileged_client( LOGGER.warning("Unprivileged client is not enabled, using admin client") yield admin_client - elif is_byoidc: - # this requires a pre-existing context in $KUBECONFIG with a unprivileged user - try: - unprivileged_context, _ = get_unprivileged_context() - except ValueError as e: - raise ValueError( - f"Failed to get unprivileged context for BYOIDC mode. " - f"Ensure the context naming follows the convention: -unprivileged. " - f"Error: {e}" - ) from e + elif non_admin_user_password is None: + raise ValueError("Unprivileged user not provisioned") - unprivileged_client = get_client(config_file=kubconfig_filepath, context=unprivileged_context) + elif is_byoidc: + tokens = get_oidc_tokens(admin_client, non_admin_user_password[0], non_admin_user_password[1]) + issuer = get_byoidc_issuer_url(admin_client) + + with open(kubconfig_filepath) as fd: + kubeconfig_content = yaml.safe_load(fd) + + # create the oidc user config + user = { + "name": non_admin_user_password[0], + "user": { + "auth-provider": { + "name": "oidc", + "config": { + "client-id": "oc-cli", + "client-secret": "", + "idp-issuer-url": issuer, + "id-token": tokens[0], + "refresh-token": tokens[1], + }, + } + }, + } + + # replace the users - we only need this one user + kubeconfig_content["users"] = [user] + + # get the current context and modify the referenced user in place + current_context_name = kubeconfig_content["current-context"] + current_context = [c for c in kubeconfig_content["contexts"] if c["name"] == current_context_name][0] + current_context["context"]["user"] = non_admin_user_password[0] + + unprivileged_client = get_client( + config_dict=kubeconfig_content, + context=current_context_name, + persist_config=False, # keep the kubeconfig intact + ) yield unprivileged_client - elif non_admin_user_password is None: - raise ValueError("Unprivileged user not provisioned") - else: current_user = run_command(command=["oc", "whoami"])[1].strip() non_admin_user_name = non_admin_user_password[0] diff --git a/tests/model_registry/utils.py b/tests/model_registry/utils.py index 5f26055a0..e15ecd6f5 100644 --- a/tests/model_registry/utils.py +++ b/tests/model_registry/utils.py @@ -34,7 +34,7 @@ from model_registry.types import RegisteredModel from utilities.general import wait_for_pods_running -from utilities.infra import get_cluster_authentication +from utilities.user_utils import get_byoidc_issuer_url ADDRESS_ANNOTATION_PREFIX: str = "routing.opendatahub.io/external-address-" MARIA_DB_IMAGE = ( @@ -835,14 +835,6 @@ def get_mr_user_token(admin_client: DynamicClient, user_credentials_rbac: dict[s raise e -def get_byoidc_issuer_url(admin_client: DynamicClient) -> str: - authentication = get_cluster_authentication(admin_client=admin_client) - assert authentication is not None - url = authentication.instance.spec.oidcProviders[0].issuer.issuerURL - assert url is not None - return url - - def get_byoidc_user_credentials(username: str = None) -> Dict[str, str]: """ Get user credentials from byoidc-credentials secret. diff --git a/utilities/user_utils.py b/utilities/user_utils.py index 71770b160..a70f08d3c 100644 --- a/utilities/user_utils.py +++ b/utilities/user_utils.py @@ -3,12 +3,14 @@ import tempfile from dataclasses import dataclass +import requests +from kubernetes.dynamic import DynamicClient from ocp_resources.user import User from pyhelper_utils.shell import run_command from timeout_sampler import retry from utilities.exceptions import ExceptionUserLogin -from utilities.infra import login_with_user_password +from utilities.infra import login_with_user_password, get_cluster_authentication import base64 from pathlib import Path @@ -94,19 +96,43 @@ def wait_for_user_creation(username: str, password: str, cluster_url: str) -> bo raise ExceptionUserLogin(f"Could not login as user {username}.") -def get_unprivileged_context() -> tuple[str, str]: - """ - Get the unprivileged context from the current context. - This only works in BYOIDC mode, and it assumes that the unprivileged context is already created - with a precise naming convention: -unprivileged. - - Returns: - Tuple of (unprivileged context, current context). - """ - status, current_context, _ = run_command(command=["oc", "config", "current-context"]) - current_context = current_context.strip() - if not status or not current_context: - raise ValueError("Could not get current context from oc config current-context") - if current_context.endswith("-unprivileged"): - raise ValueError("Current context is already called [...]-unprivileged") - return current_context + "-unprivileged", current_context +def get_oidc_tokens(admin_client: DynamicClient, username: str, password: str) -> tuple[str, str]: + url = f"{get_byoidc_issuer_url(admin_client=admin_client)}/protocol/openid-connect/token" + headers = {"Content-Type": "application/x-www-form-urlencoded", "User-Agent": "python-requests"} + + data = { + "username": username, + "password": password, + "grant_type": "password", + "client_id": "oc-cli", + "scope": "openid", + } + + try: + LOGGER.info(f"Requesting token for user {username} in byoidc environment") + response = requests.post( + url=url, + headers=headers, + data=data, + allow_redirects=True, + timeout=30, + verify=True, # Set to False if you need to skip SSL verification + ) + response.raise_for_status() + json_response = response.json() + + # Validate that we got an access token + if "id_token" not in json_response or "refresh_token" not in json_response: + LOGGER.error("Warning: No id_token or refresh_token in response") + raise AssertionError(f"No id_token or refresh_token in response: {json_response}") + return json_response["id_token"], json_response["refresh_token"] + except Exception as e: + raise e + + +def get_byoidc_issuer_url(admin_client: DynamicClient) -> str: + authentication = get_cluster_authentication(admin_client=admin_client) + assert authentication is not None + url = authentication.instance.spec.oidcProviders[0].issuer.issuerURL + assert url is not None + return url