|
| 1 | +import unittest |
| 2 | + |
| 3 | +from ldap3 import Connection, SIMPLE, MOCK_SYNC, OFFLINE_AD_2012_R2, Server |
| 4 | +from ldap3.utils.dn import safe_dn |
| 5 | + |
| 6 | +from auth.auth_ldap import LdapAuthenticator |
| 7 | +from tests import test_utils |
| 8 | +from tests.test_utils import mock_object |
| 9 | + |
| 10 | + |
| 11 | +class _LdapAuthenticatorMockWrapper: |
| 12 | + def __init__(self, username_pattern, base_dn): |
| 13 | + authenticator = LdapAuthenticator({ |
| 14 | + 'url': 'unused', |
| 15 | + 'username_pattern': username_pattern, |
| 16 | + 'base_dn': base_dn}, |
| 17 | + test_utils.temp_folder) |
| 18 | + |
| 19 | + def connect(username, password): |
| 20 | + server = Server('mock_server', get_info=OFFLINE_AD_2012_R2) |
| 21 | + connection = Connection( |
| 22 | + server, |
| 23 | + user=username, |
| 24 | + password=password, |
| 25 | + authentication=SIMPLE, |
| 26 | + read_only=True, |
| 27 | + client_strategy=MOCK_SYNC |
| 28 | + ) |
| 29 | + |
| 30 | + for dn, attrs in self._entries.items(): |
| 31 | + dn = safe_dn(dn) |
| 32 | + |
| 33 | + entry_added = connection.strategy.add_entry(dn, attrs) |
| 34 | + if not entry_added: |
| 35 | + raise Exception('Failed to add entry ' + dn) |
| 36 | + |
| 37 | + lower_keys = {key.lower(): key for key in attrs.keys()} |
| 38 | + |
| 39 | + if 'samaccountname' in lower_keys: |
| 40 | + account_name = attrs[lower_keys['samaccountname']][0] |
| 41 | + domain_start = dn.find('dc=') + 3 |
| 42 | + domain_end = dn.find(',', domain_start) |
| 43 | + domain = dn[domain_start:domain_end] |
| 44 | + connection.server.dit[domain + '\\' + account_name] = connection.server.dit[dn] |
| 45 | + |
| 46 | + if 'userprincipalname' in lower_keys: |
| 47 | + principal_name = attrs[lower_keys['userprincipalname']][0] |
| 48 | + connection.server.dit[principal_name] = connection.server.dit[dn] |
| 49 | + |
| 50 | + connection.bind() |
| 51 | + return connection |
| 52 | + |
| 53 | + authenticator._connect = connect |
| 54 | + |
| 55 | + self.base_dn = base_dn |
| 56 | + self._entries = {} |
| 57 | + self.authenticator = authenticator |
| 58 | + |
| 59 | + def authenticate(self, username, password): |
| 60 | + return self.authenticator.authenticate(_mock_request_handler(username, password)) |
| 61 | + |
| 62 | + def get_groups(self, username): |
| 63 | + return self.authenticator.get_groups(username) |
| 64 | + |
| 65 | + def add_user(self, cn, password, dn=None, **other_attributes): |
| 66 | + if dn is None: |
| 67 | + dn = self.to_user_dn(cn) |
| 68 | + self.add_entry(dn, 'person', userPassword=password, **other_attributes) |
| 69 | + |
| 70 | + def to_user_dn(self, cn): |
| 71 | + prefix = 'cn=' + cn + ',cn=Users' |
| 72 | + if self.base_dn: |
| 73 | + return prefix + ',' + self.base_dn |
| 74 | + return prefix |
| 75 | + |
| 76 | + def add_posix_user(self, cn, password, uid, **other_attributes): |
| 77 | + self.add_user(cn, password, uid=uid, objectClass='posixAccount', **other_attributes) |
| 78 | + |
| 79 | + def add_group(self, cn, member_cns=None, **other_attributes): |
| 80 | + if member_cns: |
| 81 | + member_dns = list(map(self.to_user_dn, member_cns)) |
| 82 | + else: |
| 83 | + member_dns = None |
| 84 | + |
| 85 | + dn = 'cn=' + cn + ',cn=Groups,' + self.base_dn |
| 86 | + self.add_entry(dn, 'groupOfNames', member=member_dns, **other_attributes) |
| 87 | + |
| 88 | + def remove_group(self, cn): |
| 89 | + dn = 'cn=' + cn + ',cn=Groups,' + self.base_dn |
| 90 | + del self._entries[dn] |
| 91 | + |
| 92 | + def add_posix_group(self, cn, member_uids=None, **other_attributes): |
| 93 | + dn = 'cn=' + cn + ',cn=Groups,' + self.base_dn |
| 94 | + self.add_entry(dn, 'posixGroup', memberUid=member_uids, **other_attributes) |
| 95 | + |
| 96 | + def add_entry(self, dn, object_class, **attrs): |
| 97 | + if 'objectClass' in attrs: |
| 98 | + if not isinstance(attrs['objectClass'], list): |
| 99 | + attrs['objectClass'] = [attrs['objectClass']] |
| 100 | + else: |
| 101 | + attrs['objectClass'] = [] |
| 102 | + attrs['objectClass'].append(object_class) |
| 103 | + |
| 104 | + self._entries[dn] = attrs |
| 105 | + |
| 106 | + |
| 107 | +class TestFindGroups(unittest.TestCase): |
| 108 | + |
| 109 | + def test_load_single_group_by_member_when_dn_template(self): |
| 110 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 111 | + auth_wrapper.add_user('user1', '1234') |
| 112 | + auth_wrapper.add_group('group1', ['user1']) |
| 113 | + |
| 114 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 115 | + self.assertEqual(['group1'], groups) |
| 116 | + |
| 117 | + def test_load_multiple_groups_by_member_when_dn_template(self): |
| 118 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 119 | + auth_wrapper.add_user('user1', '1234') |
| 120 | + auth_wrapper.add_group('group1', ['user1', 'user2']) |
| 121 | + auth_wrapper.add_group('group2', ['user1']) |
| 122 | + auth_wrapper.add_group('group3', ['user3', 'user1', 'user4']) |
| 123 | + auth_wrapper.add_group('group4', ['user5']) |
| 124 | + |
| 125 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 126 | + self.assertCountEqual(['group1', 'group2', 'group3'], groups) |
| 127 | + |
| 128 | + def test_load_single_group_by_uid_when_dn_template(self): |
| 129 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 130 | + auth_wrapper.add_posix_user('user1', '1234', 'uid_X') |
| 131 | + auth_wrapper.add_posix_group('group1', ['uid_X']) |
| 132 | + |
| 133 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 134 | + self.assertCountEqual(['group1'], groups) |
| 135 | + |
| 136 | + def test_load_multiple_groups_by_uid_when_dn_template(self): |
| 137 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 138 | + auth_wrapper.add_posix_user('user1', '1234', 'uid_X') |
| 139 | + auth_wrapper.add_posix_group('group1', ['uid_X']) |
| 140 | + auth_wrapper.add_posix_group('group2', ['uid_X']) |
| 141 | + auth_wrapper.add_posix_group('group3', ['uid_X']) |
| 142 | + |
| 143 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 144 | + self.assertCountEqual(['group1', 'group2', 'group3'], groups) |
| 145 | + |
| 146 | + def test_load_multiple_groups_by_member_and_uid_when_dn_template(self): |
| 147 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 148 | + auth_wrapper.add_posix_user('user1', '1234', 'uid_X') |
| 149 | + auth_wrapper.add_posix_group('group1', ['uid_X']) |
| 150 | + auth_wrapper.add_group('group2', ['user1']) |
| 151 | + |
| 152 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 153 | + self.assertCountEqual(['group1', 'group2'], groups) |
| 154 | + |
| 155 | + def test_load_same_groups_by_member_and_uid_when_dn_template(self): |
| 156 | + auth_wrapper = _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=ldap,dc=test', 'dc=ldap,dc=test') |
| 157 | + auth_wrapper.add_posix_user('user1', '1234', 'uid_X') |
| 158 | + auth_wrapper.add_group('group1', ['user1', 'user2'], memberUid=['uid_X'], objectClass='posixGroup') |
| 159 | + auth_wrapper.add_group('group2', ['user1'], memberUid=['uid_X', 'uid_Y'], objectClass='posixGroup') |
| 160 | + |
| 161 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 162 | + self.assertCountEqual(['group1', 'group2'], groups) |
| 163 | + |
| 164 | + def test_load_single_group_by_member_when_sam_account_template(self): |
| 165 | + auth_wrapper = _LdapAuthenticatorMockWrapper('some_domain\\$username', 'dc=some_domain,dc=test') |
| 166 | + auth_wrapper.add_user('User Noname', '1234', sAMAccountName='user1') |
| 167 | + auth_wrapper.add_group('group1', ['User Noname']) |
| 168 | + |
| 169 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 170 | + self.assertEqual(['group1'], groups) |
| 171 | + |
| 172 | + def test_load_single_group_by_uid_when_sam_account_template(self): |
| 173 | + auth_wrapper = _LdapAuthenticatorMockWrapper('some_domain\\$username', 'dc=some_domain,dc=test') |
| 174 | + auth_wrapper.add_posix_user('User Noname', '1234', 'uid_X', sAMAccountName='user1') |
| 175 | + auth_wrapper.add_posix_group('group1', ['uid_X']) |
| 176 | + |
| 177 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 178 | + self.assertEqual(['group1'], groups) |
| 179 | + |
| 180 | + def test_load_single_group_by_member_when_user_principal_template(self): |
| 181 | + auth_wrapper = _LdapAuthenticatorMockWrapper( '[email protected]', 'dc=buggy,dc=net') |
| 182 | + auth_wrapper. add_user( 'User Noname', '1234', userPrincipalName='[email protected]') |
| 183 | + auth_wrapper.add_group('group1', ['User Noname']) |
| 184 | + |
| 185 | + groups = self.auth_and_get_groups('user1', auth_wrapper) |
| 186 | + self.assertEqual(['group1'], groups) |
| 187 | + |
| 188 | + def test_cannot_load_group_when_same_principal_names(self): |
| 189 | + auth_wrapper = _LdapAuthenticatorMockWrapper( '[email protected]', 'dc=buggy,dc=net') |
| 190 | + auth_wrapper. add_user( 'user1', '1234', userPrincipalName='[email protected]') |
| 191 | + auth_wrapper. add_user( 'user2', '1234', userPrincipalName='[email protected]') |
| 192 | + auth_wrapper.add_group('group1', ['user1', 'user2']) |
| 193 | + |
| 194 | + groups = self.auth_and_get_groups('userX', auth_wrapper) |
| 195 | + self.assertEqual([], groups) |
| 196 | + |
| 197 | + def auth_and_get_groups(self, username, auth_wrapper): |
| 198 | + self.authenticate(username, '1234', auth_wrapper) |
| 199 | + groups = auth_wrapper.get_groups(username) |
| 200 | + return groups |
| 201 | + |
| 202 | + def setUp(self): |
| 203 | + test_utils.setup() |
| 204 | + |
| 205 | + def tearDown(self): |
| 206 | + test_utils.cleanup() |
| 207 | + |
| 208 | + def authenticate(self, username, password, auth_wrapper): |
| 209 | + user = auth_wrapper.authenticate(username, password) |
| 210 | + self.assertEqual(user, username) |
| 211 | + |
| 212 | + |
| 213 | +class TestGroupsPersistence(unittest.TestCase): |
| 214 | + def test_get_groups_after_login(self): |
| 215 | + self.auth_wrapper.add_user('user1', '1234') |
| 216 | + self.auth_wrapper.add_group('group1', ['user1']) |
| 217 | + |
| 218 | + self.authenticate('user1', '1234') |
| 219 | + groups = self.auth_wrapper.get_groups('user1') |
| 220 | + self.assertCountEqual(['group1'], groups) |
| 221 | + |
| 222 | + def test_get_groups_after_login_for_different_users(self): |
| 223 | + self.auth_wrapper.add_user('user1', '1234') |
| 224 | + self.auth_wrapper.add_user('user2', '1234') |
| 225 | + self.auth_wrapper.add_group('group1', ['user1']) |
| 226 | + self.auth_wrapper.add_group('group2', ['user2']) |
| 227 | + |
| 228 | + self.authenticate('user1', '1234') |
| 229 | + groups = self.auth_wrapper.get_groups('user1') |
| 230 | + self.assertCountEqual(['group1'], groups) |
| 231 | + |
| 232 | + self.authenticate('user2', '1234') |
| 233 | + groups = self.auth_wrapper.get_groups('user2') |
| 234 | + self.assertCountEqual(['group2'], groups) |
| 235 | + |
| 236 | + def test_renew_groups_after_login(self): |
| 237 | + self.auth_wrapper.add_user('user1', '1234') |
| 238 | + self.auth_wrapper.add_group('group1', ['user1']) |
| 239 | + |
| 240 | + self.authenticate('user1', '1234') |
| 241 | + |
| 242 | + self.auth_wrapper.remove_group('group1') |
| 243 | + self.auth_wrapper.add_group('group2', ['user1']) |
| 244 | + self.auth_wrapper.add_group('group3', ['user1']) |
| 245 | + |
| 246 | + self.authenticate('user1', '1234') |
| 247 | + groups = self.auth_wrapper.get_groups('user1') |
| 248 | + self.assertCountEqual(['group2', 'group3'], groups) |
| 249 | + |
| 250 | + def test_restore_groups_after_restart(self): |
| 251 | + self.auth_wrapper.add_user('user1', '1234') |
| 252 | + self.auth_wrapper.add_group('group1', ['user1']) |
| 253 | + |
| 254 | + self.authenticate('user1', '1234') |
| 255 | + |
| 256 | + new_wrapper = self.create_wrapper() |
| 257 | + |
| 258 | + groups = new_wrapper.get_groups('user1') |
| 259 | + self.assertCountEqual(['group1'], groups) |
| 260 | + |
| 261 | + def setUp(self): |
| 262 | + test_utils.setup() |
| 263 | + |
| 264 | + self.auth_wrapper = self.create_wrapper() |
| 265 | + |
| 266 | + def create_wrapper(self): |
| 267 | + return _LdapAuthenticatorMockWrapper('cn=$username,cn=Users,dc=buggy,dc=net', 'dc=buggy,dc=net') |
| 268 | + |
| 269 | + def tearDown(self): |
| 270 | + test_utils.cleanup() |
| 271 | + |
| 272 | + def authenticate(self, username, password): |
| 273 | + user = self.auth_wrapper.authenticate(username, password) |
| 274 | + self.assertEqual(user, username) |
| 275 | + |
| 276 | + |
| 277 | +def _mock_request_handler(username, password): |
| 278 | + request_handler = mock_object() |
| 279 | + |
| 280 | + def get_argument(arg_name): |
| 281 | + if arg_name == 'username': |
| 282 | + return username |
| 283 | + elif arg_name == 'password': |
| 284 | + return password |
| 285 | + else: |
| 286 | + return None |
| 287 | + |
| 288 | + request_handler.get_argument = get_argument |
| 289 | + return request_handler |
0 commit comments