|
62 | 62 | from utilities.minio import create_minio_data_connection_secret |
63 | 63 | from utilities.operator_utils import get_csv_related_images, get_cluster_service_version |
64 | 64 | from ocp_resources.authentication_config_openshift_io import Authentication |
65 | | -from utilities.user_utils import get_unprivileged_context |
| 65 | +from utilities.user_utils import get_oidc_tokens, get_byoidc_issuer_url |
66 | 66 |
|
67 | 67 | LOGGER = get_logger(name=__name__) |
68 | 68 |
|
@@ -342,28 +342,33 @@ def use_unprivileged_client(pytestconfig: pytest.Config) -> bool: |
342 | 342 |
|
343 | 343 |
|
344 | 344 | @pytest.fixture(scope="session") |
345 | | -def non_admin_user_password(admin_client: DynamicClient, use_unprivileged_client: bool) -> tuple[str, str] | None: |
| 345 | +def non_admin_user_password( |
| 346 | + admin_client: DynamicClient, use_unprivileged_client: bool, is_byoidc: bool |
| 347 | +) -> tuple[str, str] | None: |
346 | 348 | def _decode_split_data(_data: str) -> list[str]: |
347 | 349 | return base64.b64decode(_data).decode().split(",") |
348 | 350 |
|
349 | 351 | if not use_unprivileged_client: |
350 | 352 | return None |
351 | 353 |
|
352 | | - if ldap_Secret := list( |
| 354 | + secret_name = "byoidc-credentials" if is_byoidc else "openldap" # pragma: allowlist secret |
| 355 | + secret_ns = "oidc" if is_byoidc else "openldap" # pragma: allowlist secret |
| 356 | + |
| 357 | + if users_Secret := list( |
353 | 358 | Secret.get( |
354 | 359 | dyn_client=admin_client, |
355 | | - name="openldap", |
356 | | - namespace="openldap", |
| 360 | + name=secret_name, |
| 361 | + namespace=secret_ns, |
357 | 362 | ) |
358 | 363 | ): |
359 | | - data = ldap_Secret[0].instance.data |
| 364 | + data = users_Secret[0].instance.data |
360 | 365 | users = _decode_split_data(_data=data.users) |
361 | 366 | passwords = _decode_split_data(_data=data.passwords) |
362 | 367 | first_user_index = next(index for index, user in enumerate(users) if "user" in user) |
363 | 368 |
|
364 | 369 | return users[first_user_index], passwords[first_user_index] |
365 | 370 |
|
366 | | - LOGGER.error("ldap secret not found") |
| 371 | + LOGGER.error("user credentials secret not found") |
367 | 372 | return None |
368 | 373 |
|
369 | 374 |
|
@@ -406,24 +411,49 @@ def unprivileged_client( |
406 | 411 | LOGGER.warning("Unprivileged client is not enabled, using admin client") |
407 | 412 | yield admin_client |
408 | 413 |
|
409 | | - elif is_byoidc: |
410 | | - # this requires a pre-existing context in $KUBECONFIG with a unprivileged user |
411 | | - try: |
412 | | - unprivileged_context, _ = get_unprivileged_context() |
413 | | - except ValueError as e: |
414 | | - raise ValueError( |
415 | | - f"Failed to get unprivileged context for BYOIDC mode. " |
416 | | - f"Ensure the context naming follows the convention: <context>-unprivileged. " |
417 | | - f"Error: {e}" |
418 | | - ) from e |
| 414 | + elif non_admin_user_password is None: |
| 415 | + raise ValueError("Unprivileged user not provisioned") |
419 | 416 |
|
420 | | - unprivileged_client = get_client(config_file=kubconfig_filepath, context=unprivileged_context) |
| 417 | + elif is_byoidc: |
| 418 | + tokens = get_oidc_tokens(admin_client, non_admin_user_password[0], non_admin_user_password[1]) |
| 419 | + issuer = get_byoidc_issuer_url(admin_client) |
| 420 | + |
| 421 | + with open(kubconfig_filepath) as fd: |
| 422 | + kubeconfig_content = yaml.safe_load(fd) |
| 423 | + |
| 424 | + # create the oidc user config |
| 425 | + user = { |
| 426 | + "name": non_admin_user_password[0], |
| 427 | + "user": { |
| 428 | + "auth-provider": { |
| 429 | + "name": "oidc", |
| 430 | + "config": { |
| 431 | + "client-id": "oc-cli", |
| 432 | + "client-secret": "", |
| 433 | + "idp-issuer-url": issuer, |
| 434 | + "id-token": tokens[0], |
| 435 | + "refresh-token": tokens[1], |
| 436 | + }, |
| 437 | + } |
| 438 | + }, |
| 439 | + } |
| 440 | + |
| 441 | + # replace the users - we only need this one user |
| 442 | + kubeconfig_content["users"] = [user] |
| 443 | + |
| 444 | + # get the current context and modify the referenced user in place |
| 445 | + current_context_name = kubeconfig_content["current-context"] |
| 446 | + current_context = [c for c in kubeconfig_content["contexts"] if c["name"] == current_context_name][0] |
| 447 | + current_context["context"]["user"] = non_admin_user_password[0] |
| 448 | + |
| 449 | + unprivileged_client = get_client( |
| 450 | + config_dict=kubeconfig_content, |
| 451 | + context=current_context_name, |
| 452 | + persist_config=False, # keep the kubeconfig intact |
| 453 | + ) |
421 | 454 |
|
422 | 455 | yield unprivileged_client |
423 | 456 |
|
424 | | - elif non_admin_user_password is None: |
425 | | - raise ValueError("Unprivileged user not provisioned") |
426 | | - |
427 | 457 | else: |
428 | 458 | current_user = run_command(command=["oc", "whoami"])[1].strip() |
429 | 459 | non_admin_user_name = non_admin_user_password[0] |
|
0 commit comments