|
| 1 | +"""migrate_session_data_to_rbac |
| 2 | +
|
| 3 | +Revision ID: 30c8308738ee |
| 4 | +Revises: 3f5c20f7bb07 |
| 5 | +Create Date: 2026-03-05 03:10:36.273207 |
| 6 | +
|
| 7 | +""" |
| 8 | + |
| 9 | +import sqlalchemy as sa |
| 10 | +from alembic import op |
| 11 | +from sqlalchemy.engine import Connection |
| 12 | + |
| 13 | +from ai.backend.manager.models.rbac_models.migration.enums import ( |
| 14 | + EntityType, |
| 15 | + OperationType, |
| 16 | +) |
| 17 | + |
| 18 | +# revision identifiers, used by Alembic. |
| 19 | +revision = "30c8308738ee" |
| 20 | +down_revision = "3f5c20f7bb07" |
| 21 | +branch_labels = None |
| 22 | +depends_on = None |
| 23 | + |
| 24 | +# Constants |
| 25 | +BATCH_SIZE = 1000 |
| 26 | +MEMBER_ROLE_POSTFIX = "member" |
| 27 | + |
| 28 | + |
| 29 | +def _add_entity_type_permissions(db_conn: Connection) -> None: |
| 30 | + """Add SESSION entity-type permissions to all role+scope combinations. |
| 31 | +
|
| 32 | + This operation queries existing role+scope combinations from the permissions table |
| 33 | + and adds SESSION operations based on the role name. |
| 34 | + """ |
| 35 | + offset = 0 |
| 36 | + |
| 37 | + while True: |
| 38 | + # Get distinct role+scope combinations with role names |
| 39 | + query = sa.text(""" |
| 40 | + SELECT DISTINCT p.role_id, r.name AS role_name, p.scope_type, p.scope_id |
| 41 | + FROM permissions p |
| 42 | + JOIN roles r ON p.role_id = r.id |
| 43 | + ORDER BY p.role_id, p.scope_type, p.scope_id |
| 44 | + OFFSET :offset |
| 45 | + LIMIT :limit |
| 46 | + """) |
| 47 | + rows = db_conn.execute(query, {"offset": offset, "limit": BATCH_SIZE}).all() |
| 48 | + if not rows: |
| 49 | + break |
| 50 | + |
| 51 | + offset += BATCH_SIZE |
| 52 | + |
| 53 | + # Prepare permissions to insert |
| 54 | + values_list = [] |
| 55 | + for row in rows: |
| 56 | + # Skip domain member roles (scope too broad for Session access) |
| 57 | + if row.scope_type == "domain" and row.role_name.endswith(MEMBER_ROLE_POSTFIX): |
| 58 | + continue |
| 59 | + |
| 60 | + # Determine operations based on role type |
| 61 | + if row.role_name.endswith(MEMBER_ROLE_POSTFIX): |
| 62 | + operations = OperationType.member_operations() |
| 63 | + else: |
| 64 | + operations = OperationType.owner_operations() |
| 65 | + |
| 66 | + # Add all operations for this role+scope |
| 67 | + for operation in operations: |
| 68 | + values_list.append( |
| 69 | + f"('{row.role_id}', '{row.scope_type}', '{row.scope_id}', " |
| 70 | + f"'{EntityType.SESSION.value}', '{operation.value}')" |
| 71 | + ) |
| 72 | + |
| 73 | + if values_list: |
| 74 | + values = ", ".join(values_list) |
| 75 | + insert_query = sa.text(f""" |
| 76 | + INSERT INTO permissions (role_id, scope_type, scope_id, entity_type, operation) |
| 77 | + VALUES {values} |
| 78 | + ON CONFLICT (role_id, scope_type, scope_id, entity_type, operation) DO NOTHING |
| 79 | + """) |
| 80 | + db_conn.execute(insert_query) |
| 81 | + |
| 82 | + |
| 83 | +def _associate_sessions_to_scopes(db_conn: Connection) -> None: |
| 84 | + """Associate all sessions to their owner scopes (USER and PROJECT). |
| 85 | +
|
| 86 | + Creates AUTO edges from: |
| 87 | + - User scope (user_uuid) → Session |
| 88 | + - Project scope (group_id) → Session |
| 89 | +
|
| 90 | + Uses keyset pagination for scalability. |
| 91 | + """ |
| 92 | + entity_type = EntityType.SESSION.value |
| 93 | + relation_type = "auto" |
| 94 | + |
| 95 | + # Process User scope edges |
| 96 | + last_id = "00000000-0000-0000-0000-000000000000" |
| 97 | + while True: |
| 98 | + query = sa.text(""" |
| 99 | + SELECT id::text AS id, user_uuid::text AS user_uuid |
| 100 | + FROM sessions |
| 101 | + WHERE id::text > :last_id |
| 102 | + ORDER BY id |
| 103 | + LIMIT :limit |
| 104 | + """) |
| 105 | + rows = db_conn.execute(query, {"last_id": last_id, "limit": BATCH_SIZE}).all() |
| 106 | + if not rows: |
| 107 | + break |
| 108 | + |
| 109 | + last_id = rows[-1].id |
| 110 | + |
| 111 | + # Prepare values for bulk insert |
| 112 | + values = ", ".join( |
| 113 | + f"('user', '{row.user_uuid}', '{entity_type}', '{row.id}', '{relation_type}')" |
| 114 | + for row in rows |
| 115 | + ) |
| 116 | + |
| 117 | + insert_query = sa.text(f""" |
| 118 | + INSERT INTO association_scopes_entities (scope_type, scope_id, entity_type, entity_id, relation_type) |
| 119 | + VALUES {values} |
| 120 | + ON CONFLICT (scope_type, scope_id, entity_id) DO NOTHING |
| 121 | + """) |
| 122 | + db_conn.execute(insert_query) |
| 123 | + |
| 124 | + # Process Project scope edges |
| 125 | + last_id = "00000000-0000-0000-0000-000000000000" |
| 126 | + while True: |
| 127 | + query = sa.text(""" |
| 128 | + SELECT id::text AS id, group_id::text AS group_id |
| 129 | + FROM sessions |
| 130 | + WHERE id::text > :last_id |
| 131 | + ORDER BY id |
| 132 | + LIMIT :limit |
| 133 | + """) |
| 134 | + rows = db_conn.execute(query, {"last_id": last_id, "limit": BATCH_SIZE}).all() |
| 135 | + if not rows: |
| 136 | + break |
| 137 | + |
| 138 | + last_id = rows[-1].id |
| 139 | + |
| 140 | + # Prepare values for bulk insert |
| 141 | + values = ", ".join( |
| 142 | + f"('project', '{row.group_id}', '{entity_type}', '{row.id}', '{relation_type}')" |
| 143 | + for row in rows |
| 144 | + ) |
| 145 | + |
| 146 | + insert_query = sa.text(f""" |
| 147 | + INSERT INTO association_scopes_entities (scope_type, scope_id, entity_type, entity_id, relation_type) |
| 148 | + VALUES {values} |
| 149 | + ON CONFLICT (scope_type, scope_id, entity_id) DO NOTHING |
| 150 | + """) |
| 151 | + db_conn.execute(insert_query) |
| 152 | + |
| 153 | + |
| 154 | +def _remove_session_permissions(db_conn: Connection) -> None: |
| 155 | + """Remove all SESSION entity-type permissions.""" |
| 156 | + entity_type = EntityType.SESSION.value |
| 157 | + |
| 158 | + while True: |
| 159 | + # Query permission IDs to delete |
| 160 | + query = sa.text(""" |
| 161 | + SELECT id FROM permissions |
| 162 | + WHERE entity_type = :entity_type |
| 163 | + LIMIT :limit |
| 164 | + """) |
| 165 | + rows = db_conn.execute(query, {"entity_type": entity_type, "limit": BATCH_SIZE}).all() |
| 166 | + if not rows: |
| 167 | + break |
| 168 | + |
| 169 | + # Delete the queried permissions |
| 170 | + ids = ", ".join(f"'{row.id}'" for row in rows) |
| 171 | + delete_query = sa.text(f""" |
| 172 | + DELETE FROM permissions |
| 173 | + WHERE id IN ({ids}) |
| 174 | + """) |
| 175 | + db_conn.execute(delete_query) |
| 176 | + |
| 177 | + |
| 178 | +def _remove_session_edges(db_conn: Connection) -> None: |
| 179 | + """Remove all SESSION AUTO edges from association_scopes_entities.""" |
| 180 | + entity_type = EntityType.SESSION.value |
| 181 | + |
| 182 | + while True: |
| 183 | + # Query association IDs to delete |
| 184 | + query = sa.text(""" |
| 185 | + SELECT id FROM association_scopes_entities |
| 186 | + WHERE entity_type = :entity_type |
| 187 | + LIMIT :limit |
| 188 | + """) |
| 189 | + rows = db_conn.execute(query, {"entity_type": entity_type, "limit": BATCH_SIZE}).all() |
| 190 | + if not rows: |
| 191 | + break |
| 192 | + |
| 193 | + # Delete the queried associations |
| 194 | + ids = ", ".join(f"'{row.id}'" for row in rows) |
| 195 | + delete_query = sa.text(f""" |
| 196 | + DELETE FROM association_scopes_entities |
| 197 | + WHERE id IN ({ids}) |
| 198 | + """) |
| 199 | + db_conn.execute(delete_query) |
| 200 | + |
| 201 | + |
| 202 | +def upgrade() -> None: |
| 203 | + conn = op.get_bind() |
| 204 | + _add_entity_type_permissions(conn) |
| 205 | + _associate_sessions_to_scopes(conn) |
| 206 | + |
| 207 | + |
| 208 | +def downgrade() -> None: |
| 209 | + conn = op.get_bind() |
| 210 | + _remove_session_edges(conn) |
| 211 | + _remove_session_permissions(conn) |
0 commit comments