|
6 | 6 | from flask_openid import OpenID |
7 | 7 |
|
8 | 8 | from webapp.helper import get_or_create_user_id, get_user_from_directory_by_key |
9 | | -from webapp.models import User |
| 9 | +from webapp.models import User, db |
10 | 10 |
|
11 | 11 | SSO_LOGIN_URL = "https://login.ubuntu.com" |
12 | 12 | # private teams like canonical are not returned in response atm |
13 | 13 | # so temporarily need to add multiple subset public teams |
| 14 | + |
| 15 | +SSO_ADMIN_TEAM = "content-system-admins" |
14 | 16 | SSO_TEAM = ( |
15 | 17 | "canonical", |
16 | 18 | "canonical-content-people", |
17 | 19 | "pga-admins", |
18 | 20 | "canonical-webmonkeys", |
| 21 | + SSO_ADMIN_TEAM, |
19 | 22 | ) |
20 | 23 | DISABLE_SSO = os.environ.get("DISABLE_SSO") or os.environ.get( |
21 | 24 | "FLASK_DISABLE_SSO" |
@@ -45,20 +48,32 @@ def after_login(resp): |
45 | 48 | if not (set(SSO_TEAM) & set(resp.extensions["lp"].is_member)): |
46 | 49 | flask.abort(403) |
47 | 50 |
|
| 51 | + # check if user is admin |
| 52 | + role = ( |
| 53 | + "admin" |
| 54 | + if SSO_ADMIN_TEAM in resp.extensions["lp"].is_member |
| 55 | + else "user" |
| 56 | + ) |
| 57 | + |
48 | 58 | # find the user in database |
49 | 59 | user = User.query.filter_by(email=resp.email).first() |
| 60 | + if user and user.role != role: |
| 61 | + user.role = role |
| 62 | + db.session.commit() |
50 | 63 | if not user: |
51 | 64 | # fetch user record from directory |
52 | 65 | response = get_user_from_directory_by_key("email", resp.email) |
53 | 66 |
|
54 | 67 | if response.status_code == 200: |
55 | 68 | user = response.json().get("data", {}).get("employees", [])[0] |
| 69 | + user["role"] = role |
56 | 70 | # save user in users table |
57 | 71 | get_or_create_user_id(user) |
58 | 72 |
|
59 | 73 | flask.session["openid"] = { |
60 | 74 | "identity_url": resp.identity_url, |
61 | 75 | "email": resp.email, |
| 76 | + "role": role, |
62 | 77 | } |
63 | 78 |
|
64 | 79 | return flask.redirect(open_id.get_next_url()) |
@@ -101,3 +116,26 @@ def is_user_logged_in(*args, **kwargs): |
101 | 116 | return flask.redirect("/login_page?next=" + flask.request.path) |
102 | 117 |
|
103 | 118 | return is_user_logged_in |
| 119 | + |
| 120 | + |
| 121 | +def is_admin(func): |
| 122 | + """ |
| 123 | + Decorator that checks if a user is an admin user |
| 124 | + """ |
| 125 | + |
| 126 | + @functools.wraps(func) |
| 127 | + def is_admin_user(*args, **kwargs): |
| 128 | + if ( |
| 129 | + "openid" in flask.session |
| 130 | + and flask.session.get("openid")["role"] == "admin" |
| 131 | + ): |
| 132 | + return func(*args, **kwargs) |
| 133 | + |
| 134 | + return ( |
| 135 | + flask.jsonify( |
| 136 | + {"error": "This operation requires admin privileges"} |
| 137 | + ), |
| 138 | + 403, |
| 139 | + ) |
| 140 | + |
| 141 | + return is_admin_user |
0 commit comments