|
| 1 | +from ..config import config |
| 2 | +from ..models import Class |
| 3 | +from .base import AuthzClient |
| 4 | + |
| 5 | +import logging |
| 6 | + |
| 7 | +import tqdm |
| 8 | +from sqlalchemy import select |
| 9 | + |
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
| 12 | + |
| 13 | +async def swap_class_admin_for_teacher(c: AuthzClient, all_classes: list[int]): |
| 14 | + """Find users who are class admins and swap them to teachers.""" |
| 15 | + logger.info("Swapping group admin role for teacher role ...") |
| 16 | + total_migrated = 0 |
| 17 | + for class_id in tqdm.tqdm(all_classes): |
| 18 | + # Use `expand` instead of `list-users` since we want to *ignore* everyone |
| 19 | + # who has *inherited* permissions. Expand at level 1 will only return |
| 20 | + # the direct assignments within the class. |
| 21 | + expand_result = await c.expand(f"class:{class_id}", "admin", max_depth=1) |
| 22 | + users_with_admin = [rel.entity for rel in expand_result if not rel.is_group] |
| 23 | + await c.write_safe( |
| 24 | + grant=[(u, "teacher", f"class:{class_id}") for u in users_with_admin], |
| 25 | + revoke=[(u, "admin", f"class:{class_id}") for u in users_with_admin], |
| 26 | + ) |
| 27 | + total_migrated += len(users_with_admin) |
| 28 | + logger.info(f"Swapped {total_migrated} user(s) from admin to teacher role.") |
| 29 | + |
| 30 | + |
| 31 | +async def swap_class_admin_group_for_supervisor_group( |
| 32 | + c: AuthzClient, all_classes: list[int] |
| 33 | +): |
| 34 | + """Find groups where admins can manage assistants and grant to supervisors.""" |
| 35 | + logger.info("Swapping group admin group for supervisor group ...") |
| 36 | + total_migrated = 0 |
| 37 | + for class_id in tqdm.tqdm(all_classes): |
| 38 | + # NOTE that since supervisor is a supserset of admin, this will always be true even |
| 39 | + # after classes are migrated. That's ok, since the update is idempotent. |
| 40 | + checks = await c.check( |
| 41 | + [(f"class:{class_id}#admin", "can_manage_assistants", f"class:{class_id}")] |
| 42 | + ) |
| 43 | + if checks[0]: |
| 44 | + await c.write_safe( |
| 45 | + grant=[ |
| 46 | + ( |
| 47 | + f"class:{class_id}#supervisor", |
| 48 | + "can_manage_assistants", |
| 49 | + f"class:{class_id}", |
| 50 | + ) |
| 51 | + ], |
| 52 | + revoke=[ |
| 53 | + ( |
| 54 | + f"class:{class_id}#admin", |
| 55 | + "can_manage_assistants", |
| 56 | + f"class:{class_id}", |
| 57 | + ) |
| 58 | + ], |
| 59 | + ) |
| 60 | + total_migrated += 1 |
| 61 | + logger.info(f"Swapped {total_migrated} group(s) from admin to supervisor group.") |
| 62 | + |
| 63 | + |
| 64 | +async def remove_class_admin_perms(): |
| 65 | + """Clean up any `admin` permissions assigned on a class level. |
| 66 | +
|
| 67 | + 1) Swap class `admin` for `teacher` |
| 68 | + 2) Swap `class:#admin` group for `class:#supervisor` where it is set in `can_manage_assistants` |
| 69 | + """ |
| 70 | + logger.info("Updating group admin permissions ...") |
| 71 | + await config.authz.driver.init() |
| 72 | + async with config.db.driver.async_session() as session: |
| 73 | + all_classes = await session.execute(select(Class)) |
| 74 | + all_class_ids = [c.id for c in all_classes.scalars()] |
| 75 | + |
| 76 | + logger.info(f"Found {len(all_class_ids)} groups to migrate.") |
| 77 | + |
| 78 | + async with config.authz.driver.get_client() as c: |
| 79 | + await swap_class_admin_for_teacher(c, all_class_ids) |
| 80 | + await swap_class_admin_group_for_supervisor_group(c, all_class_ids) |
| 81 | + |
| 82 | + logger.info("Group admin permissions updated.") |
0 commit comments