Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/flask_pyoidc/flask_pyoidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ def valid_access_token(self, force_refresh=False):
return None

client = self.clients[session.current_provider]
response = client.refresh_token(session.refresh_token)
extra_token_args = {}
if 'OIDC_CLOCK_SKEW' in current_app.config:
extra_token_args['skew'] = current_app.config['OIDC_CLOCK_SKEW']
response = client.refresh_token(session.refresh_token, **extra_token_args)
if 'error' in response:
logger.info('failed to refresh access token: ' + json.dumps(response.to_dict()))
return None
Expand Down
7 changes: 5 additions & 2 deletions src/flask_pyoidc/pyoidc_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,15 @@ def verify_id_token(self, id_token, auth_request):
"""
self._client.verify_id_token(id_token, auth_request)

def refresh_token(self, refresh_token: str):
def refresh_token(self, refresh_token: str, **kwargs):
"""Requests new tokens using a refresh token.

Parameters
----------
refresh_token: str
refresh token issued to client after user authorization.
**kwargs : dict, optional
Extra arguments to pass to pyoidc (e.g., skew for clock skew tolerance).

Returns
-------
Expand All @@ -195,7 +197,8 @@ def refresh_token(self, refresh_token: str):
return self._client.do_access_token_refresh(request_args=request_args,
authn_method=client_auth_method,
token=Token(resp={'refresh_token': refresh_token}),
endpoint=self._client.token_endpoint
endpoint=self._client.token_endpoint,
**kwargs
)

def userinfo_request(self, access_token: str):
Expand Down
54 changes: 54 additions & 0 deletions tests/test_flask_pyoidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,60 @@ def test_should_refresh_still_valid_access_token_if_forced(self):
assert session.access_token == token_response['access_token']
assert session.refresh_token == token_response['refresh_token']

@patch('time.time')
@patch('oic.utils.time_util.utc_time_sans_frac')
@responses.activate
def test_oidc_clock_skew_passed_to_refresh_token(self, time_mock, utc_time_sans_frac_mock):
"""Test that OIDC_CLOCK_SKEW allows validating tokens issued slightly in the future during refresh."""
# freeze time
timestamp = time.mktime(datetime(2017, 1, 1).timetuple())
time_mock.return_value = timestamp
utc_time_sans_frac_mock.return_value = int(timestamp)

token_endpoint = self.PROVIDER_BASEURL + '/token'

# Create an ID token that was issued 5 seconds in the future
# This should fail without clock skew, but succeed with OIDC_CLOCK_SKEW=10
skew_seconds = 5
exp_time = 3600
id_token_claims = {
'iss': self.PROVIDER_BASEURL,
'aud': [self.CLIENT_ID],
'sub': 'user1',
'exp': int(timestamp) + exp_time + skew_seconds,
'iat': int(timestamp) + skew_seconds, # issued 5 seconds in the future
}

id_token_jwt, id_token_signing_key = signed_id_token(id_token_claims)

# Mock token refresh response with the future-dated token
token_response = {
'access_token': 'new-access-token',
'expires_in': exp_time,
'token_type': 'Bearer',
'refresh_token': 'new-refresh-token',
'id_token': id_token_jwt
}
responses.add(responses.POST, token_endpoint, json=token_response)
responses.add(responses.GET,
self.PROVIDER_BASEURL + '/jwks',
json={'keys': [id_token_signing_key.serialize()]})

authn = self.init_app(provider_metadata_extras={'token_endpoint': token_endpoint})

with self.app.test_request_context('/foo'):
session = UserSession(flask.session, self.PROVIDER_NAME)
session.update(expires_in=-10, refresh_token='old-refresh-token')

# This should succeed because OIDC_CLOCK_SKEW=10 is configured (in create_flask_app)
# and the token is only 5 seconds in the future
result = authn.valid_access_token()

assert result == 'new-access-token'
assert session.access_token == 'new-access-token'
assert session.refresh_token == 'new-refresh-token'
assert session.id_token == id_token_claims

def test_should_not_refresh_without_refresh_token(self):
authn = self.init_app()

Expand Down