Browse Source

LDAP auth: allow finding groups based on separate search

Instead of searching for the membership attribute on the user side
(usually AD: memberOf, Novell eDirectory: groupMembership) to determine
the groups the user loging on is a member of, allow performing a separate
search for the groups having the user as member and use the found groups' DNs.

The group search is performed in the context of 'ldap_reader_dn', after
the user DN has been found in the directory, but before the authentication
has been performed by doing an LDAP bind in the user's context.

Although this may - in the case of unsuccessful login attempts -
double the number of queries to the LDAP server, it has been done
this way to keep the number of LDAP contexts minimal.

Doing the group search in the context of the user logging on is no viable
option, because there are known implementations where regular users do not
have the necessary permissions to query the groups they are a member in.
Peter Marschall 7 tháng trước cách đây
mục cha
commit
8821612fa8
2 tập tin đã thay đổi với 116 bổ sung24 xóa
  1. 104 24
      radicale/auth/ldap.py
  2. 12 0
      radicale/config.py

+ 104 - 24
radicale/auth/ldap.py

@@ -30,6 +30,10 @@ Following parameters controls SSL connections:
    ldap_security    The encryption mode to be used: *none*|tls|starttls
    ldap_ssl_verify_mode The certificate verification mode. Works for tls and starttls. NONE, OPTIONAL, default is REQUIRED
    ldap_ssl_ca_file
+ The following parameters are optional:
+   ldap_group_base        Base DN to search for groups. Only if it differs from ldap_base and if ldap_group_members_attribute is set
+   ldap_group_filter      Search filter to search for groups having the user as member. Only if ldap_group_members_attribute is set
+   ldap_group_members_attribute    Attribute in the group entries to read the group's members from
 
 """
 import ssl
@@ -47,6 +51,9 @@ class Auth(auth.BaseAuth):
     _ldap_attributes: list[str] = []
     _ldap_user_attr: str
     _ldap_groups_attr: str
+    _ldap_group_base: str
+    _ldap_group_filter: str
+    _ldap_group_members_attr: str
     _ldap_module_version: int = 3
     _ldap_use_ssl: bool = False
     _ldap_security: str = "none"
@@ -78,6 +85,9 @@ class Auth(auth.BaseAuth):
         self._ldap_filter = configuration.get("auth", "ldap_filter")
         self._ldap_user_attr = configuration.get("auth", "ldap_user_attribute")
         self._ldap_groups_attr = configuration.get("auth", "ldap_groups_attribute")
+        self._ldap_group_base = configuration.get("auth", "ldap_group_base")
+        self._ldap_group_filter = configuration.get("auth", "ldap_group_filter")
+        self._ldap_group_members_attr = configuration.get("auth", "ldap_group_members_attribute")
         ldap_secret_file_path = configuration.get("auth", "ldap_secret_file")
         if ldap_secret_file_path:
             with open(ldap_secret_file_path, 'r') as file:
@@ -110,6 +120,19 @@ class Auth(auth.BaseAuth):
             logger.info("auth.ldap_groups_attribute: %r" % self._ldap_groups_attr)
         else:
             logger.info("auth.ldap_groups_attribute: (not provided)")
+        if self._ldap_group_base:
+            logger.info("auth.ldap_group_base     : %r" % self._ldap_group_base)
+        else:
+            logger.info("auth.ldap_group_base     : (not provided, using ldap_base)")
+            self._ldap_group_base = self._ldap_base
+        if self._ldap_group_filter:
+            logger.info("auth.ldap_group_filter: %r" % self._ldap_group_filter)
+        else:
+            logger.info("auth.ldap_group_filter: (not provided)")
+        if self._ldap_group_members_attr:
+            logger.info("auth.ldap_group_members_attr: %r" % self._ldap_group_members_attr)
+        else:
+            logger.info("auth.ldap_group_members_attr: (not provided)")
         if ldap_secret_file_path:
             logger.info("auth.ldap_secret_file_path: %r" % ldap_secret_file_path)
             if self._ldap_secret:
@@ -160,6 +183,30 @@ class Auth(auth.BaseAuth):
             user_entry = res[0]
             user_dn = user_entry[0]
             logger.debug(f"_login2 found LDAP user DN {user_dn}")
+
+            """Let's collect the groups of the user."""
+            groupDNs: list[str] = []
+            if self._ldap_groups_attr:
+                groupDNs = user_entry[1][self._ldap_groups_attr]
+
+            """Search for all groups having the user_dn found as member."""
+            if self._ldap_group_members_attr:
+                groupDNs = []
+                res = conn.search_s(
+                    self._ldap_group_base,
+                    self.ldap.SCOPE_SUBTREE,
+                    filterstr="(&{0}({1}={2}))".format(
+                        self._ldap_group_filter,
+                        self._ldap_group_members_attr,
+                        self.ldap.filter.escape_filter_chars(user_dn)),
+                    attrlist=['1.1']
+                )
+                """Fill groupDNs with DNs of groups found"""
+                if len(res) > 0:
+                    groupDNs = []
+                    for dn,entry in res:
+                        groupDNs.append(dn)
+
             """Close LDAP connection"""
             conn.unbind()
         except Exception as e:
@@ -171,23 +218,23 @@ class Auth(auth.BaseAuth):
             conn.protocol_version = 3
             conn.set_option(self.ldap.OPT_REFERRALS, 0)
             conn.simple_bind_s(user_dn, password)
-            tmp: list[str] = []
-            if self._ldap_groups_attr:
-                tmp = []
-                for g in user_entry[1][self._ldap_groups_attr]:
-                    """Get group g's RDN's attribute value"""
-                    try:
-                        rdns = self.ldap.dn.explode_dn(g, notypes=True)
-                        tmp.append(rdns[0])
-                    except Exception:
-                        tmp.append(g.decode('utf8'))
-                self._ldap_groups = set(tmp)
-                logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups))
             if self._ldap_user_attr:
                 if user_entry[1][self._ldap_user_attr]:
                     tmplogin = user_entry[1][self._ldap_user_attr][0]
                     login = tmplogin.decode('utf-8')
                     logger.debug(f"_login2 user set to: '{login}'")
+
+            """Get RDNs of groups' DNs"""
+            tmp: list[str] = []
+            for g in groupDNs:
+                try:
+                    rdns = self.ldap.dn.explode_dn(g, notypes=True)
+                    tmp.append(rdns[0])
+                except Exception:
+                    tmp.append(g.decode('utf8'))
+            self._ldap_groups = set(tmp)
+            logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups))
+
             conn.unbind()
             logger.debug(f"_login2 {login} successfully authenticated")
             return login
@@ -249,9 +296,42 @@ class Auth(auth.BaseAuth):
             return ""
 
         user_entry = conn.response[0]
-        conn.unbind()
         user_dn = user_entry['dn']
         logger.debug(f"_login3 found LDAP user DN {user_dn}")
+
+        """Let's collect the groups of the user."""
+        groupDNs: list[str] = []
+        if self._ldap_groups_attr:
+            if user_entry['attributes'][self._ldap_groups_attr]:
+                if isinstance(user_entry['attributes'][self._ldap_groups_attr], list):
+                    groupDNs = user_entry['attributes'][self._ldap_groups_attr]
+                else:
+                    groupDNs.append(user_entry['attributes'][self._ldap_groups_attr])
+
+        """Search for all groups having the user_dn found as member."""
+        if self._ldap_group_members_attr:
+            try:
+                conn.search(
+                    search_base=self._ldap_group_base,
+                    search_filter="(&{0}({1}={2}))".format(
+                        self._ldap_group_filter,
+                        self._ldap_group_members_attr,
+                        self.ldap3.utils.conv.escape_filter_chars(user_dn)),
+                    search_scope=self.ldap3.SUBTREE,
+                    attributes=['1.1']
+                )
+            except Exception as e:
+                """LDAP search failed: consider it as non-fatal - only groups missing"""
+                logger.debug(f"_ldap3: LDAP group search failed: {e}")
+            else:
+                """Fill groupDNs with DNs of groups found"""
+                groupDNs = []
+                for group in conn.response:
+                    groupDNs.append(group['dn'])
+
+        """Close LDAP connection"""
+        conn.unbind()
+
         try:
             """Try to bind as the user itself"""
             try:
@@ -264,18 +344,18 @@ class Auth(auth.BaseAuth):
             if not conn.bind(read_server_info=False):
                 logger.debug(f"_login3 user '{login}' cannot be found")
                 return ""
+
+            """Get RDNs of groups' DNs"""
             tmp: list[str] = []
-            if self._ldap_groups_attr:
-                tmp = []
-                for g in user_entry['attributes'][self._ldap_groups_attr]:
-                    """Get group g's RDN's attribute value"""
-                    try:
-                        rdns = self.ldap3.utils.dn.parse_dn(g)
-                        tmp.append(rdns[0][1])
-                    except Exception:
-                        tmp.append(g)
-                self._ldap_groups = set(tmp)
-                logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups))
+            for g in groupDNs:
+                try:
+                    rdns = self.ldap3.utils.dn.parse_dn(g)
+                    tmp.append(rdns[0][1])
+                except Exception:
+                    tmp.append(g)
+            self._ldap_groups = set(tmp)
+            logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups))
+
             if self._ldap_user_attr:
                 if user_entry['attributes'][self._ldap_user_attr]:
                     if isinstance(user_entry['attributes'][self._ldap_user_attr], list):

+ 12 - 0
radicale/config.py

@@ -297,6 +297,18 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
             "value": "",
             "help": "attribute to read the group memberships from",
             "type": str}),
+        ("ldap_group_members_attribute", {
+            "value": "",
+            "help": "Attribute in the group entries to read the group's members from",
+            "type": str}),
+        ("ldap_group_base", {
+            "value": "",
+            "help": "Base DN to search for groups. Only if it differs from ldap_base and if ldap_group_members_attribute is set",
+            "type": str}),
+        ("ldap_group_filter", {
+            "value": "",
+            "help": "Search filter to search for groups having the user as member. Only if ldap_group_members_attribute is set",
+            "type": str}),
         ("ldap_use_ssl", {
             "value": "False",
             "help": "Use ssl on the ldap connection. Soon to be deprecated, use ldap_security instead",