diff --git a/docs/config.rst b/docs/config.rst index fc531ff45..923b87e05 100755 --- a/docs/config.rst +++ b/docs/config.rst @@ -214,6 +214,8 @@ Use config.py to configure the following parameters. By default it will use SQLL | | | | | | It authenticates with "format-userexample".| | +----------------------------------------+--------------------------------------------+-----------+ +| AUTH_LDAP_USE_NESTED_GROUPS_FOR_ROLES | Get users nested groups from LDAP(MS AD) | No | ++----------------------------------------+--------------------------------------------+-----------+ | AUTH_ROLE_ADMIN | Configure the name of the admin role. | No | +----------------------------------------+--------------------------------------------+-----------+ | AUTH_ROLE_PUBLIC | Special Role that holds the public | No | @@ -338,10 +340,10 @@ It should be a long random bytes or str. For example, copy the output of this to Using config.py --------------- - + My favorite way, and the one I advise if you are building a medium to large size application is to place all your configuration keys on a config.py file - + Next you only have to import them to the Flask app object, like this :: @@ -356,10 +358,10 @@ Take a look at the skeleton `config.py `_ expression to evalate user registration role. The input values is ``userinfo`` dict, returned by ``get_oauth_user_info`` function of Security Manager. -Usage of JMESPath expressions requires `jmespath `_ package +Usage of JMESPath expressions requires `jmespath `_ package to be installed. In case of Google OAuth, userinfo contains user's email that can be used to map some users as admins diff --git a/flask_appbuilder/security/manager.py b/flask_appbuilder/security/manager.py index d046eec9f..f23c751f7 100644 --- a/flask_appbuilder/security/manager.py +++ b/flask_appbuilder/security/manager.py @@ -257,6 +257,11 @@ def __init__(self, appbuilder): app.config.setdefault("AUTH_LDAP_FIRSTNAME_FIELD", "givenName") app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD", "sn") app.config.setdefault("AUTH_LDAP_EMAIL_FIELD", "mail") + # Nested groups options + app.config.setdefault("AUTH_LDAP_USE_NESTED_GROUPS_FOR_ROLES", False) + + if self.auth_type == AUTH_REMOTE_USER: + app.config.setdefault("AUTH_REMOTE_USER_ENV_VAR", "REMOTE_USER") if self.auth_type == AUTH_REMOTE_USER: app.config.setdefault("AUTH_REMOTE_USER_ENV_VAR", "REMOTE_USER") @@ -509,6 +514,10 @@ def auth_ldap_tls_certfile(self): def auth_ldap_tls_keyfile(self): return self.appbuilder.get_app.config["AUTH_LDAP_TLS_KEYFILE"] + @property + def auth_ldap_use_nested_groups_for_roles(self): + return self.appbuilder.get_app.config["AUTH_LDAP_USE_NESTED_GROUPS_FOR_ROLES"] + @property def openid_providers(self): return self.appbuilder.get_app.config["OPENID_PROVIDERS"] @@ -1047,11 +1056,54 @@ def _search_ldap(self, ldap, con, username): user_dn = search_result[0][0] # extract the other attributes user_info = search_result[0][1] - # return - return user_dn, user_info except (IndexError, NameError): return None, None + # get nested groups for user + if self.auth_ldap_use_nested_groups_for_roles: + nested_groups = self._ldap_get_nested_groups(ldap, con, user_dn) + + if self.auth_ldap_group_field in user_info: + user_info[self.auth_ldap_group_field].extend(nested_groups) + else: + user_info[self.auth_ldap_group_field] = nested_groups + + # return + return user_dn, user_info + + def _ldap_get_nested_groups(self, ldap, con, user_dn) -> List[str]: + """ + Searches nested groups for user. Only for MS AD version. + + :param ldap: The ldap module reference + :param con: The ldap connection + :param user_dn: user DN to match with CN + :return: ldap groups array + """ + log.debug("Nested groups for LDAP enabled.") + # filter for microsoft active directory only + nested_groups_filter_str = ( + f"(&(objectCategory=Group)(member:1.2.840.113556.1.4.1941:={user_dn}))" + ) + nested_groups_request_fields = ["cn"] + + nested_groups_search_result = con.search_s( + self.auth_ldap_search, + ldap.SCOPE_SUBTREE, + nested_groups_filter_str, + nested_groups_request_fields, + ) + log.debug( + "LDAP search for nested groups returned: %s", + nested_groups_search_result, + ) + + nested_groups = [ + x[0].encode() for x in nested_groups_search_result if x[0] is not None + ] + log.debug("LDAP nested groups for users: %s", nested_groups) + return nested_groups + def _ldap_calculate_user_roles( self, user_attributes: Dict[str, bytes] ) -> List[str]: diff --git a/tests/security/test_auth_ldap.py b/tests/security/test_auth_ldap.py index f5596e2ec..be30596a9 100644 --- a/tests/security/test_auth_ldap.py +++ b/tests/security/test_auth_ldap.py @@ -959,3 +959,46 @@ def test_login_failed_keep_next_url(self): follow_redirects=False, ) assert response.location == "/users/userinfo/" + + def test__ldap_get_nested_groups(self): + """ + LDAP: test `_search_ldap` method (with AUTH_LDAP_USE_NESTED_GROUPS_FOR_ROLES) + """ + self.app.config["AUTH_LDAP_BIND_USER"] = "cn=admin,dc=example,dc=org" + self.app.config["AUTH_LDAP_BIND_PASSWORD"] = "admin_password" + self.app.config["AUTH_LDAP_SEARCH"] = "ou=users,dc=example,dc=org" + self.app.config["AUTH_LDAP_USE_NESTED_GROUPS_FOR_ROLES"] = "true" + self.appbuilder = AppBuilder(self.app, self.db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + user_alice = ( + "cn=test,ou=groups,dc=example,dc=org", + { + "member:1.2.840.113556.1.4.1941:": [ + b"cn=alice,ou=users,dc=example,dc=org" + ], + }, + ) + # run `_search_ldap` method w/mocked ldap connection + mock_con = Mock() + mock_con.search_s.return_value = [ + ( + None, + [ + "ldap://ForestDnsZones.mycompany.com/" + "DC=ForestDnsZones,DC=mycompany,DC=com" + ], + ), + user_alice, + (None, ["ldap://mycompany.com/CN=Configuration,DC=mycompany,DC=com"]), + ] + nested_groups = sm._ldap_get_nested_groups( + ldap, mock_con, "cn=alice,ou=users,dc=example,dc=org" + ) + + # validate - search returned expected data + self.assertEqual(len(nested_groups), 1) + self.assertEqual(nested_groups[0], b"cn=test,ou=groups,dc=example,dc=org") + + mock_con.search_s.assert_called()