Skip to content

Commit dd232b7

Browse files
authored
Merge pull request #56 from livestreamx/arogov
mark dai galku (ldap -> ldap3)
2 parents 9136680 + 82bcadf commit dd232b7

File tree

5 files changed

+57
-98
lines changed

5 files changed

+57
-98
lines changed

makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ SPHINXAPIDOC_OPTS = -f -d 3 --ext-autodoc
1717
COV_BADGE_SVG = $(DOCS_IMAGES_DIR)/coverage.svg
1818
MYPY_CACHE_DIR = .mypy_cache
1919

20-
MIN_COVERAGE = 89.1
20+
MIN_COVERAGE = 88.9
2121
PYTHON_VERSION ?= 3.12
2222

2323
JOBS ?= 4
+55-31
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,78 @@
11
import logging
22
import re
3+
import typing
34

4-
import ldap
5-
from ldap.ldapobject import LDAPObject
5+
import ldap3
6+
from ldap3.core.exceptions import LDAPExceptionError
67
from pydantic import SecretStr
78

89
from overhave.transport.ldap.settings import OverhaveLdapClientSettings
910

1011
logger = logging.getLogger(__name__)
1112

1213

14+
class LdapUserSearchAttributes(typing.TypedDict):
15+
"""LDAP user search attributes."""
16+
17+
memberOf: list[str] # noqa: N815
18+
19+
1320
class LDAPAuthenticator:
14-
"""Class-client for LDAP authentication."""
21+
"""Overhave LDAP client."""
1522

1623
def __init__(self, settings: OverhaveLdapClientSettings) -> None:
24+
self._server = ldap3.Server(
25+
settings.url,
26+
use_ssl=True,
27+
connect_timeout=settings.timeout.seconds,
28+
)
1729
self._settings = settings
18-
self._ldap_connection: LDAPObject | None = None
19-
20-
def _connect(self, login: str, password: SecretStr) -> None:
21-
ldap_connection = ldap.initialize(self._settings.url)
22-
ldap_connection.set_option(ldap.OPT_REFERRALS, 0)
23-
ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, self._settings.timeout.seconds)
24-
if self._settings.tls_enabled:
25-
ldap_connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
26-
ldap_connection.start_tls_s()
27-
ldap_connection.simple_bind_s(f"{self._settings.domain}{login}", password.get_secret_value())
28-
self._ldap_connection = ldap_connection
29-
30-
def _ask_ad_usergroups(self, login: str) -> list[str]:
31-
result = self._ldap_connection.search_st( # type: ignore
32-
base=self._settings.dn,
33-
scope=ldap.SCOPE_SUBTREE,
34-
filterstr=f"(sAMAccountName={login})",
35-
attrlist=["memberOf"],
36-
timeout=self._settings.timeout.seconds,
30+
31+
def _get_connection(self, login: str, password: SecretStr) -> ldap3.Connection:
32+
return ldap3.Connection(
33+
self._server,
34+
user=f"{self._settings.domain}{login}",
35+
password=password.get_secret_value(),
36+
receive_timeout=self._settings.timeout.seconds,
37+
read_only=True,
38+
auto_referrals=False,
39+
auto_bind=True,
40+
)
41+
42+
def _ask_ad_usergroups(self, connection: ldap3.Connection, login: str) -> list[str]:
43+
# Выполнение поискового запроса
44+
connection.search(
45+
search_base=self._settings.dn,
46+
search_filter=f"(sAMAccountName={login})",
47+
search_scope=ldap3.SUBTREE,
48+
attributes=["memberOf"],
49+
time_limit=self._settings.timeout.seconds,
3750
)
38-
member_of = result[0][1]["memberOf"]
39-
member_of = [m.decode("utf8") for m in member_of]
51+
if connection.result.get("result", 1) != 0:
52+
return []
53+
if connection.response is None: # pragma: no cover
54+
# should not happen - result was 0
55+
raise ValueError
56+
57+
members_of: list[LdapUserSearchAttributes] = [
58+
el["attributes"].get("memberOf", []) for el in connection.response if "attributes" in el
59+
]
60+
61+
if len(members_of) != 1:
62+
raise ValueError("Multiple or zero users with the same login")
63+
64+
member_of = members_of[0]
4065

4166
p = re.compile("CN=(.*?),", re.IGNORECASE)
4267
return [
43-
p.match(x).group(1) # type: ignore
68+
p.match(x).group(1) # type: ignore[union-attr]
4469
for x in list(filter(lambda x: "OU=Security Groups" in x or "OU=Mail Groups" in x, member_of))
4570
]
4671

4772
def get_user_groups(self, login: str, password: SecretStr) -> list[str] | None:
4873
try:
49-
self._connect(login, password)
50-
except ldap.INVALID_CREDENTIALS:
51-
logger.info("Failed LDAP auth_managers for user: %s", login)
52-
return None
53-
54-
return self._ask_ad_usergroups(login)
74+
with self._get_connection(login, password) as conn:
75+
return self._ask_ad_usergroups(conn, login)
76+
except LDAPExceptionError:
77+
logger.debug("Failed LDAP authorization for user: %s", login)
78+
return None

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "overhave"
3-
version = "5.2.1"
3+
version = "5.2.2"
44
description = "Overhave - web-framework for BDD"
55
readme = "README.rst"
66
authors = [

tests/unit/authorization/conftest.py

-51
This file was deleted.

tests/unit/authorization/test_ldap.py

-14
This file was deleted.

0 commit comments

Comments
 (0)