-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrestore_access_rights.py
More file actions
171 lines (150 loc) · 6.32 KB
/
restore_access_rights.py
File metadata and controls
171 lines (150 loc) · 6.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Copyright 2025 UW-IT, University of Washington
# SPDX-License-Identifier: Apache-2.0
from django.core.management.base import BaseCommand
from endorsement.models import AccessRecord, AccessRight, AccessRecordConflict
from endorsement.dao.access import (
get_accessee_model, store_access_record, set_delegate)
from endorsement.dao.office import get_office_accessor
from endorsement.exceptions import UnrecognizedUWNetid, UnrecognizedGroupID
from uw_msca.delegate import get_all_delegates, _msca_get_delegate_url
from uw_msca import get_resource
import json
import csv
import logging
# Module-level logger. Its level is pinned to INFO so the logger itself does
# not filter out the INFO-level progress/ERROR lines this command writes.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Command(BaseCommand):
    """Bring stored mailbox access rights back in line with delegate data.

    For each (mailbox, delegate) pair reported either by MSCA (``--netid``)
    or by a CSV file (``--csv``), the matching ``AccessRecord`` is looked up
    and its ``access_right`` is set to the reported right.  Without
    ``--commit`` nothing is saved; the command only logs what it would do.
    """

    help = "Restore Office365 mailbox access from MSCA"

    def add_arguments(self, parser):
        """Register the ``--commit``, ``--netid`` and ``--csv`` options."""
        parser.add_argument(
            '--commit',
            action='store_true',
            default=False,
            help='Store access record changes',
        )
        parser.add_argument(
            '--netid',
            type=str,
            help='Netid to restore rights for')
        # NOTE(review): this help text describes a different column layout
        # than reconcile_csv_access() parses (mailbox netid, JSON delegates);
        # confirm which one is intended.
        parser.add_argument(
            '--csv',
            type=str,
            help='CSV of accessor netid, accessor name, access right')

    def handle(self, *args, **options):
        """Run the restore for ``--netid`` or, failing that, ``--csv``.

        Any exception escaping the selected restore is logged rather than
        propagated.
        """
        self.commit_changes = options['commit']
        netid = options['netid']
        csv_file = options['csv']

        try:
            if netid:
                self.restore_netid_access(netid)
            elif csv_file:
                # NOTE(review): the original called a non-existent
                # restore_csv_access(); reconcile_csv_access() is the only
                # CSV entry point in this class - confirm it is the intended
                # target.
                self.reconcile_csv_access(csv_file)
            else:
                logger.error(
                    "restore_access_rights: one of --netid or --csv "
                    "is required")
        except Exception as ex:
            logger.error("restore_access_rights: Exception: {}".format(ex))

    def restore_netid_access(self, netid):
        """Apply the MSCA-reported right of every delegate of ``netid``.

        A failure for one delegate is logged and does not stop the others.
        """
        accessee = get_accessee_model(netid)
        delegates = self.get_delegates_for_netid(netid)
        for delegate, right in delegates.items():
            try:
                self.fix_access_record(accessee, delegate, right)
            except Exception as ex:
                logger.info(f"ERROR: assign delegate {delegate}: {ex}")

    @staticmethod
    def _single_right(user, rights):
        """Reduce an ``AccessRights`` value to a single right name.

        Accepts a string, or a list holding exactly one entry.

        Raises:
            Exception: if ``rights`` is a list whose length is not one, or
                is neither a list nor a string.
        """
        if isinstance(rights, str):
            return rights

        if isinstance(rights, list):
            if len(rights) == 1:
                return rights[0]

            raise Exception(f"multiple rights for {user}: {rights}")

        raise Exception(f"unknown right type for {user}")

    def get_delegates_for_netid(self, netid):
        """Fetch the delegates of ``netid`` from the MSCA delegate resource.

        Returns:
            dict mapping each delegate's ``User`` value to its single
            access right.

        Raises:
            Exception: if a returned delegation names a different netid, or
                a delegate's rights cannot be reduced to one right.
        """
        url = _msca_get_delegate_url(netid)
        response = get_resource(url)
        json_response = json.loads(response)

        delegates = {}
        for delegation in json_response:
            if netid != delegation['netid']:
                raise Exception("netid mismatch")

            delegations = delegation['delegates']
            # 'delegates' is handled both as one dict and as a list of
            # dicts; wrap the former so both share one code path.
            if isinstance(delegations, dict):
                delegations = [delegations]
            elif not isinstance(delegations, list):
                continue

            for d in delegations:
                user = d['User']
                if not user:
                    continue

                delegates[user] = self._single_right(user, d['AccessRights'])

        return delegates

    def fix_access_record(self, accessee, delegate, right):
        """Point the delegate's existing access record at ``right``.

        The record is saved only when ``self.commit_changes`` is set; in
        either case a differing right is logged as ``ASSIGN`` or
        ``WOULD ASSIGN``.  Nothing is logged when the right already matches.

        Raises:
            Exception: if the accessor cannot be resolved, no access record
                exists for the pair, or ``right`` names no ``AccessRight``.
        """
        try:
            accessor = get_office_accessor(delegate)
        except Exception as ex:
            raise Exception(f"ERROR: get accessor {delegate}: {ex}") from ex

        try:
            ar = AccessRecord.objects.get(
                accessee=accessee, accessor=accessor)
        except AccessRecord.DoesNotExist:
            # the original message referenced an undefined name here, so
            # the intended error was replaced by a NameError
            raise Exception(f"ERROR: no record: mailbox {accessee.netid} "
                            f"delegate {delegate}")

        try:
            rr = AccessRight.objects.get(name=right)
        except AccessRight.DoesNotExist:
            raise Exception(f"ERROR: unknown right {right} ")

        if ar.access_right != rr:
            if self.commit_changes:
                ar.access_right = rr
                ar.save()

            # rr is the right being (or about to be) assigned in both modes
            logger.info(
                f"{'' if self.commit_changes else 'WOULD '}ASSIGN: "
                f"mailbox {ar.accessee.netid} "
                f"delegate {ar.accessor.name} "
                f"right '{rr.display_name}'")

    def reconcile_csv_access(self, csv_file,
                             blanks_file='/tmp/blanks_remaining.csv'):
        """Apply delegate rights from ``csv_file`` to listed mailboxes.

        Args:
            csv_file: CSV whose first column is a mailbox netid and whose
                second column is a JSON document: one delegate object, or a
                list of them, each with ``User`` and ``AccessRights`` keys.
            blanks_file: CSV whose first column lists the mailbox netids to
                process (presumably mailboxes still missing a right -
                confirm).  Defaults to the previously hard-coded path.

        Per-mailbox and per-delegate failures are logged and skipped.
        """
        delegations = {}
        with open(csv_file, 'r', newline='') as f:
            for line in csv.reader(f):
                if not line:
                    # csv.reader yields [] for blank lines
                    continue

                delegates = json.loads(line[1])
                if not delegates or delegates == 'null':
                    continue

                by_user = delegations.setdefault(line[0], {})
                if isinstance(delegates, dict):
                    delegates = [delegates]

                for d in delegates:
                    by_user[d['User']] = d['AccessRights']

        with open(blanks_file, 'r', newline='') as f:
            for line in csv.reader(f):
                if not line:
                    continue

                netid = line[0]
                if netid not in delegations:
                    logger.info(f"mailbox {netid} has no delegates")
                    continue

                try:
                    accessee = get_accessee_model(netid)
                except UnrecognizedUWNetid:
                    logger.info(f"ERROR: get accessee {netid}: unrecognized")
                    continue
                except Exception as ex:
                    logger.info(f"ERROR: get accessee {netid}: {ex}")
                    continue

                for delegate, right in delegations[netid].items():
                    try:
                        # unwrap single-item right lists the same way the
                        # MSCA path does; failures are logged per delegate
                        self.fix_access_record(
                            accessee, delegate,
                            self._single_right(delegate, right))
                    except Exception as ex:
                        logger.info(
                            f"ERROR: assign delegate {delegate}: {ex}")