-
-
Notifications
You must be signed in to change notification settings - Fork 802
/
Copy pathmodels.py
834 lines (668 loc) · 28.6 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
import hashlib
import logging
import time
import uuid
from datetime import timedelta
from urllib.parse import parse_qsl, urlparse
from django.apps import apps
from django.conf import settings
from django.contrib.auth.hashers import identify_hasher, make_password
from django.core.exceptions import ImproperlyConfigured
from django.db import models, transaction
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from jwcrypto import jwk
from jwcrypto.common import base64url_encode
from oauthlib.oauth2.rfc6749 import errors
from .generators import generate_client_id, generate_client_secret
from .scopes import get_scopes_backend
from .settings import oauth2_settings
from .utils import jwk_from_pem
from .validators import AllowedURIValidator
# Module-level logger; its name is this module's ``__name__``.
logger = logging.getLogger(__name__)
class ClientSecretField(models.CharField):
    """
    CharField that stores its value hashed.

    On save the value is run through ``make_password`` unless the model
    instance opts out (``hash_client_secret`` is falsy) or the value is
    already recognised by ``identify_hasher``.
    """

    def pre_save(self, model_instance, add):
        """
        Return the value to persist for this field.

        :param model_instance: The model instance being saved
        :param add: Whether the instance is being inserted for the first time
        """
        secret = getattr(model_instance, self.attname)

        # Instances without a ``hash_client_secret`` attribute are hashed by default.
        if getattr(model_instance, "hash_client_secret", True):
            try:
                hasher = identify_hasher(secret)
                logger.debug(f"{model_instance}: {self.attname} is already hashed with {hasher}.")
            except ValueError:
                # identify_hasher could not recognise the value: treat it as plain text.
                logger.debug(f"{model_instance}: {self.attname} is not hashed; hashing it now.")
                encoded_secret = make_password(secret)
                setattr(model_instance, self.attname, encoded_secret)
                return encoded_secret

        return super().pre_save(model_instance, add)
class TokenChecksumField(models.CharField):
    """CharField holding the SHA-256 hex digest of the instance's ``token`` attribute."""

    def pre_save(self, model_instance, add):
        """
        Recompute the checksum from ``model_instance.token`` and store it on the instance.

        :param model_instance: The model instance being saved; must expose ``token``
        :param add: Whether the instance is being inserted for the first time
        """
        encoded_token = model_instance.token.encode("utf-8")
        digest = hashlib.sha256(encoded_token)
        setattr(model_instance, self.attname, digest.hexdigest())
        return super().pre_save(model_instance, add)
class AbstractApplication(models.Model):
    """
    An Application instance represents a Client on the Authorization server.
    Usually an Application is created manually by client's developers after
    logging in on an Authorization Server.

    Fields:

    * :attr:`client_id` The client identifier issued to the client during the
      registration process as described in :rfc:`2.2`
    * :attr:`user` ref to a Django user
    * :attr:`redirect_uris` The list of allowed redirect uri. The string
      consists of valid URLs separated by space
    * :attr:`post_logout_redirect_uris` The list of allowed redirect uris after
      an RP initiated logout. The string
      consists of valid URLs separated by space
    * :attr:`client_type` Client type as described in :rfc:`2.1`
    * :attr:`authorization_grant_type` Authorization flows available to the
      Application
    * :attr:`client_secret` Confidential secret issued to the client during
      the registration process as described in :rfc:`2.2`
    * :attr:`hash_client_secret` Whether :attr:`client_secret` is hashed on save
    * :attr:`name` Friendly name for the Application
    * :attr:`algorithm` Signing algorithm for OIDC (empty, ``RS256`` or ``HS256``)
    * :attr:`allowed_origins` The list of origins allowed for CORS. The string
      consists of origins separated by space
    """

    CLIENT_CONFIDENTIAL = "confidential"
    CLIENT_PUBLIC = "public"
    CLIENT_TYPES = (
        (CLIENT_CONFIDENTIAL, _("Confidential")),
        (CLIENT_PUBLIC, _("Public")),
    )

    GRANT_AUTHORIZATION_CODE = "authorization-code"
    GRANT_IMPLICIT = "implicit"
    GRANT_PASSWORD = "password"
    GRANT_CLIENT_CREDENTIALS = "client-credentials"
    GRANT_OPENID_HYBRID = "openid-hybrid"
    GRANT_TYPES = (
        (GRANT_AUTHORIZATION_CODE, _("Authorization code")),
        (GRANT_IMPLICIT, _("Implicit")),
        (GRANT_PASSWORD, _("Resource owner password-based")),
        (GRANT_CLIENT_CREDENTIALS, _("Client credentials")),
        (GRANT_OPENID_HYBRID, _("OpenID connect hybrid")),
    )

    NO_ALGORITHM = ""
    RS256_ALGORITHM = "RS256"
    HS256_ALGORITHM = "HS256"
    ALGORITHM_TYPES = (
        (NO_ALGORITHM, _("No OIDC support")),
        (RS256_ALGORITHM, _("RSA with SHA-2 256")),
        (HS256_ALGORITHM, _("HMAC with SHA-2 256")),
    )

    id = models.BigAutoField(primary_key=True)
    client_id = models.CharField(max_length=100, unique=True, default=generate_client_id, db_index=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="%(app_label)s_%(class)s",
        null=True,
        blank=True,
        on_delete=models.CASCADE,
    )

    redirect_uris = models.TextField(
        blank=True,
        help_text=_("Allowed URIs list, space separated"),
    )
    post_logout_redirect_uris = models.TextField(
        blank=True,
        help_text=_("Allowed Post Logout URIs list, space separated"),
        default="",
    )
    client_type = models.CharField(max_length=32, choices=CLIENT_TYPES)
    authorization_grant_type = models.CharField(max_length=32, choices=GRANT_TYPES)
    client_secret = ClientSecretField(
        max_length=255,
        blank=True,
        default=generate_client_secret,
        db_index=True,
        help_text=_("Hashed on Save. Copy it now if this is a new secret."),
    )
    hash_client_secret = models.BooleanField(default=True)
    name = models.CharField(max_length=255, blank=True)
    skip_authorization = models.BooleanField(default=False)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)
    algorithm = models.CharField(max_length=5, choices=ALGORITHM_TYPES, default=NO_ALGORITHM, blank=True)
    allowed_origins = models.TextField(
        blank=True,
        help_text=_("Allowed origins list to enable CORS, space separated"),
        default="",
    )

    class Meta:
        abstract = True

    def __str__(self):
        return self.name or self.client_id

    @property
    def default_redirect_uri(self):
        """
        Returns the default redirect_uri, *if* only one is registered.

        :raises oauthlib.oauth2.rfc6749.errors.MissingRedirectURIError: if more
            than one redirect URI is registered
        :raises AssertionError: if no redirect URI is registered at all
        """
        if self.redirect_uris:
            uris = self.redirect_uris.split()
            if len(uris) == 1:
                return uris[0]
            raise errors.MissingRedirectURIError()

        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        raise AssertionError(
            "If you are using implicit, authorization_code "
            "or all-in-one grant_type, you must define "
            "redirect_uris field in your Application model"
        )

    def redirect_uri_allowed(self, uri):
        """
        Checks if given url is one of the items in :attr:`redirect_uris` string

        :param uri: Url to check
        """
        return redirect_to_uri_allowed(uri, self.redirect_uris.split())

    def post_logout_redirect_uri_allowed(self, uri):
        """
        Checks if given URI is one of the items in :attr:`post_logout_redirect_uris` string

        :param uri: URI to check
        """
        return redirect_to_uri_allowed(uri, self.post_logout_redirect_uris.split())

    def origin_allowed(self, origin):
        """
        Checks if given origin is one of the items in :attr:`allowed_origins` string

        Always returns a bool; an empty :attr:`allowed_origins` yields ``False``.

        :param origin: Origin to check
        """
        return bool(self.allowed_origins) and is_origin_allowed(origin, self.allowed_origins.split())

    def clean(self):
        """
        Validate the application's configuration.

        Checks performed, in order:

        * every entry of :attr:`redirect_uris` passes ``AllowedURIValidator``
          for the schemes returned by :meth:`get_allowed_schemes`; an empty list
          is rejected for the authorization-code, implicit and hybrid grants
        * every entry of :attr:`allowed_origins` passes ``AllowedURIValidator``
          for ``oauth2_settings.ALLOWED_SCHEMES``
        * RS256 requires ``oauth2_settings.OIDC_RSA_PRIVATE_KEY`` to be set
        * HS256 is rejected for implicit/hybrid grants and for public clients

        :raises django.core.exceptions.ValidationError: on any failed check
        """
        from django.core.exceptions import ValidationError

        # Grant types that cannot work without at least one redirect URI.
        grant_types = (
            AbstractApplication.GRANT_AUTHORIZATION_CODE,
            AbstractApplication.GRANT_IMPLICIT,
            AbstractApplication.GRANT_OPENID_HYBRID,
        )
        hs_forbidden_grant_types = (
            AbstractApplication.GRANT_IMPLICIT,
            AbstractApplication.GRANT_OPENID_HYBRID,
        )

        redirect_uris = self.redirect_uris.strip().split()
        allowed_schemes = {s.lower() for s in self.get_allowed_schemes()}

        if redirect_uris:
            validator = AllowedURIValidator(
                allowed_schemes, name="redirect uri", allow_path=True, allow_query=True
            )
            for uri in redirect_uris:
                validator(uri)

        elif self.authorization_grant_type in grant_types:
            raise ValidationError(
                _("redirect_uris cannot be empty with grant_type {grant_type}").format(
                    grant_type=self.authorization_grant_type
                )
            )

        allowed_origins = self.allowed_origins.strip().split()
        if allowed_origins:
            # oauthlib allows only https scheme for CORS
            validator = AllowedURIValidator(oauth2_settings.ALLOWED_SCHEMES, "allowed origin")
            for uri in allowed_origins:
                validator(uri)

        if self.algorithm == AbstractApplication.RS256_ALGORITHM:
            if not oauth2_settings.OIDC_RSA_PRIVATE_KEY:
                raise ValidationError(_("You must set OIDC_RSA_PRIVATE_KEY to use RSA algorithm"))

        if self.algorithm == AbstractApplication.HS256_ALGORITHM:
            # Compare against the abstract base's constant, like every other
            # constant reference in this class, instead of the concrete model.
            if (
                self.authorization_grant_type in hs_forbidden_grant_types
                or self.client_type == AbstractApplication.CLIENT_PUBLIC
            ):
                raise ValidationError(_("You cannot use HS256 with public grants or clients"))

    # TODO: usage of this was removed from the templates, so it can be labelled as deprecated.
    def get_absolute_url(self):
        """Return the URL of the ``oauth2_provider:detail`` view for this application."""
        return reverse("oauth2_provider:detail", args=[str(self.pk)])

    def get_allowed_schemes(self):
        """
        Returns the list of redirect schemes allowed by the Application.
        By default, returns `ALLOWED_REDIRECT_URI_SCHEMES`.
        """
        return oauth2_settings.ALLOWED_REDIRECT_URI_SCHEMES

    def allows_grant_type(self, *grant_types):
        """
        Tell whether :attr:`authorization_grant_type` is one of the given grant types.

        :param grant_types: Grant type identifiers to test against
        """
        return self.authorization_grant_type in grant_types

    def is_usable(self, request):
        """
        Determines whether the application can be used.

        :param request: The oauthlib.common.Request being processed.
        """
        return True

    @property
    def jwk_key(self):
        """
        Return the JWK used to sign tokens for this application.

        RS256 builds the key from ``oauth2_settings.OIDC_RSA_PRIVATE_KEY``;
        HS256 builds a symmetric key from :attr:`client_secret`.

        :raises ImproperlyConfigured: if RS256 is selected without a private key,
            or if no signing algorithm is configured
        """
        if self.algorithm == AbstractApplication.RS256_ALGORITHM:
            if not oauth2_settings.OIDC_RSA_PRIVATE_KEY:
                raise ImproperlyConfigured("You must set OIDC_RSA_PRIVATE_KEY to use RSA algorithm")
            return jwk_from_pem(oauth2_settings.OIDC_RSA_PRIVATE_KEY)
        elif self.algorithm == AbstractApplication.HS256_ALGORITHM:
            # NOTE(review): the key is derived from the stored client_secret value;
            # when hash_client_secret is enabled that is presumably the hashed
            # form -- confirm this is the intended key material.
            return jwk.JWK(kty="oct", k=base64url_encode(self.client_secret))
        raise ImproperlyConfigured("This application does not support signed tokens")
class ApplicationManager(models.Manager):
    """Manager that can resolve an application from its natural key."""

    def get_by_natural_key(self, client_id):
        """
        Return the application whose ``client_id`` equals the given value.

        :param client_id: The client identifier to look up
        """
        lookup = {"client_id": client_id}
        return self.get(**lookup)
class Application(AbstractApplication):
    """Concrete application model, swappable via ``OAUTH2_PROVIDER_APPLICATION_MODEL``."""

    objects = ApplicationManager()

    class Meta(AbstractApplication.Meta):
        swappable = "OAUTH2_PROVIDER_APPLICATION_MODEL"

    def natural_key(self):
        """Return the natural key: a one-element tuple holding ``client_id``."""
        identifier = self.client_id
        return (identifier,)
class AbstractGrant(models.Model):
    """
    A Grant instance represents a token with a short lifetime that can
    be swapped for an access token, as described in :rfc:`4.1.2`

    Fields:

    * :attr:`user` The Django user who requested the grant
    * :attr:`code` The authorization code generated by the authorization server
    * :attr:`application` Application instance this grant was asked for
    * :attr:`expires` Date and time after which the grant is considered expired
    * :attr:`redirect_uri` The redirect URI this grant is tied to
    * :attr:`scope` Required scopes, optional
    * :attr:`code_challenge` PKCE code challenge
    * :attr:`code_challenge_method` PKCE code challenge transform algorithm
    """

    CODE_CHALLENGE_PLAIN = "plain"
    CODE_CHALLENGE_S256 = "S256"
    CODE_CHALLENGE_METHODS = ((CODE_CHALLENGE_PLAIN, "plain"), (CODE_CHALLENGE_S256, "S256"))

    id = models.BigAutoField(primary_key=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="%(app_label)s_%(class)s"
    )
    code = models.CharField(max_length=255, unique=True)  # code comes from oauthlib
    application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, on_delete=models.CASCADE)
    expires = models.DateTimeField()
    redirect_uri = models.TextField()
    scope = models.TextField(blank=True)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    code_challenge = models.CharField(max_length=128, blank=True, default="")
    code_challenge_method = models.CharField(
        max_length=10, blank=True, default="", choices=CODE_CHALLENGE_METHODS
    )

    nonce = models.CharField(max_length=255, blank=True, default="")
    claims = models.TextField(blank=True)

    class Meta:
        abstract = True

    def __str__(self):
        return self.code

    def is_expired(self):
        """
        Tell whether the grant has expired, using a timezone aware "now".

        A grant without an expiry value counts as expired.
        """
        return not self.expires or timezone.now() >= self.expires

    def redirect_uri_allowed(self, uri):
        """
        Tell whether ``uri`` is exactly the redirect URI stored on this grant.

        :param uri: URI to check
        """
        return uri == self.redirect_uri
class Grant(AbstractGrant):
    """Concrete grant model, swappable via ``OAUTH2_PROVIDER_GRANT_MODEL``."""

    class Meta(AbstractGrant.Meta):
        swappable = "OAUTH2_PROVIDER_GRANT_MODEL"
class AbstractAccessToken(models.Model):
    """
    An AccessToken instance represents the actual access token to
    access user's resources, as in :rfc:`5`.

    Fields:

    * :attr:`user` The Django user representing resources' owner
    * :attr:`source_refresh_token` If from a refresh, the consumed RefreshToken
    * :attr:`token` Access token
    * :attr:`token_checksum` SHA-256 hex digest of :attr:`token`, recomputed on save
    * :attr:`application` Application instance
    * :attr:`expires` Date and time of token expiration, in DateTime format
    * :attr:`scope` Allowed scopes
    """

    id = models.BigAutoField(primary_key=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        related_name="%(app_label)s_%(class)s",
    )
    source_refresh_token = models.OneToOneField(
        # unique=True implied by the OneToOneField
        oauth2_settings.REFRESH_TOKEN_MODEL,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name="refreshed_access_token",
    )
    token = models.TextField()
    token_checksum = TokenChecksumField(
        max_length=64,
        blank=True,
        unique=True,
        db_index=True,
    )
    id_token = models.OneToOneField(
        oauth2_settings.ID_TOKEN_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        related_name="access_token",
    )
    application = models.ForeignKey(
        oauth2_settings.APPLICATION_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
    )
    expires = models.DateTimeField()
    scope = models.TextField(blank=True)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

    def __str__(self):
        return self.token

    def is_valid(self, scopes=None):
        """
        Checks if the access token is valid: not expired and allowing ``scopes``.

        :param scopes: An iterable containing the scopes to check or None
        """
        if self.is_expired():
            return False
        return self.allow_scopes(scopes)

    def is_expired(self):
        """
        Tell whether the token has expired, using a timezone aware "now".

        A token without an expiry value counts as expired.
        """
        return not self.expires or timezone.now() >= self.expires

    def allow_scopes(self, scopes):
        """
        Check if the token allows the provided scopes

        An empty or missing ``scopes`` argument is always allowed.

        :param scopes: An iterable containing the scopes to check
        """
        if not scopes:
            return True

        granted = set(self.scope.split())
        requested = set(scopes)
        return requested <= granted

    def revoke(self):
        """
        Convenience method to uniform tokens' interface, for now
        simply remove this token from the database in order to revoke it.
        """
        self.delete()

    @property
    def scopes(self):
        """
        Returns a dictionary of allowed scope names (as keys) with their descriptions (as values)
        """
        known_scopes = get_scopes_backend().get_all_scopes()
        granted = self.scope.split()

        described = {}
        for name, desc in known_scopes.items():
            if name in granted:
                described[name] = desc
        return described
class AccessToken(AbstractAccessToken):
    """Concrete access token model, swappable via ``OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL``."""

    class Meta(AbstractAccessToken.Meta):
        swappable = "OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL"
class AbstractRefreshToken(models.Model):
    """
    A RefreshToken instance represents a token that can be swapped for a new
    access token when it expires.

    Fields:

    * :attr:`user` The Django user representing resources' owner
    * :attr:`token` Token value
    * :attr:`application` Application instance
    * :attr:`access_token` AccessToken instance this refresh token is
      bound to
    * :attr:`token_family` Optional UUID, not editable
    * :attr:`revoked` Timestamp of when this refresh token was revoked
    """

    id = models.BigAutoField(primary_key=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="%(app_label)s_%(class)s"
    )
    token = models.CharField(max_length=255)
    application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, on_delete=models.CASCADE)
    access_token = models.OneToOneField(
        oauth2_settings.ACCESS_TOKEN_MODEL,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name="refresh_token",
    )
    token_family = models.UUIDField(null=True, blank=True, editable=False)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)
    revoked = models.DateTimeField(null=True)

    def revoke(self):
        """
        Mark this refresh token revoked and revoke related access token

        The row is re-read under ``select_for_update`` inside a transaction and
        only processed if it is not revoked yet; otherwise this is a no-op.
        After a successful revoke the calling instance is updated in memory so
        it reflects what was written to the database.
        """
        access_token_model = get_access_token_model()
        refresh_token_model = get_refresh_token_model()

        with transaction.atomic():
            # A single query both locks the row and tells us whether there is
            # still something to revoke.
            candidates = list(
                refresh_token_model.objects.select_for_update().filter(pk=self.pk, revoked__isnull=True)
            )
            if not candidates:
                return
            locked_token = candidates[0]

            try:
                access_token_model.objects.get(pk=locked_token.access_token_id).revoke()
            except access_token_model.DoesNotExist:
                # Nothing to revoke: no access token is attached any more.
                pass

            locked_token.access_token = None
            locked_token.revoked = timezone.now()
            locked_token.save()

        # The work above happened on a fresh copy of the row. Mirror the result
        # onto the caller's instance (only once the transaction block has
        # completed) so it is not left with stale values.
        self.access_token = None
        self.revoked = locked_token.revoked

    def __str__(self):
        return self.token

    class Meta:
        abstract = True
        unique_together = (
            "token",
            "revoked",
        )
class RefreshToken(AbstractRefreshToken):
    """Concrete refresh token model, swappable via ``OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL``."""

    class Meta(AbstractRefreshToken.Meta):
        swappable = "OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL"
class AbstractIDToken(models.Model):
    """
    An IDToken instance represents the actual token to
    access user's resources, as in :openid:`2`.

    Fields:

    * :attr:`user` The Django user representing resources' owner
    * :attr:`jti` ID token JWT Token ID, to identify an individual token
    * :attr:`application` Application instance
    * :attr:`expires` Date and time of token expiration, in DateTime format
    * :attr:`scope` Allowed scopes
    * :attr:`created` Date and time of token creation, in DateTime format
    * :attr:`updated` Date and time of token update, in DateTime format
    """

    id = models.BigAutoField(primary_key=True)
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        related_name="%(app_label)s_%(class)s",
    )
    jti = models.UUIDField(unique=True, default=uuid.uuid4, editable=False, verbose_name="JWT Token ID")
    application = models.ForeignKey(
        oauth2_settings.APPLICATION_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
    )
    expires = models.DateTimeField()
    scope = models.TextField(blank=True)

    created = models.DateTimeField(auto_now_add=True)
    updated = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

    def __str__(self):
        return "JTI: {self.jti} User: {self.user_id}".format(self=self)

    def is_valid(self, scopes=None):
        """
        Checks if the ID token is valid: not expired and allowing ``scopes``.

        :param scopes: An iterable containing the scopes to check or None
        """
        if self.is_expired():
            return False
        return self.allow_scopes(scopes)

    def is_expired(self):
        """
        Tell whether the token has expired, using a timezone aware "now".

        A token without an expiry value counts as expired.
        """
        return not self.expires or timezone.now() >= self.expires

    def allow_scopes(self, scopes):
        """
        Check if the token allows the provided scopes

        An empty or missing ``scopes`` argument is always allowed.

        :param scopes: An iterable containing the scopes to check
        """
        if not scopes:
            return True

        granted = set(self.scope.split())
        requested = set(scopes)
        return requested <= granted

    def revoke(self):
        """
        Convenience method to uniform tokens' interface, for now
        simply remove this token from the database in order to revoke it.
        """
        self.delete()

    @property
    def scopes(self):
        """
        Returns a dictionary of allowed scope names (as keys) with their descriptions (as values)
        """
        known_scopes = get_scopes_backend().get_all_scopes()
        granted = self.scope.split()

        described = {}
        for name, desc in known_scopes.items():
            if name in granted:
                described[name] = desc
        return described
class IDToken(AbstractIDToken):
    """Concrete ID token model, swappable via ``OAUTH2_PROVIDER_ID_TOKEN_MODEL``."""

    class Meta(AbstractIDToken.Meta):
        swappable = "OAUTH2_PROVIDER_ID_TOKEN_MODEL"
def get_application_model():
    """Return the Application model that is active in this project."""
    model_label = oauth2_settings.APPLICATION_MODEL
    return apps.get_model(model_label)
def get_grant_model():
    """Return the Grant model that is active in this project."""
    model_label = oauth2_settings.GRANT_MODEL
    return apps.get_model(model_label)
def get_access_token_model():
    """Return the AccessToken model that is active in this project."""
    model_label = oauth2_settings.ACCESS_TOKEN_MODEL
    return apps.get_model(model_label)
def get_id_token_model():
    """Return the IDToken model that is active in this project."""
    model_label = oauth2_settings.ID_TOKEN_MODEL
    return apps.get_model(model_label)
def get_refresh_token_model():
    """Return the RefreshToken model that is active in this project."""
    model_label = oauth2_settings.REFRESH_TOKEN_MODEL
    return apps.get_model(model_label)
def get_application_admin_class():
    """Return the Application admin class that is active in this project."""
    return oauth2_settings.APPLICATION_ADMIN_CLASS
def get_access_token_admin_class():
    """Return the AccessToken admin class that is active in this project."""
    return oauth2_settings.ACCESS_TOKEN_ADMIN_CLASS
def get_grant_admin_class():
    """Return the Grant admin class that is active in this project."""
    return oauth2_settings.GRANT_ADMIN_CLASS
def get_id_token_admin_class():
    """Return the IDToken admin class that is active in this project."""
    return oauth2_settings.ID_TOKEN_ADMIN_CLASS
def get_refresh_token_admin_class():
    """Return the RefreshToken admin class that is active in this project."""
    return oauth2_settings.REFRESH_TOKEN_ADMIN_CLASS
def clear_expired():
    """
    Delete stale tokens and grants from the database, in batches.

    In order, this removes:

    * refresh tokens revoked before ``now - REFRESH_TOKEN_EXPIRE_SECONDS``
    * refresh tokens whose access token expired before that same instant
    * expired access tokens that have no refresh token
    * expired ID tokens that have no access token
    * expired grants

    The two refresh token steps are skipped when
    ``oauth2_settings.REFRESH_TOKEN_EXPIRE_SECONDS`` is not set.

    :raises ImproperlyConfigured: if ``REFRESH_TOKEN_EXPIRE_SECONDS`` is neither
        a ``timedelta`` nor a value accepted by ``timedelta(seconds=...)``
    """

    def batch_delete(queryset, query):
        """
        Delete the rows matched by ``queryset`` in slices and return how many went away.

        :param queryset: Queryset selecting the rows to delete
        :param query: The ``Q`` object ``queryset`` was built from; used to
            re-evaluate what is left after each batch
        :return: Matching row count before the first batch minus the count after the last
        """
        batch_size = oauth2_settings.CLEAR_EXPIRED_TOKENS_BATCH_SIZE
        batch_interval = oauth2_settings.CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL
        current_no = start_no = queryset.count()

        while current_no:
            # Fetch the id slice once and reuse it, so the logged batch size is
            # exactly the set handed to delete() and no extra COUNT is issued.
            batch_ids = list(queryset.values_list("id", flat=True)[:batch_size])
            batch_length = len(batch_ids)
            queryset.model.objects.filter(id__in=batch_ids).delete()
            logger.debug(f"{batch_length} tokens deleted, {current_no-batch_length} left")
            queryset = queryset.model.objects.filter(query)
            # Pause between batches before checking what is left.
            time.sleep(batch_interval)
            current_no = queryset.count()

        stop_no = queryset.model.objects.filter(query).count()
        return start_no - stop_no

    now = timezone.now()
    refresh_expire_at = None
    access_token_model = get_access_token_model()
    refresh_token_model = get_refresh_token_model()
    id_token_model = get_id_token_model()
    grant_model = get_grant_model()

    refresh_lifetime = oauth2_settings.REFRESH_TOKEN_EXPIRE_SECONDS
    if refresh_lifetime:
        if not isinstance(refresh_lifetime, timedelta):
            try:
                refresh_lifetime = timedelta(seconds=refresh_lifetime)
            except TypeError as exc:
                # Chain the original error so the offending value stays visible.
                raise ImproperlyConfigured(
                    "REFRESH_TOKEN_EXPIRE_SECONDS must be either a timedelta or seconds"
                ) from exc
        refresh_expire_at = now - refresh_lifetime

    if refresh_expire_at:
        revoked_query = models.Q(revoked__lt=refresh_expire_at)
        revoked = refresh_token_model.objects.filter(revoked_query)
        revoked_deleted_no = batch_delete(revoked, revoked_query)
        logger.info("%s Revoked refresh tokens deleted", revoked_deleted_no)

        expired_query = models.Q(access_token__expires__lt=refresh_expire_at)
        expired = refresh_token_model.objects.filter(expired_query)
        expired_deleted_no = batch_delete(expired, expired_query)
        logger.info("%s Expired refresh tokens deleted", expired_deleted_no)
    else:
        logger.info("refresh_expire_at is %s. No refresh tokens deleted.", refresh_expire_at)

    access_token_query = models.Q(refresh_token__isnull=True, expires__lt=now)
    access_tokens = access_token_model.objects.filter(access_token_query)
    access_tokens_delete_no = batch_delete(access_tokens, access_token_query)
    logger.info("%s Expired access tokens deleted", access_tokens_delete_no)

    id_token_query = models.Q(access_token__isnull=True, expires__lt=now)
    id_tokens = id_token_model.objects.filter(id_token_query)
    id_tokens_delete_no = batch_delete(id_tokens, id_token_query)
    logger.info("%s Expired ID tokens deleted", id_tokens_delete_no)

    grants_query = models.Q(expires__lt=now)
    grants = grant_model.objects.filter(grants_query)
    grants_deleted_no = batch_delete(grants, grants_query)
    logger.info("%s Expired grant tokens deleted", grants_deleted_no)
def redirect_to_uri_allowed(uri, allowed_uris):
    """
    Checks if a given uri can be redirected to based on the provided allowed_uris configuration.

    A candidate matches when scheme, network location and path are equal and
    every query parameter of the allowed URI is present in ``uri``. On top of
    that, loopback IPs are handled based on RFC 8252: an allowed
    ``http://127.0.0.1`` or ``http://[::1]`` URI registered without a port
    matches the same host on any port.

    :param uri: URI to check
    :param allowed_uris: A list of URIs that are allowed
    """
    requested = urlparse(uri)
    requested_params = set(parse_qsl(requested.query))

    for candidate_uri in allowed_uris:
        candidate = urlparse(candidate_uri)

        # From RFC 8252 (Section 7.3)
        #
        # Loopback redirect URIs use the "http" scheme
        # [...]
        # The authorization server MUST allow any port to be specified at the
        # time of the request for loopback IP redirect URIs, to accommodate
        # clients that obtain an available ephemeral port from the operating
        # system at the time of the request.
        portless_loopback = (
            candidate.scheme == "http"
            and candidate.hostname in ["127.0.0.1", "::1"]
            and candidate.port is None
        )

        # Scheme and path must agree whichever host rule applies.
        if candidate.scheme != requested.scheme or candidate.path != requested.path:
            continue

        loopback_host_matches = portless_loopback and candidate.hostname == requested.hostname
        if not (loopback_host_matches or candidate.netloc == requested.netloc):
            continue

        # Extra query parameters on the requested URI are fine; missing ones are not.
        required_params = set(parse_qsl(candidate.query))
        if required_params <= requested_params:
            return True

    return False
def is_origin_allowed(origin, allowed_origins):
    """
    Checks if a given origin uri is allowed based on the provided allowed_origins configuration.

    The origin is rejected outright when its scheme is not listed in
    ``oauth2_settings.ALLOWED_SCHEMES``; otherwise it is allowed when an entry
    of ``allowed_origins`` has the same scheme and network location.

    :param origin: Origin URI to check
    :param allowed_origins: A list of Origin URIs that are allowed
    """
    requested = urlparse(origin)
    if requested.scheme not in oauth2_settings.ALLOWED_SCHEMES:
        return False

    return any(
        candidate.scheme == requested.scheme and candidate.netloc == requested.netloc
        for candidate in map(urlparse, allowed_origins)
    )