Skip to content

Commit

Permalink
Support for OAuth2 sessions revocation
Browse files Browse the repository at this point in the history
Add support for revocation in OAuth2 sessions.
Make an example with GitLab.
Documentation.

Signed-off-by: Louis Rannou <[email protected]>
  • Loading branch information
Louson authored and Louis Rannou committed Mar 19, 2024
1 parent eee74a2 commit 58a2770
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
50 changes: 50 additions & 0 deletions docs/examples/gitlab.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
GitLab OAuth 2 Tutorial
==========================

Add a new application on `GitLab`_ (redirect URI can be `https://example.com`
and check the box `read_user`). When you have obtained a ``client_id`` and a
``client_secret`` you can try out the command line interactive example below.

.. _`GitLab`:
https://gitlab.com/-/user_settings/applications

.. code-block:: pycon
>>> # Credentials you get from registering a new application
>>> client_id = '<the id you get from github>'
>>> client_secret = '<the secret you get from github>'
>>> redirect_uri = '<the URI you gave>'
>>> scope = '<the scope you checked>'
>>> # OAuth endpoints given in the GitLab API documentation
>>> authorization_base_url = 'https://gitlab.com/oauth/authorize'
>>> token_url = 'https://gitlab.com/oauth/token'
>>> from requests_oauthlib import OAuth2Session
>>> gitlab = OAuth2Session(client_id, scope=scope, redirect_uri=redirect_uri)
>>> # Redirect user to GitLab for authorization
>>> authorization_url, state = gitlab.authorization_url(authorization_base_url)
>>> print('Please go here and authorize,', authorization_url)
>>> # Get the authorization verifier code from the callback url
>>> redirect_response = input('Paste the full redirect URL here:')
>>> # Fetch the access token
>>> gitlab.fetch_token(token_url, client_secret=client_secret,
>>> authorization_response=redirect_response)
>>> # Fetch a protected resource, i.e. user profile
>>> r = gitlab.get('https://gitlab.com/api/v4/users')
>>> print(r.content)
>>> # Refresh the token
>>> refresh_url = token_url # True for GitLab but not all providers.
>>> gitlab.refresh_token(refresh_url,
>>> client_id=client_id, client_secret=client_secret)
>>> # Revoke the token
>>> revoke_url = 'https://gitlab.com/oauth/revoke'
>>> gitlab.revoke_token(revoke_url,
>>> client_id=client_id, client_secret=client_secret)
30 changes: 30 additions & 0 deletions docs/oauth2_workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,36 @@ however that you still need to update ``expires_in`` to trigger the refresh.
... auto_refresh_kwargs=extra, token_updater=token_saver)
>>> r = oauth.get(protected_url)
Revoking tokens
---------------

Certain providers will provide a ``revoke`` API. It can be used to revoke the
access token or the refresh token.

.. code-block:: pycon
>>> token = {
... 'access_token': 'eswfld123kjhn1v5423',
... 'refresh_token': 'asdfkljh23490sdf',
... 'token_type': 'Bearer',
... 'expires_in': '-30', # initially 3600, need to be updated by you
... }
>>> client_id = r'foo'
>>> revoke_url = 'https://provider.com/revoke'
>>> # some providers will ask you for extra credentials to be passed along
>>> # when refreshing tokens, usually for authentication purposes.
>>> extra = {
... 'client_id': client_id,
... 'client_secret': r'potato',
... }
>>> from requests_oauthlib import OAuth2Session
>>> from oauthlib.oauth2 import TokenExpiredError
>>> oauth = OAuth2Session(client_id, token=token)
>>> oauth.revoke_token(revoke_url, **extra)
TLS Client Authentication
-------------------------

Expand Down
85 changes: 85 additions & 0 deletions requests_oauthlib/oauth2_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from oauthlib.oauth2 import WebApplicationClient, InsecureTransportError
from oauthlib.oauth2 import LegacyApplicationClient
from oauthlib.oauth2 import TokenExpiredError, is_secure_transport
from oauthlib.oauth2 import UnsupportedTokenTypeError
from oauthlib.oauth2 import TemporarilyUnavailableError, ServerError
import requests

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -98,8 +100,10 @@ def __init__(
self.compliance_hook = {
"access_token_response": set(),
"refresh_token_response": set(),
"revoke_token_response": set(),
"protected_request": set(),
"refresh_token_request": set(),
"revoke_token_request": set(),
"access_token_request": set(),
}

Expand Down Expand Up @@ -499,6 +503,85 @@ def refresh_token(
self.token["refresh_token"] = refresh_token
return self.token

def revoke_token(
self,
token_url,
token=None,
token_type=None,
body="",
auth=None,
timeout=None,
headers=None,
verify=None,
proxies=None,
**kwargs
):
"""Revoke a token pair using a token.
:param token_url: The token endpoint, must be HTTPS.
:param token: The token to revoke.
:param token_type: The type of token to revoke.
:param body: Optional application/x-www-form-urlencoded body to add the
include in the token request. Prefer kwargs over body.
:param auth: An auth tuple or method as accepted by `requests`.
:param timeout: Timeout of the request in seconds.
:param headers: A dict of headers to be used by `requests`.
:param verify: Verify SSL certificate.
:param proxies: The `proxies` argument will be passed to `requests`.
:param kwargs: Extra parameters to include in the token request.
:return: A token dict
"""
if not token_url:
raise ValueError("No token endpoint set for revoke.")

if not is_secure_transport(token_url):
raise InsecureTransportError()

token = token or self.token.get("access_token")
token_type = token_type or self.token.get("token_type")

_request_headers = headers or {}

if token_type:
(url, _headers, body) = self._client.prepare_token_revocation_request(
token_url, token, token_type, body=body, scope=self.scope, **kwargs)
else:
(url, _headers, body) = self._client.prepare_revocation_request(
token_url, token, body=body, scope=self.scope, **kwargs)
_request_headers.update(_headers)
log.debug("Prepared revocation request %s", body)

for hook in self.compliance_hook["revoke_token_request"]:
log.debug("Invoking revoke_token_request hook %s.", hook)
url, _request_headers, body = hook(url, _headers, body)

r = self.post(
url,
data=dict(urldecode(body)),
auth=auth,
timeout=timeout,
headers=_request_headers,
verify=verify,
withhold_token=True,
proxies=proxies,
)
log.debug("Request to revoke token completed with status %s.", r.status_code)
log.debug("Response headers were %s and content %s.", r.headers, r.text)
log.debug(
"Invoking %d token response hooks.",
len(self.compliance_hook["revoke_token_response"]),
)
for hook in self.compliance_hook["revoke_token_response"]:
log.debug("Invoking hook %s.", hook)
r = hook(r)

if not r.ok and r.status_code == 400:
if 'unsupported_token_type' in r.text:
raise UnsupportedTokenTypeError("Revocation not supported by server")
raise ServerError('Server error')
elif not r.ok and r.code == 503:
raise TemporarilyUnavailableError("Service unavailable")

def request(
self,
method,
Expand Down Expand Up @@ -573,9 +656,11 @@ def register_compliance_hook(self, hook_type, hook):
Available hooks are:
access_token_response invoked before token parsing.
refresh_token_response invoked before refresh token parsing.
revoke_token_response invoked after token revocation.
protected_request invoked before making a request.
access_token_request invoked before making a token fetch request.
refresh_token_request invoked before making a refresh request.
revoke_token_request invoked before making a revoke request.
If you find a new hook is needed please send a GitHub PR request
or open an issue.
Expand Down

0 comments on commit 58a2770

Please sign in to comment.