-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create speaker account with SSO as part of the answer to Call for Proposals #508
Changes from 5 commits
c1506e2
4863ede
5b18051
f989910
ccb0034
0994fbb
d3850ae
a3bf18f
bceb78e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,16 @@ | ||
import logging | ||
from enum import StrEnum | ||
from urllib.parse import urlencode, urljoin, urlparse, urlunparse | ||
from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlunparse | ||
|
||
from allauth.socialaccount.adapter import get_adapter | ||
from allauth.socialaccount.models import SocialApp | ||
from django.conf import settings | ||
from django.contrib import messages | ||
from django.http import HttpRequest, HttpResponse, QueryDict | ||
from django.shortcuts import redirect | ||
from django.urls import reverse | ||
from django.views.generic import TemplateView | ||
from django.utils.translation import gettext_lazy as _ | ||
from django.views.generic import TemplateView, View | ||
from pydantic import ValidationError | ||
|
||
from pretix.base.models import User | ||
|
@@ -23,38 +25,108 @@ | |
adapter = get_adapter() | ||
|
||
|
||
def oauth_login(request, provider): | ||
gs = GlobalSettingsObject() | ||
client_id = gs.settings.get('login_providers', as_type=dict).get(provider, {}).get('client_id') | ||
provider = adapter.get_provider(request, provider, client_id=client_id) | ||
class OAuthLoginView(View): | ||
def get(self, request: HttpRequest, provider: str) -> HttpResponse: | ||
self.set_oauth2_params(request) | ||
|
||
base_url = provider.get_login_url(request) | ||
query_params = { | ||
"next": build_absolute_uri("plugins:socialauth:social.oauth.return") | ||
} | ||
parsed_url = urlparse(base_url) | ||
updated_url = parsed_url._replace(query=urlencode(query_params)) | ||
return redirect(urlunparse(updated_url)) | ||
gs = GlobalSettingsObject() | ||
client_id = ( | ||
gs.settings.get("login_providers", as_type=dict) | ||
.get(provider, {}) | ||
.get("client_id") | ||
) | ||
provider_instance = adapter.get_provider(request, provider, client_id=client_id) | ||
|
||
base_url = provider_instance.get_login_url(request) | ||
query_params = { | ||
"next": build_absolute_uri("plugins:socialauth:social.oauth.return") | ||
} | ||
parsed_url = urlparse(base_url) | ||
updated_url = parsed_url._replace(query=urlencode(query_params)) | ||
return redirect(urlunparse(updated_url)) | ||
|
||
def set_oauth2_params(self, request: HttpRequest) -> None: | ||
""" | ||
Handle Login with SSO button from other components | ||
This function will set 'oauth2_params' in session for oauth2_callback | ||
""" | ||
next_url = request.GET.get("next", "") | ||
if not next_url: | ||
return | ||
|
||
parsed = urlparse(next_url) | ||
|
||
# Only allow relative URLs | ||
if parsed.netloc or parsed.scheme: | ||
return | ||
|
||
allowed_params = { | ||
"client_id", | ||
"redirect_uri", | ||
"state", | ||
"scope", | ||
"response_type" | ||
} | ||
params = parse_qs(parsed.query) | ||
sanitized_params = { | ||
k: v[0].strip() | ||
for k, v in params.items() if k in allowed_params | ||
} | ||
|
||
request.session["oauth2_params"] = sanitized_params | ||
|
||
|
||
class OAuthReturnView(View): | ||
def get(self, request: HttpRequest) -> HttpResponse: | ||
try: | ||
user = self.get_or_create_user(request) | ||
response = process_login_and_set_cookie(request, user, False) | ||
oauth2_params = request.session.get("oauth2_params", {}) | ||
HungNgien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if oauth2_params: | ||
get_params = self.prepare_oauth2_params(oauth2_params) | ||
auth_url = reverse("control:oauth2_provider.authorize") | ||
|
||
# Clean up session after use | ||
del request.session["oauth2_params"] | ||
|
||
return redirect(f"{auth_url}?{get_params.urlencode()}") | ||
|
||
return response | ||
except AttributeError as e: | ||
messages.error( | ||
request, _("Error while authorizing: no email address available.") | ||
) | ||
logger.error("Error while authorizing: %s", e) | ||
return redirect("control:auth.login") | ||
|
||
def oauth_return(request): | ||
try: | ||
user, _ = User.objects.get_or_create( | ||
def get_or_create_user(self, request: HttpRequest) -> User: | ||
""" | ||
Get or create a user from social auth information. | ||
""" | ||
return User.objects.get_or_create( | ||
email=request.user.email, | ||
defaults={ | ||
'locale': getattr(request, 'LANGUAGE_CODE', settings.LANGUAGE_CODE), | ||
'timezone': getattr(request, 'timezone', settings.TIME_ZONE), | ||
'auth_backend': 'native', | ||
'password': '', | ||
"locale": getattr(request, "LANGUAGE_CODE", settings.LANGUAGE_CODE), | ||
"timezone": getattr(request, "timezone", settings.TIME_ZONE), | ||
"auth_backend": "native", | ||
"password": "", | ||
}, | ||
) | ||
return process_login_and_set_cookie(request, user, False) | ||
except AttributeError: | ||
messages.error( | ||
request, _('Error while authorizing: no email address available.') | ||
) | ||
logger.error('Error while authorizing: user has no email address.') | ||
return redirect('control:auth.login') | ||
)[0] | ||
|
||
def prepare_oauth2_params(self, oauth2_params: dict) -> QueryDict: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please simplify the implementation, because you don't need to create a Btw, this method doesn't use anything from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed this function to a @staticmethod since it’s only used within this class. |
||
""" | ||
Prepare OAuth2 parameters to be passed to the OAuth2 authorization view. | ||
""" | ||
get_params = QueryDict("", mutable=True) | ||
base_params = { | ||
"response_type": oauth2_params.get("response_type", "code"), | ||
"client_id": oauth2_params.get("client_id"), | ||
"redirect_uri": oauth2_params.get("redirect_uri"), | ||
"scope": oauth2_params.get("scope", "profile"), | ||
"state": oauth2_params.get("state"), | ||
} | ||
get_params.update(base_params) | ||
return get_params | ||
|
||
|
||
class SocialLoginView(AdministratorPermissionRequiredMixin, TemplateView): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need this step if you have Pydantic model. You can tell Pydantic to automatically strip whitespace.
Learn more here.