diff --git a/invenio_oauthclient/contrib/cilogon/__init__.py b/invenio_oauthclient/contrib/cilogon/__init__.py new file mode 100644 index 00000000..a7104cf0 --- /dev/null +++ b/invenio_oauthclient/contrib/cilogon/__init__.py @@ -0,0 +1,105 @@ +""" Toolkit for creating remote apps that enable sign in/up with cilogon. This was originally adapted from the keycloak plugin by +Robert Hancock of BNL. Anil Panta of JLAB helped clean it up and added some code to convert CILogon groups to Invenio roles. + +1. Register you invenio instance to cilogon via comanage registry and make sure it is configured appropriately, + like in your comanage registry, set the callabck URI as + "https://myinveniohost/oauth/authorized/cilogon/". + Make sure to grab the *Client ID* and *Client Secret* . + Minimum scope/claim should be "openid", "email", "org.cilogon.userinfo", "profile". + If you want allow certain group from cilogon to login you need to enable clain "isMemberOf". + + +2. Add the following items to your configuration (``invenio.cfg``). + The ``CilogonSettingsHelper`` class can be used to help with setting up + the configuration values: + +.. code-block:: python + + from invenio_oauthclient.contrib import cilogon + + helper = cilogon.CilogonSettingsHelper( + title="CILOGON", + description="CILOGON Comanage Registry", + base_url="https://cilogon.org", + precedence_mask={"email":True, "profile": {"username": False, "full_name": False, "affiliations": False}} + ) + + # precendence mask is added and email is set to true so that user's email is taken from cilogon not from user input. + + # create the configuration for cilogon + # because the URLs usually follow a certain schema, the settings helper + # can be used to more easily build the configuration values: + OAUTHCLIENT_CILOGON_USER_INFO_URL = helper.user_info_url + OAUTHCLIENT_CILOGON_JWKS_URL = helper.jwks_url + OAUTHCLIENT_CILOGON_CONFIG_URL = helper.base_url+'/.well-known/openid-configuration' + + # CILOGON tokens, contains information about the target audience (AUD) + # verification of the expected AUD value can be configured with: + OAUTHCLIENT_CILOGON_VERIFY_AUD = True + OAUTHCLIENT_CILOGON_AUD = "client audience"(same as client ID usually) + + # enable/disable checking if the JWT signature has expired + OAUTHCLIENT_CILOGON_VERIFY_EXP = True + + # Cilogon role values (i.e. groups) that are allowed to be used + OAUTHCLIENT_CILOGON_ALLOWED_ROLES = '["CO:COU:eic:members:all"]' + # error direct when user role/grup from cilogon doesn't match to allowed. + OAUTHCLIENT_CILOGON_ROLES_ERROR_URL = "/" + + # if you want to allow users from any group without check of allowed roles + # set the following to True (default is False) + OAUTHCLIENT_CILOGON_ALLOW_ANY_ROLES=False + + # oidc claim name for LDAP Atrribute "isMemberOf". Default "isMemberOf") + OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM = "isMemberOf" + + # add CILOGON as external login providers to the dictionary of remote apps + OAUTHCLIENT_REMOTE_APPS = dict( + cilogon=helper.remote_app, + ) + OAUTHCLIENT_REST_REMOTE_APPS = dict( + cilogon=helper.remote_rest_app, + ) + + # set the following configuration to True to automatically use the + # user's email address as account email + USERPROFILES_EXTEND_SECURITY_FORMS = True + + By default, the title will be displayed as label for the login button, + for example ``CILOGON``. The description will be + displayed in the user account section. + +3. Grab the *Client ID* and *Client Secret* from the + Comanage Registry and add them to your instance configuration (``invenio.cfg``): + + .. code-block:: python + + CILOGON_APP_CREDENTIALS = dict( + consumer_key='', + consumer_secret='', + ) + +4. Now go to ``CFG_SITE_SECURE_URL/oauth/login/cilogon/`` (e.g. + https://localhost:5000/oauth/login/cilogon/) and log in. + +5. After authenticating successfully, you should see cilogon listed under + Linked accounts: https://localhost:5000/account/settings/linkedaccounts/ +""" + + + +from .handlers import ( + disconnect_handler, + disconnect_rest_handler, + info_handler, + setup_handler, +) +from .settings import CilogonSettingsHelper + +__all__ = ( + "disconnect_handler", + "disconnect_rest_handler", + "info_handler", + "setup_handler", + "CilogonSettingsHelper", +) diff --git a/invenio_oauthclient/contrib/cilogon/handlers.py b/invenio_oauthclient/contrib/cilogon/handlers.py new file mode 100644 index 00000000..69a25ecf --- /dev/null +++ b/invenio_oauthclient/contrib/cilogon/handlers.py @@ -0,0 +1,234 @@ +from flask import session, g, current_app, redirect, url_for +from flask_login import current_user +from invenio_db import db +from invenio_i18n import gettext as _ + + +from flask_principal import ( + AnonymousIdentity, + RoleNeed, + UserNeed, +) + +from invenio_oauthclient import current_oauthclient +from invenio_oauthclient.handlers.rest import response_handler +from invenio_oauthclient.handlers.utils import require_more_than_one_external_account +from invenio_oauthclient.models import RemoteAccount +from invenio_oauthclient.oauth import oauth_link_external_id, oauth_unlink_external_id +from invenio_oauthclient.errors import OAuthCilogonRejectedAccountError + +from .helpers import get_user_info, get_groups, filter_groups + +OAUTHCLIENT_CILOGON_SESSION_KEY = "identity.cilogon_provides" +OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM = "isMemberOf" + + +def extend_identity(identity, roles): + """Extend identity with roles based on CILOGON groups.""" + if not roles: + provides = set([UserNeed(current_user.email)]) + else: + provides = set([UserNeed(current_user.email)] + [RoleNeed(name) for name in roles]) + identity.provides |= provides + key = current_app.config.get( + "OAUTHCLIENT_CILOGON_SESSION_KEY", + OAUTHCLIENT_CILOGON_SESSION_KEY, + ) + session[key] = provides + +def disconnect_identity(identity): + """Disconnect identity from CILOGON groups.""" + session.pop("cern_resource", None) + key = current_app.config.get( + "OAUTHCLIENT_CILOGON_SESSION_KEY", + OAUTHCLIENT_CILOGON_SESSION_KEY, + ) + provides = session.pop(key, set()) + identity.provides -= provides + +def info_serializer_handler(remote, resp, token_user_info, user_info=None, **kwargs): + """Serialize the account info response object. + + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :param token_user_info: The content of the authorization token response. + :param user_info: The response of the `user info` endpoint. + :returns: A dictionary with serialized user information. + """ + # fill out the information required by + # 'invenio-accounts' and 'invenio-userprofiles'. + + user_info = user_info or {} # prevent errors when accessing None.get(...) + + email = token_user_info.get("email") or user_info.get("email") + full_name = token_user_info.get("name") or user_info.get("name") + username = token_user_info.get("preferred_username") or user_info.get( + "preferred_username" + ) + cilogonid = token_user_info.get("sub") or user_info.get("sub") + + # check for matching group + group_claim_name = current_app.config.get( + "OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM", + OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM, + ) + group_names = token_user_info.get(group_claim_name) or user_info.get(group_claim_name) + filter_groups(remote, resp, group_names) + return { + "user": { + "active": True, + "email": email, + "profile": { + "full_name": full_name, + "username": username, + }, + }, + "external_id": cilogonid, + "external_method": remote.name, + } + + +def info_handler(remote, resp): + """Retrieve remote account information for finding matching local users. + + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :returns: A dictionary with the user information. + """ + token_user_info, user_info = get_user_info(remote, resp) + handlers = current_oauthclient.signup_handlers[remote.name] + # `remote` param automatically injected via `make_handler` helper + return handlers["info_serializer"](resp, token_user_info, user_info) + +def group_serializer_handler(remote, resp, token_user_info, user_info=None, **kwargs): + """Retrieve remote account information for group for finding matching local groups. + + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :returns: A dictionary with the user information. + """ + user_info = user_info or {} # prevent errors when accessing None.get(...) + group_claim_name = current_app.config.get( + "OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM", + OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM, + ) + group_names = token_user_info.get(group_claim_name) or user_info.get(group_claim_name) + groups_dict_list = [] + # check for matching group + try: + matching_groups = filter_groups(remote, resp, group_names) + for group in matching_groups: + group_dict = { + "id" : group, + "name": group, + "description": "" + } + groups_dict_list.append(group_dict) + return groups_dict_list + + except OAuthCilogonRejectedAccountError as e: + current_app.logger.warning(e.message, exc_info=False) + return groups_dict_list + +def group_rest_serializer_handler(remote, resp, token_user_info, user_info=None, **kwargs): + """Retrieve remote account information for group for finding matching local groups. + + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :returns: A dictionary with the user information. + """ + user_info = user_info or {} # prevent errors when accessing None.get(...) + group_claim_name = current_app.config.get( + "OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM", + OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM, + ) + group_names = token_user_info.get(group_claim_name) or user_info.get(group_claim_name) + groups_dict_list = [] + # check for matching group + try: + matching_groups = filter_groups(remote, resp, group_names) + for group in matching_groups: + group_dict = { + "id" : group, + "name": group, + "description": "" + } + groups_dict_list.append(group_dict) + return groups_dict_list + + except OAuthCilogonRejectedAccountError as e: + current_app.logger.warning(e.message, exc_info=False) + return groups_dict_list + +def group_handler(remote, resp): + """Retrieve remote account information for finding matching local users. + + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :returns: A dictionary with the user information. + """ + token_user_info, user_info = get_user_info(remote, resp) + handlers = current_oauthclient.signup_handlers[remote.name] + # `remote` param automatically injected via `make_handler` helper + return handlers["groups_serializer"](resp, token_user_info, user_info) + + +def setup_handler(remote, token, resp): + """Perform additional setup after the user has been logged in.""" + token_user_info, _ = get_user_info(remote, resp, from_token_only=True) + + with db.session.begin_nested(): + # fetch the user's cilogon ID (sub) and set it in extra_data + cilogonid = token_user_info["sub"] + token.remote_account.extra_data = { + "cilogonid": cilogonid, + } + + user = token.remote_account.user + external_id = {"id": cilogonid, "method": remote.name} + + group_claim_name = current_app.config.get( + "OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM", + OAUTHCLIENT_CILOGON_GROUP_OIDC_CLAIM, + ) + group_names = token_user_info.get(group_claim_name) + roles = get_groups(remote, resp, token.remote_account, group_names) + assert not isinstance(g.identity, AnonymousIdentity) + extend_identity(g.identity, roles) + + # link account with external cilogon ID + oauth_link_external_id(user, external_id) + +@require_more_than_one_external_account +def _disconnect(remote, *args, **kwargs): + """Common logic for handling disconnection of remote accounts.""" + if not current_user.is_authenticated: + return current_app.login_manager.unauthorized() + + account = RemoteAccount.get( + user_id=current_user.get_id(), client_id=remote.consumer_key + ) + + cilogonid = account.extra_data.get("cilogonid") + + if cilogonid: + external_id = {"id": cilogonid, "method": remote.name} + + oauth_unlink_external_id(external_id) + + if account: + with db.session.begin_nested(): + account.delete() + disconnect_identity(g.identity) + +def disconnect_handler(remote, *args, **kwargs): + """Handle unlinking of the remote account.""" + _disconnect(remote, *args, **kwargs) + return redirect(url_for("invenio_oauthclient_settings.index")) + +def disconnect_rest_handler(remote, *args, **kwargs): + """Handle unlinking of the remote account.""" + _disconnect(remote, *args, **kwargs) + rconfig = current_app.config["OAUTHCLIENT_REST_REMOTE_APPS"][remote.name] + redirect_url = rconfig["disconnect_redirect_url"] + return response_handler(remote, redirect_url) diff --git a/invenio_oauthclient/contrib/cilogon/helpers.py b/invenio_oauthclient/contrib/cilogon/helpers.py new file mode 100644 index 00000000..98c69c19 --- /dev/null +++ b/invenio_oauthclient/contrib/cilogon/helpers.py @@ -0,0 +1,179 @@ +"""Helper functions for the endpoint handlers.""" + +import re +import base64 +import jwt +import six +import struct +from datetime import datetime, timezone + +from flask import current_app + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers +from cryptography.hazmat.backends import default_backend +from invenio_oauthclient.errors import OAuthCilogonRejectedAccountError + +from ...errors import OAuthError + +AZ_09_DASHES_UNDERSCORES = r"^[A-Za-z0-9_-]+$" + +def is_app_name_valid(app_name): + """Validate app name.""" + return re.match(AZ_09_DASHES_UNDERSCORES, app_name) is not None + +def _generate_config_prefix(remote): + """Validate the app name so that it can be used in config vars.""" + app_name = remote.name + if not is_app_name_valid(app_name): + raise OAuthError( + f"Invalid app name {app_name}. " + "It should only contain letters, numbers, dashes " + "and underscores", + remote, + ) + return f"OAUTHCLIENT_{app_name.upper()}" + +def jwks2pem(jwks): + def intarr2long(arr): + return int(''.join(["%02x" % byte for byte in arr]), 16) + + def base64_to_long(data): + if isinstance(data, six.text_type): + data = data.encode("ascii") + # urlsafe_b64decode will happily convert b64encoded data + _d = base64.urlsafe_b64decode(bytes(data) + b'==') + return intarr2long(struct.unpack('%sB' % len(_d), _d)) + + pems = {} + for jwk in jwks['keys']: + alg = jwk['alg'] + exponent = base64_to_long(jwk['e']) + modulus = base64_to_long(jwk['n']) + numbers = RSAPublicNumbers(exponent, modulus) + public_key = numbers.public_key(backend=default_backend()) + pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + pems[alg] = pem + return pems + +def get_all_keys(remote, jwsurl): + keys = remote.get(jwsurl).data + try: + return jwks2pem(keys) + except Exception as e: + return {} + +def _get_user_info_from_token(remote, token, config_prefix): + """Get the user information from the JWT token.""" + jwsurl = current_app.config.get(f"{config_prefix}_JWKS_URL") + pubkeys = get_all_keys(remote=remote, jwsurl=jwsurl) + # pubkey = _format_public_key(get_public_key(remote)) + alg = jwt.get_unverified_header(token)["alg"] + try: + # pubkey = _format_public_key(pubkeys[alg]) + #pubkey = _format_public_key(pubkeys[alg]) + pubkey = pubkeys[alg] + except Exception as e: + return None + + + should_verify_aud = current_app.config.get(f"{config_prefix}_VERIFY_AUD", False) + expected_aud = current_app.config.get(f"{config_prefix}_AUD", None) + + should_verify_expiration = current_app.config.get( + f"{config_prefix}_VERIFY_EXP", False + ) + + options = { + # check signature expiration + "verify_exp": should_verify_expiration, + # check the target audience + "verify_aud": should_verify_aud and (expected_aud is not None), + 'verify_signature':True + } + + decodedToken = jwt.decode( + token, key=pubkey, algorithms=[alg], audience=expected_aud, options=options, verify=False, + ) + return decodedToken + +def _get_user_info_from_endpoint(remote, config_prefix): + """Get the user info from the oauth server provider.""" + url = current_app.config[f"{config_prefix}_USER_INFO_URL"] + return remote.get(url).data + + +def get_user_info(remote, resp_token, from_token_only=False): + """Get the user information from Comanage. + + :param remote: The OAuthClient remote app + :param resp_token: The response from the 'token' endpoint; expected to be a dict + and to contain a JWT 'id_token' + :param from_token_only: return info only from the token, without calling the + user info endpoint. + :returns: A tuple containing the user information extracted from the token, and + if configured, from the UserInfo endpoint + """ + config_prefix = _generate_config_prefix(remote) + from_token, from_endpoint = {}, None + try: + from_token = _get_user_info_from_token( + remote, resp_token["id_token"], config_prefix + ) + except Exception as e: + current_app.logger.exception(e) + + + call_endpoint = current_app.config[f"{config_prefix}_USER_INFO_URL"] + if not from_token_only and call_endpoint: + from_endpoint = _get_user_info_from_endpoint(remote, config_prefix) + + return from_token, from_endpoint + +def filter_groups(remote, resp, groups): + """ Filter groups from local _Allowed_ROLES. + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :param groups: List of groups to filter from _ALLOWED_ROLES + :retruns: A List of matching groups. + """ + config_prefix = _generate_config_prefix(remote) + allow_any_groups = current_app.config.get(f"{config_prefix}_ALLOW_ANY_ROLES", False) + if allow_any_groups: + return [] + valid_roles = current_app.config[f"{config_prefix}_ALLOWED_ROLES"] + matching_groups = [group for group in groups if group in valid_roles] + if not matching_groups: + # Return an error if no matching groups are found + raise OAuthCilogonRejectedAccountError( + "User roles/groups {0} are not one of allowed {1} roles/groups.".format(str(groups), str(valid_roles)), + remote, + { + "status_code": 401, + "error": { + "type": "OAuthCilogonRejectedAccountError", + "message": "User roles/groups {0} are not one of allowed {1} roles/groups.".format(str(groups), str(valid_roles)), + "details": { + "roles_provided": groups, + "valid_roles": valid_roles + } + } + } + ) + return matching_groups + +def get_groups(remote, resp, account, group_names): + """ Get groups from filter_groups and add as account extra data. + :param remote: The remote application. + :param resp: The response of the `authorized` endpoint. + :param account: The remote application. + :param group_names: List of group names to filter from _ALLOWED_ROLES. + :returns: A list of matching groups. + """ + roles = filter_groups(remote, resp, group_names) + updated = datetime.now(timezone.utc) + account.extra_data.update(roles=roles, updated=updated.isoformat()) + return roles diff --git a/invenio_oauthclient/contrib/cilogon/settings.py b/invenio_oauthclient/contrib/cilogon/settings.py new file mode 100644 index 00000000..261f4299 --- /dev/null +++ b/invenio_oauthclient/contrib/cilogon/settings.py @@ -0,0 +1,111 @@ +import requests +from invenio_oauthclient.contrib.settings import OAuthSettingsHelper + + +class CilogonSettingsHelper(OAuthSettingsHelper): + + def __init__(self, title=None, + description=None, + base_url=None, + app_key=None, + icon=None, + access_token_url=None, + authorize_url=None, + access_token_method="POST", + request_token_params=None, + request_token_url=None, + precedence_mask=None, + signup_options={"auto_confirm": True, "send_register_msg": False,}, + jwks_url=None, + logout_url=None): + endpoints = self.getEndpoints(base_url) + self._user_info_url = endpoints['user_info_url'] + self._jwks_url = endpoints['jwks_url'] + super().__init__(title=title, + description=description or "CILOGON Comanage Registr", + base_url=base_url or "https://cilogon.org/jlab", + app_key=app_key or "CILOGON_APP_CREDENTIALS", + icon=icon, + access_token_url=access_token_url or endpoints['access_token_url'], + authorize_url=authorize_url or endpoints['authorize_url'], + access_token_method=access_token_method or "POST", + request_token_params=request_token_params or {"scope": "openid email org.cilogon.userinfo profile "}, + request_token_url=request_token_url, + precedence_mask=precedence_mask, + signup_options=signup_options, + ) + + self._handlers = dict( + authorized_handler="invenio_oauthclient.handlers:authorized_signup_handler", + disconnect_handler="invenio_oauthclient.contrib.cilogon.handlers:disconnect_handler", + signup_handler=dict( + info="invenio_oauthclient.contrib.cilogon.handlers:info_handler", + info_serializer="invenio_oauthclient.contrib.cilogon.handlers:info_serializer_handler", + setup="invenio_oauthclient.contrib.cilogon.handlers:setup_handler", + groups="invenio_oauthclient.contrib.cilogon.handlers:group_handler", + groups_serializer="invenio_oauthclient.contrib.cilogon.handlers:group_serializer_handler", + view="invenio_oauthclient.handlers:signup_handler", + ), + ) + self._rest_handlers = dict( + authorized_handler="invenio_oauthclient.handlers.rest:authorized_signup_handler", + disconnect_handler="invenio_oauthclient.contrib.cilogon.handlers:disconnect_rest_handler", + signup_handler=dict( + info="invenio_oauthclient.contrib.cilogon.handlers:info_handler", + info_serializer="invenio_oauthclient.contrib.cilogon.handlers:info_serializer_handler", + setup="invenio_oauthclient.contrib.cilogon.handlers:setup_handler", + groups="invenio_oauthclient.contrib.cilogon.handlers:group_handler", + groups_serializer="invenio_oauthclient.contrib.cilogon.handlers:group_rest_serializer_handler", + view="invenio_oauthclient.handlers.rest:signup_handler", + ), + response_handler=( + "invenio_oauthclient.handlers.rest:default_remote_response_handler" + ), + authorized_redirect_url="/", + disconnect_redirect_url="/", + signup_redirect_url="/", + error_redirect_url="/", + ) + + def getEndpoints(self,base_url): + url = base_url.rstrip('/')+'/.well-known/openid-configuration' + try: + r = requests.get(url=url,timeout=4,headers={'Content-Type':'application/json'}) + if r.status_code == 200: + endpoints = r.json() + return {'access_token_url':endpoints['token_endpoint'], + 'authorize_url':endpoints['authorization_endpoint'], + 'user_info_url':endpoints['userinfo_endpoint'], + 'jwks_url':endpoints['jwks_uri'] + } + else: + return {'access_token_url':None, + 'authorize_url':None, + 'user_info_url':None, + 'jwks_url':None + } + except Exception: + return {'access_token_url':None, + 'authorize_url':None, + 'request_token_url':None, + 'user_info_url':None, + 'jwks_url':None + } + + @property + def user_info_url(self): + """URL for the user info endpoint.""" + return self._user_info_url + + @property + def jwks_url(self): + """URL for the jwks info endpoint""" + return self._jwks_url + + def get_handlers(self): + """Return a dict with the auth handlers.""" + return self._handlers + + def get_rest_handlers(self): + """Return a dict with the auth REST handlers.""" + return self._rest_handlers diff --git a/invenio_oauthclient/errors.py b/invenio_oauthclient/errors.py index 22cddb5f..c8d408a1 100644 --- a/invenio_oauthclient/errors.py +++ b/invenio_oauthclient/errors.py @@ -73,6 +73,8 @@ class OAuthCERNRejectedAccountError(OAuthResponseError): class OAuthKeycloakUserInfoError(OAuthResponseError): """Define exception for problems while fetching user info from Keycloak.""" +class OAuthCilogonRejectedAccountError(OAuthResponseError): + """Define exception for not allowed cern accounts.""" class OAuthClientUnAuthorized(Exception): """Define exception for unauthorized user.""" diff --git a/invenio_oauthclient/handlers/rest.py b/invenio_oauthclient/handlers/rest.py index 08ba5f98..f6e31d3c 100644 --- a/invenio_oauthclient/handlers/rest.py +++ b/invenio_oauthclient/handlers/rest.py @@ -37,6 +37,7 @@ OAuthClientUserRequiresConfirmation, OAuthError, OAuthRejectedRequestError, + OAuthCilogonRejectedAccountError, ) from ..proxies import current_oauthclient from ..utils import create_csrf_disabled_registrationform, fill_form @@ -143,6 +144,19 @@ def _oauth_error_handler(remote, f, *args, **kwargs): remote_app=remote.name, ) ) + except OAuthCilogonRejectedAccountError as e: + error_message = e.message + return response_handler( + remote, + current_app.config.get( + "OAUTHCLIENT_CILOGON_ROLES_ERROR_URL", + "/", + ), + payload=dict( + message=error_message, + code=401, + ), + ) # diff --git a/invenio_oauthclient/handlers/ui.py b/invenio_oauthclient/handlers/ui.py index cf09b85b..1f8b0f98 100644 --- a/invenio_oauthclient/handlers/ui.py +++ b/invenio_oauthclient/handlers/ui.py @@ -37,6 +37,7 @@ OAuthClientUserRequiresConfirmation, OAuthError, OAuthRejectedRequestError, + OAuthCilogonRejectedAccountError, ) from ..utils import create_registrationform, fill_form from .authorized import authorized_handler, extra_signup_handler @@ -85,6 +86,14 @@ def _oauth_error_handler(remote, f, *args, **kwargs): remote_app=remote.name, ) ) + except OAuthCilogonRejectedAccountError as e: + error_message = e.message + flash(_(error_message), category="danger") + redirect_url = current_app.config.get( + "OAUTHCLIENT_CILOGON_ROLES_ERROR_URL", + "/", + ) + return redirect(redirect_url) # diff --git a/tests/conftest.py b/tests/conftest.py index 896adcea..e40c6530 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,6 +38,7 @@ from invenio_oauthclient.contrib.globus import REMOTE_APP as GLOBUS_REMOTE_APP from invenio_oauthclient.contrib.globus import REMOTE_REST_APP as GLOBUS_REMOTE_REST_APP from invenio_oauthclient.contrib.keycloak import KeycloakSettingsHelper +from invenio_oauthclient.contrib.cilogon import CilogonSettingsHelper from invenio_oauthclient.contrib.orcid import REMOTE_APP as ORCID_REMOTE_APP from invenio_oauthclient.contrib.orcid import REMOTE_REST_APP as ORCID_REMOTE_REST_APP from invenio_oauthclient.utils import _create_registrationform @@ -67,6 +68,11 @@ def base_app(request): ) KEYCLOAK_REMOTE_APP = helper.remote_app + helper2 = CilogonSettingsHelper( + title="CILOGON", description="CILOGON", + base_url="http://localhost:8080") + CILOGON_REMOTE_APP = helper2.remote_app + instance_path = tempfile.mkdtemp() base_app = Flask("testapp") base_app.config.update( @@ -83,6 +89,7 @@ def base_app(request): github=GITHUB_REMOTE_APP, globus=GLOBUS_REMOTE_APP, keycloak=KEYCLOAK_REMOTE_APP, + cilogon=CILOGON_REMOTE_APP, ), OAUTHCLIENT_REST_REMOTE_APPS=dict( cern_openid=CERN_OPENID_REMOTE_REST_APP, @@ -120,6 +127,18 @@ def base_app(request): consumer_key="keycloak_key_changeme", consumer_secret="keycloak_secret_changeme", ), + + OAUTHCLIENT_CILOGON_USER_INFO_URL=helper2.user_info_url, + OAUTHCLIENT_CILOGON_JWKS_URL= helper2.jwks_url, + OAUTHCLIENT_CILOGON_VERIFY_AUD=True, + OAUTHCLIENT_CILOGON_VERIFY_EXP=False, + OAUTHCLIENT_CILOGON_AUD="cilogon:/client_id/14f788b154a4becc091a1b1b8274aaa3", + OAUTHCLIENT_CILOGON_ALLOWED_ROLES=["CO:COU:eic:members:all"], + + CILOGON_APP_CREDENTIALS=dict( + consumer_key="cilogon_key_changeme", + consumer_secret="cilogon_secret_changeme", + ), # use local memory mailbox EMAIL_BACKEND="flask_email.backends.locmem.Mail", SQLALCHEMY_DATABASE_URI=os.getenv("SQLALCHEMY_DATABASE_URI", "sqlite://"), @@ -579,3 +598,41 @@ def example_keycloak_public_key(): "FZ+3rJGEbEKFUbFNPTJfslXh+mnH89/ZM8mZDb4V8YNX1lafSeJdvC7nnvvyQIDAQ" "AB" ) + + +@pytest.fixture() +def example_cilogon_token(): + """Keycloak example data.""" + file_path = os.path.join( + os.path.dirname(__file__), "data/cilogon_token_response.json" + ) + + with open(file_path) as token_file: + token = json.load(token_file) + + return token + + +@pytest.fixture() +def example_cilogon_userinfo(): + """Keycloak example user info response.""" + file_path = os.path.join( + os.path.dirname(__file__), "data/cilogon_userinfo_response.json" + ) + + with open(file_path) as response_file: + response = json.load(response_file) + return response + + +@pytest.fixture() +def example_jwks_info(): + """Keycloak example JWKs info.""" + file_path = os.path.join( + os.path.dirname(__file__), "data/cilogon_jwks_info.json" + ) + + with open(file_path) as info_file: + jwks_info = json.load(info_file) + + return jwks_info diff --git a/tests/data/cilogon_jwks_info.json b/tests/data/cilogon_jwks_info.json new file mode 100644 index 00000000..4316f272 --- /dev/null +++ b/tests/data/cilogon_jwks_info.json @@ -0,0 +1,28 @@ +{ + "keys": [ + { + "alg": "RS256", + "kid": "8CBFDDCC7F9BAD7D7F1A6031DE78BC53", + "use": "sig", + "kty": "RSA", + "n": "AMUUEQlLCuRd91dLM4LU1LTlCmK9O11eDJDc8q7hQB26Jgu7syQlZ3BIjEgN9i5oQKe4aQ_lQBoWO4nR1x2Q75J2qwzArIBlLL0poP_MLz1xRHwctL5p-pGIs_Mujn9o7MzHELbd6k-4JSbup21klHQnZNqkRJg1pbqXLiL7kTAt8yFxsX5JnBbfeXU1aTZx99yOKsQT2xSKUAPT4bFZUO9xvNXu0Q7zq79M6UjAWsshrVuswZNyVezWHn1Qi4POo7YWz50kQea4HJ7SCrYa3cf2QoCsOAEWvmPgnik_boCEy2a013FOsx_RXB-P4XKQaWlVqpRpGVlrlqhoPgo-5w0", + "e": "AQAB" + }, + { + "alg": "RS512", + "kid": "DA262065998891A2B6186CAB9DA71381", + "use": "sig", + "kty": "RSA", + "n": "AMgs0ALUygLbP33AnCXvV2nXwO-ZVwV-7flN8QmKZrexYWLeQHH-LzKy_aFE7_utNRd910_V1QQnlmjVHetwi0JLEOhlZIPa2fiqezfNP1GLky5TABW9hgRTRL8BQC_vqZTCBe4ehQd0zKsDne9-HjNpla717XPl1Itjbg0D0iKsL4iVJNT_7nhNx08xVsiMe-H0FTKtAE3-SIzdhcIoARBKoLnTP4WtARRI01HpiCkUojE4I92x147MqrY9O77X6QMITr_Y6Z26T9rWtLwp_fL6Q2TQXCW0jFJ86dQJMHW70shFMUzH-yXvtUDgu7EPNl8jWFq7nRLIyd4fSiaCeq0", + "e": "AQAB" + }, + { + "alg": "RS384", + "kid": "DCCA93F1C7CFE5DE2BD73DCFAD7BC384", + "use": "sig", + "kty": "RSA", + "n": "AKjUXl9PLCrY2JHNhNHsUxUQSqxbli5_XlquKAnxXJGI73uwOtrpLIwW_1SPpbOcKHGc-gqxhtGRcoUP_xM0lVo9VD9El-1nNGG6BsTJksYJgjgYw1qLHzuuRYK1WULX2OQ1r1yZPqZ9d1vtGXkv872JPD1Tnt_86FQ1dZZz5qhZqUV8vikLVa0ehVeoty1m0Rbqi4-8fCaQHg-mR06ImL4Vk8B3lQlkRikMivHAbc8ZtwT7OdO24ueEqo6bADtwe8IURpRDDFsk_MQA1jncwHuHkb7FuBpifD8HA3yOxD-ZWckhOGkla35GoFuHx1FzL6ZxrhTABB5mAos3c1M7DAU", + "e": "AQAB" + } + ] +} \ No newline at end of file diff --git a/tests/data/cilogon_token_response.json b/tests/data/cilogon_token_response.json new file mode 100644 index 00000000..3705bf93 --- /dev/null +++ b/tests/data/cilogon_token_response.json @@ -0,0 +1,6 @@ +{ + "access_token": "NB2HI4DTHIXS6Y3JNRXWO33OFZXXEZZPN5QXK5DIGIXTONRRHA4TIOJZGIZTKMLGMQ3WMOLDGVSGINDGMVRDEZLEGY4GCMR7OR4XAZJ5MFRWGZLTONKG623FNYTHI4Z5GE3TENJZGA4TGMRWGE4DGJTWMVZHG2LPNY6XMMROGATGY2LGMV2GS3LFHU4TAMBQGAYA", + "id_token": "eyJraWQiOiI4Q0JGRERDQzdGOUJBRDdEN0YxQTYwMzFERTc4QkM1MyIsInR5cCI6IkpXVCIsImFsZyI6IlJTMjU2In0.eyJzdWIiOiJodHRwOi8vY2lsb2dvbi5vcmcvc2VydmVyRS91c2Vycy8xMDg1MDYiLCJpZHBfbmFtZSI6IlRob21hcyBKZWZmZXJzb24gTmF0aW9uYWwgQWNjZWxlcmF0b3IgRmFjaWxpdHkiLCJlcHBuIjoicGFudGFAamxhYi5vcmciLCJvdSI6IlBBRE1JTiIsImNlcnRfc3ViamVjdF9kbiI6Ii9EQz1vcmcvREM9Y2lsb2dvbi9DPVVTL089VGhvbWFzIEplZmZlcnNvbiBOYXRpb25hbCBBY2NlbGVyYXRvciBGYWNpbGl0eS9DTj1BbmlsIFBhbnRhIEUxMDg1MDYiLCJlcHRpZCI6Imh0dHBzOi8vamlkcC5qbGFiLm9yZy9pZHAvc2hpYmJvbGV0aCFodHRwczovL2NpbG9nb24ub3JnL3NoaWJib2xldGghcGFudGFAamxhYi5vcmciLCJpc3MiOiJodHRwczovL2NpbG9nb24ub3JnL2psYWIiLCJnaXZlbl9uYW1lIjoiQW5pbCIsImVkdVBlcnNvbkFzc3VyYW5jZSI6WyJodHRwczovL3JlZmVkcy5vcmcvYXNzdXJhbmNlIiwiaHR0cHM6Ly9yZWZlZHMub3JnL2Fzc3VyYW5jZS9BVFAvZVBBLTFkIiwiaHR0cHM6Ly9yZWZlZHMub3JnL2Fzc3VyYW5jZS9BVFAvZVBBLTFtIiwiaHR0cHM6Ly9yZWZlZHMub3JnL2Fzc3VyYW5jZS9JRC91bmlxdWUiLCJodHRwczovL3JlZmVkcy5vcmcvYXNzdXJhbmNlL0lEL2VwcG4tdW5pcXVlLW5vLXJlYXNzaWduIiwiaHR0cHM6Ly9yZWZlZHMub3JnL2Fzc3VyYW5jZS9JQVAvbG9jYWwtZW50ZXJwcmlzZSIsImh0dHBzOi8vcmVmZWRzLm9yZy9hc3N1cmFuY2UvSUFQL21lZGl1bSIsImh0dHBzOi8vcmVmZWRzLm9yZy9hc3N1cmFuY2UvcHJvZmlsZS9jYXBwdWNjaW5vIl0sImFjciI6InVybjpvYXNpczpuYW1lczp0YzpTQU1MOjIuMDphYzpjbGFzc2VzOlBhc3N3b3JkUHJvdGVjdGVkVHJhbnNwb3J0IiwiYXVkIjoiY2lsb2dvbjovY2xpZW50X2lkLzE0Zjc4OGIxNTRhNGJlY2MwOTFhMWIxYjgyNzRhYWEzIiwibmJmIjoxNzI1OTA5MzI2LCJpZHAiOiJodHRwczovL2ppZHAuamxhYi5vcmcvaWRwL3NoaWJib2xldGgiLCJhZmZpbGlhdGlvbiI6Im1lbWJlckBqbGFiLm9yZztlbXBsb3llZUBqbGFiLm9yZyIsImF1dGhfdGltZSI6MTcyNTg4ODc1MSwibmFtZSI6IkFuaWwgUGFudGEiLCJpc01lbWJlck9mIjpbIkNPOm1lbWJlcnM6YWxsIiwiQ086Q09VOmVpYzptZW1iZXJzOmFsbCIsIkNPOm1lbWJlcnM6YWN0aXZlIiwiQ086Q09VOmVpYzptZW1iZXJzOmFjdGl2ZSIsImVpYyBjb21wdXRpbmciXSwiZXhwIjoxNzI1OTEwMjI2LCJmYW1pbHlfbmFtZSI6IlBhbnRhIiwiaWF0IjoxNzI1OTA5MzI2LCJlbWFpbCI6InBhbnRhQGpsYWIub3JnIiwianRpIjoiaHR0cHM6Ly9jaWxvZ29uLm9yZy9vYXV0aDIvaWRUb2tlbi8xYzJlYmFiOTRkODdjOWVlNWIxMzc0NDE1MjRjMjZjNC8xNzI1OTA5MzI1OTY0In0.NOwHK9RGkEYI5TnHB0_A-vBwzgvf2YuXnj66q0DSsA8Ku8BoIMne3HW3oA8vWGUCxDNcYBVA95QR9h3PfyoFvZY9JdGnoKxvKz2_8nZwmPPYwfptIg7N0qTWB8UR6f5_h94drwForMZBJQkoEtUokKTzYHP63AlKUY57zxT5ObPUs0jrE4BrHBGZ44Km53T0JTlIkzIMyeNS8bRJBeob4pOud8ZHDoXcuTNDK1pCVwxTfXa3_R_BWQF5XhDhE2e0iHtZ9Et-JXj9rdR15loF7sCys5xo-6gR_4-z6cmkuQ8KnD3ftCYlIvfz9IFid3caIzAqCctwQkTa8aJwk3xSug", + "token_type": "Bearer", + "expires_in": 900 +} \ No newline at end of file diff --git a/tests/data/cilogon_userinfo_response.json b/tests/data/cilogon_userinfo_response.json new file mode 100644 index 00000000..51b873ce --- /dev/null +++ b/tests/data/cilogon_userinfo_response.json @@ -0,0 +1,36 @@ +{ + "sub": "http://cilogon.org/serverE/users/108506", + "idp_name": "Thomas Jefferson National Accelerator Facility", + "eppn": "panta@jlab.org", + "ou": "PADMIN", + "cert_subject_dn": "/DC=org/DC=cilogon/C=US/O=Thomas Jefferson National Accelerator Facility/CN=Anil Panta E108506", + "eptid": "https://jidp.jlab.org/idp/shibboleth!https://cilogon.org/shibboleth!panta@jlab.org", + "iss": "https://cilogon.org/jlab", + "given_name": "Anil", + "eduPersonAssurance": [ + "https://refeds.org/assurance", + "https://refeds.org/assurance/ATP/ePA-1d", + "https://refeds.org/assurance/ATP/ePA-1m", + "https://refeds.org/assurance/ID/unique", + "https://refeds.org/assurance/ID/eppn-unique-no-reassign", + "https://refeds.org/assurance/IAP/local-enterprise", + "https://refeds.org/assurance/IAP/medium", + "https://refeds.org/assurance/profile/cappuccino" + ], + "acr": "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport", + "aud": "cilogon:/client_id/14f788b154a4becc091a1b1b8274aaa3", + "nbf": 1725903786, + "idp": "https://jidp.jlab.org/idp/shibboleth", + "affiliation": "member@jlab.org;employee@jlab.org", + "name": "Anil Panta", + "isMemberOf": [ + "CO:members:all", + "CO:COU:eic:members:all", + "CO:members:active", + "CO:COU:eic:members:active", + "eic computing" + ], + "family_name": "Panta", + "email": "panta@jlab.org", + "jti": "https://cilogon.org/oauth2/idToken/3ba13fe053d619ffc0e539c522cd961c/1725903786081" +} \ No newline at end of file diff --git a/tests/helpers.py b/tests/helpers.py index 7407c45b..388b2854 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -11,12 +11,14 @@ import json from inspect import isfunction from urllib.parse import parse_qs, urlencode, urlparse +from unittest.mock import patch import httpretty from mock import MagicMock from invenio_oauthclient._compat import _create_identifier from invenio_oauthclient.views.client import serializer +from invenio_oauthclient.contrib.cilogon.helpers import jwks2pem def get_state(app="test"): @@ -87,3 +89,37 @@ def mock_keycloak(app_config, token_dict, user_info_dict, realm_info): body=json.dumps(realm_info), content_type="application/json", ) + +def mock_cilogon(app_config, token_dict, user_info_dict, jwks_info): + """Mock a running CiLogon instance.""" + cilogon_settings = app_config["OAUTHCLIENT_REMOTE_APPS"]["cilogon"] + httpretty.register_uri( + httpretty.POST, + cilogon_settings["params"]["access_token_url"], + body=json.dumps(token_dict), + content_type="application/json", + ) + + httpretty.register_uri( + httpretty.GET, + app_config["OAUTHCLIENT_CILOGON_USER_INFO_URL"], + body=json.dumps(user_info_dict), + content_type="application/json", + ) + + httpretty.register_uri( + httpretty.GET, + app_config["OAUTHCLIENT_CILOGON_JWKS_URL"], # Make sure this key exists in your settings + body=json.dumps(jwks_info), + content_type="application/json", + ) + + def mock_get_all_keys(remote, jwsurl): + return jwks2pem(jwks_info) + + # Patch the get_all_keys function + patcher = patch('invenio_oauthclient.contrib.cilogon.helpers.get_all_keys', + side_effect=mock_get_all_keys) + patcher.start() + + return patcher diff --git a/tests/test_contrib_cilogon.py b/tests/test_contrib_cilogon.py new file mode 100644 index 00000000..54695797 --- /dev/null +++ b/tests/test_contrib_cilogon.py @@ -0,0 +1,416 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) +# 2024 BNL. +# 2024 JLab +# Invenio-cilogon is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. +# This test was taken from Keycloak test and modified to test cilogon. + +"""Tests for the cilogon OAuth remote_app.""" + +from urllib.parse import parse_qs, urlparse + +import httpretty +import jwt +import pytest +from flask import session, url_for +from flask_login import current_user, login_user +from flask_security.utils import hash_password +from helpers import get_state, mock_cilogon +from invenio_accounts.models import Role, User +from invenio_db import db + +from invenio_oauthclient.contrib.cilogon.helpers import get_all_keys, get_user_info +from invenio_oauthclient.errors import OAuthError +from invenio_oauthclient.handlers import token_session_key +from invenio_oauthclient.models import UserIdentity + +# - - - - - - - - - - - - - -# +# Tests for Cilogon contrib # +# - - - - - - - - - - - - - -# + + +def test_login(app): + """Test cilogon login.""" + cilogon_config = app.config["OAUTHCLIENT_REMOTE_APPS"]["cilogon"] + auth_url = cilogon_config["params"]["authorize_url"] + + client = app.test_client() + + resp = client.get( + url_for("invenio_oauthclient.login", remote_app="cilogon", next="/someurl/") + ) + + assert resp.status_code == 302 + comps = urlparse(resp.location) + params = parse_qs(comps.query) + url = "{}://{}{}".format(comps.scheme, comps.netloc, comps.path) + + assert url == auth_url + assert params["response_type"] == ["code"] + assert params["scope"] == ["openid email org.cilogon.userinfo profile "] + assert params["redirect_uri"] + assert params["client_id"] + assert params["state"] + + +@httpretty.activate +def test_authorized_signup_valid_user( + app_with_userprofiles, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, +): + """Test authorized callback with sign-up.""" + app = app_with_userprofiles + example_cilogon = example_cilogon_userinfo + + with app.test_client() as c: + # ensure that remote_apps have been initialized (before first request) + resp = c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + assert resp.status_code == 302 + + # mock a running cilogon instance + patcher = mock_cilogon( + app.config, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, + ) + # user authorized the request and is redirected back + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + code="test", + state=get_state("cilogon"), + ) + ) + + # note: because we provided an e-mail address in 'info_handler', + # the user does not need to sign up + assert resp.status_code == 302 + assert resp.location == ( + url_for("invenio_oauthclient.signup", remote_app="cilogon") + ) + + # User load sign-up page. + resp = c.get(url_for("invenio_oauthclient.signup", remote_app="cilogon")) + assert resp.status_code == 200 + account_info = session[token_session_key("cilogon") + "_account_info"] + account_info["user"]["profile"]["username"] = "panta" + data = { + "email": account_info["user"]["email"], + "profile.username": account_info["user"]["profile"]["username"], + "profile.full_name": account_info["user"]["profile"]["full_name"], + "profile.affiliations": "cern", + } + # User fills form to register + resp = c.post( + url_for("invenio_oauthclient.signup", remote_app="cilogon"), + data=data, + ) + + assert resp.status_code == 302 + httpretty.disable() + + # check that the user exists + user = User.query.filter_by(email=example_cilogon["email"]).one() + assert user is not None + assert user.email == example_cilogon["email"] + assert user.user_profile["full_name"] == "Anil Panta" + assert user.active + assert user.confirmed_at + # check that the user has a linked cilogon account + uid = UserIdentity.query.filter_by( + method="cilogon", id_user=user.id, id=example_cilogon["sub"] + ).one() + assert uid.user is user + + # Assert that the new group is created + # single group is allowed + roles = Role.query.all() + true_role = app.config["OAUTHCLIENT_CILOGON_ALLOWED_ROLES"] + assert len(roles) == len(true_role) + + # we set id as group name. + role = Role.query.filter(Role.id == true_role[0]).one() + assert role.id == true_role[0] + assert role.name == true_role[0] + + # try to disconnect the cilogon account + # which shouldn't work, because it's the user's only means of login + resp = c.get(url_for("invenio_oauthclient.disconnect", remote_app="cilogon")) + + assert resp.status_code == 400 + + # check that the user still exists + user = User.query.filter_by(email=example_cilogon["email"]).one() + assert user is not None + + # check that the cilogon account hasn't been unlinked + count = UserIdentity.query.filter_by( + method="cilogon", id_user=user.id, id=example_cilogon["sub"] + ).count() + assert count == 1 + + # set a password for the user + user.password = hash_password("1234") + db.session.commit() + + # try to disconnect the cilogon account again + resp = c.get(url_for("invenio_oauthclient.disconnect", remote_app="cilogon")) + + assert resp.status_code == 302 + + # check that the user still exists + user = User.query.filter_by(email=example_cilogon["email"]).one() + assert user is not None + + # check that the cilogon account hasn't been unlinked + count = UserIdentity.query.filter_by( + method="cilogon", id_user=user.id, id=example_cilogon["sub"] + ).count() + assert count == 0 + patcher.stop() + + +@httpretty.activate +def test_authorized_signup_valid_user_without_userprofile( + app, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, +): + """Test authorized callback with sign-up.""" + app = app + example_cilogon = example_cilogon_userinfo + + with app.test_client() as c: + # ensure that remote_apps have been initialized (before first request) + resp = c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + assert resp.status_code == 302 + + # mock a running cilogon instance + patcher = mock_cilogon( + app.config, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, + ) + # user authorized the request and is redirected back + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + code="test", + state=get_state("cilogon"), + ) + ) + + assert resp.status_code == 302 + # check that the user exists + user = User.query.filter_by(email=example_cilogon["email"]).one() + assert user is not None + assert user.email == example_cilogon["email"] + assert user.active + assert user.confirmed_at + # check that the user has a linked cilogon account + uid = UserIdentity.query.filter_by( + method="cilogon", id_user=user.id, id=example_cilogon["sub"] + ).one() + assert uid.user is user + + # Assert that the new group is created + # single group is allowed + roles = Role.query.all() + true_role = app.config["OAUTHCLIENT_CILOGON_ALLOWED_ROLES"] + assert len(roles) == len(true_role) + patcher.stop() + + +@httpretty.activate +def test_authorized_signup_valid_allow_all_roles( + app, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, +): + """Test authorized callback with sign-up and allow all roles""" + app = app + + example_cilogon = example_cilogon_userinfo + + with app.test_client() as c: + # ensure that remote_apps have been initialized (before first request) + resp = c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + assert resp.status_code == 302 + + # mock a running cilogon instance + patcher = mock_cilogon( + app.config, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, + ) + + # allowing this to be True means no role check is done and + # no group should be created + app.config["OAUTHCLIENT_CILOGON_ALLOW_ANY_ROLES"] = True + # user authorized the request and is redirected back + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + code="test", + state=get_state("cilogon"), + ) + ) + assert resp.status_code == 302 + roles = Role.query.all() + assert len(roles) == 0 + patcher.stop() + + +@httpretty.activate +def test_invalid_role_reject( + app, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, +): + """Test authorized callback with sign-up and allow all roles""" + app = app + + example_cilogon = example_cilogon_userinfo + + with app.test_client() as c: + # ensure that remote_apps have been initialized (before first request) + resp = c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + assert resp.status_code == 302 + + # mock a running cilogon instance + patcher = mock_cilogon( + app.config, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, + ) + app.config["OAUTHCLIENT_CILOGON_ALLOWED_ROLES"] = ["random"] + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + code="test", + state=get_state("cilogon"), + ) + ) + assert resp.status_code in (301, 302) + assert resp.location == "/" + assert session["_flashes"][0][0] == "danger" + patcher.stop() + + +def test_authorized_reject(app, example_cilogon_token): + """Test a rejected request.""" + with app.test_client() as c: + c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + error="access_denied", + error_description="User denied access", + state=get_state("cilogon"), + ) + ) + assert resp.status_code in (301, 302) + assert resp.location == "/" + + # check message flash + assert session["_flashes"][0][0] == "info" + + +def test_not_authenticated(app): + """Test disconnect when the user is not authenticated.""" + with app.test_client() as c: + assert not current_user.is_authenticated + resp = c.get(url_for("invenio_oauthclient.disconnect", remote_app="cilogon")) + assert resp.status_code == 302 + + +@httpretty.activate +def test_authorized_already_authenticated( + app, + models_fixture, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, +): + """Test authorized callback with sign-in.""" + datastore = app.extensions["invenio-accounts"].datastore + login_manager = app.login_manager + + example_cilogon = example_cilogon_userinfo + existing_mail = "existing@inveniosoftware.org" + user = datastore.find_user(email=existing_mail) + + @login_manager.user_loader + def load_user(user_id): + return user + + @app.route("/logmein") + def login(): + login_user(user) + return "Logged in" + + with app.test_client() as c: + c.get("/logmein", follow_redirects=True) + + # ensure that remote apps have been loaded (before first request) + c.get(url_for("invenio_oauthclient.login", remote_app="cilogon")) + + # mock a running cilogon instance + mock_cilogon( + app.config, + example_cilogon_token, + example_cilogon_userinfo, + example_jwks_info, + ) + + # user goes to 'linked accounts' and clicks 'connect' with cilogon + resp = c.get( + url_for("invenio_oauthclient.login", remote_app="cilogon", next="/someurl/") + ) + + assert resp.status_code == 302 + + # the user logged in to cilogon and authorized the request + resp = c.get( + url_for( + "invenio_oauthclient.authorized", + remote_app="cilogon", + code="test", + state=get_state("cilogon"), + ) + ) + + # check if the cilogon account has been linked to the user + u = User.query.filter_by(email=existing_mail).one() + UserIdentity.query.filter_by( + method="cilogon", id_user=u.id, id=example_cilogon["sub"] + ).one() + + # let the user hit the 'disconnect' button + resp = c.get(url_for("invenio_oauthclient.disconnect", remote_app="cilogon")) + assert resp.status_code == 302 + + # check that the user still exists, + # but the cilogon account has been unlinked + u = User.query.filter_by(email=existing_mail).one() + count = UserIdentity.query.filter_by( + method="cilogon", id_user=u.id, id=example_cilogon["sub"] + ).count() + assert count == 0