ldap.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2022-2024 Peter Varkoly
  3. # Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
  4. #
  5. # This library is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This library is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Authentication backend that checks credentials with a ldap server.

The following parameters are needed in the configuration:
   ldap_uri          The ldap url to the server like ldap://localhost
   ldap_base         The baseDN of the ldap server
   ldap_reader_dn    The DN of a ldap user with read access to get the user accounts
   ldap_secret       The password of the ldap_reader_dn
   ldap_secret_file  The path of the file containing the password of the ldap_reader_dn
   ldap_filter       The search filter to find the user to authenticate by the username
   ldap_load_groups  If the groups of the authenticated users need to be loaded

The following parameters control SSL connections (only used with the ldap3 module):
   ldap_use_ssl          If the connection to the ldap server should use SSL
   ldap_ssl_verify_mode  The certificate verification mode: NONE, OPTIONAL or REQUIRED (default)
   ldap_ssl_ca_file      The path of the CA certificate file used to verify the server certificate
"""
  32. import ssl
  33. from radicale import auth, config
  34. from radicale.log import logger
  35. class Auth(auth.BaseAuth):
  36. _ldap_uri: str
  37. _ldap_base: str
  38. _ldap_reader_dn: str
  39. _ldap_secret: str
  40. _ldap_filter: str
  41. _ldap_load_groups: bool
  42. _ldap_version: int = 3
  43. _ldap_use_ssl: bool = False
  44. _ldap_ssl_verify_mode: int = ssl.CERT_REQUIRED
  45. _ldap_ssl_ca_file: str = ""
  46. def __init__(self, configuration: config.Configuration) -> None:
  47. super().__init__(configuration)
  48. try:
  49. import ldap3
  50. self.ldap3 = ldap3
  51. except ImportError:
  52. try:
  53. import ldap
  54. self._ldap_version = 2
  55. self.ldap = ldap
  56. except ImportError as e:
  57. raise RuntimeError("LDAP authentication requires the ldap3 module") from e
  58. self._ldap_uri = configuration.get("auth", "ldap_uri")
  59. self._ldap_base = configuration.get("auth", "ldap_base")
  60. self._ldap_reader_dn = configuration.get("auth", "ldap_reader_dn")
  61. self._ldap_load_groups = configuration.get("auth", "ldap_load_groups")
  62. self._ldap_secret = configuration.get("auth", "ldap_secret")
  63. self._ldap_filter = configuration.get("auth", "ldap_filter")
  64. ldap_secret_file_path = configuration.get("auth", "ldap_secret_file")
  65. if ldap_secret_file_path:
  66. with open(ldap_secret_file_path, 'r') as file:
  67. self._ldap_secret = file.read().rstrip('\n')
  68. if self._ldap_version == 3:
  69. self._ldap_use_ssl = configuration.get("auth", "ldap_use_ssl")
  70. if self._ldap_use_ssl:
  71. self._ldap_ssl_ca_file = configuration.get("auth", "ldap_ssl_ca_file")
  72. tmp = configuration.get("auth", "ldap_ssl_verify_mode")
  73. if tmp == "NONE":
  74. self._ldap_ssl_verify_mode = ssl.CERT_NONE
  75. elif tmp == "OPTIONAL":
  76. self._ldap_ssl_verify_mode = ssl.CERT_OPTIONAL
  77. logger.info("auth.ldap_uri : %r" % self._ldap_uri)
  78. logger.info("auth.ldap_base : %r" % self._ldap_base)
  79. logger.info("auth.ldap_reader_dn : %r" % self._ldap_reader_dn)
  80. logger.info("auth.ldap_load_groups : %s" % self._ldap_load_groups)
  81. logger.info("auth.ldap_filter : %r" % self._ldap_filter)
  82. if ldap_secret_file_path:
  83. logger.info("auth.ldap_secret_file_path: %r" % ldap_secret_file_path)
  84. if self._ldap_secret:
  85. logger.info("auth.ldap_secret : (from file)")
  86. else:
  87. logger.info("auth.ldap_secret_file_path: (not provided)")
  88. if self._ldap_secret:
  89. logger.info("auth.ldap_secret : (from config)")
  90. if self._ldap_reader_dn and not self._ldap_secret:
  91. logger.error("auth.ldap_secret : (not provided)")
  92. raise RuntimeError("LDAP authentication requires ldap_secret for reader_dn")
  93. logger.info("auth.ldap_use_ssl : %s" % self._ldap_use_ssl)
  94. if self._ldap_use_ssl is True:
  95. logger.info("auth.ldap_ssl_verify_mode : %s" % self._ldap_ssl_verify_mode)
  96. if self._ldap_ssl_ca_file:
  97. logger.info("auth.ldap_ssl_ca_file : %r" % self._ldap_ssl_ca_file)
  98. else:
  99. logger.info("auth.ldap_ssl_ca_file : (not provided)")
  100. def _login2(self, login: str, password: str) -> str:
  101. try:
  102. """Bind as reader dn"""
  103. logger.debug(f"_login2 {self._ldap_uri}, {self._ldap_reader_dn}")
  104. conn = self.ldap.initialize(self._ldap_uri)
  105. conn.protocol_version = 3
  106. conn.set_option(self.ldap.OPT_REFERRALS, 0)
  107. conn.simple_bind_s(self._ldap_reader_dn, self._ldap_secret)
  108. """Search for the dn of user to authenticate"""
  109. res = conn.search_s(self._ldap_base, self.ldap.SCOPE_SUBTREE, filterstr=self._ldap_filter.format(login), attrlist=['memberOf'])
  110. if len(res) == 0:
  111. """User could not be find"""
  112. return ""
  113. user_dn = res[0][0]
  114. logger.debug("LDAP Auth user: %s", user_dn)
  115. """Close ldap connection"""
  116. conn.unbind()
  117. except Exception as e:
  118. raise RuntimeError(f"Invalid ldap configuration:{e}")
  119. try:
  120. """Bind as user to authenticate"""
  121. conn = self.ldap.initialize(self._ldap_uri)
  122. conn.protocol_version = 3
  123. conn.set_option(self.ldap.OPT_REFERRALS, 0)
  124. conn.simple_bind_s(user_dn, password)
  125. tmp: list[str] = []
  126. if self._ldap_load_groups:
  127. tmp = []
  128. for t in res[0][1]['memberOf']:
  129. tmp.append(t.decode('utf-8').split(',')[0][3:])
  130. self._ldap_groups = set(tmp)
  131. logger.debug("LDAP Auth groups of user: %s", ",".join(self._ldap_groups))
  132. conn.unbind()
  133. return login
  134. except self.ldap.INVALID_CREDENTIALS:
  135. return ""
  136. def _login3(self, login: str, password: str) -> str:
  137. """Connect the server"""
  138. try:
  139. logger.debug(f"_login3 {self._ldap_uri}, {self._ldap_reader_dn}")
  140. if self._ldap_use_ssl:
  141. tls = self.ldap3.Tls(validate=self._ldap_ssl_verify_mode)
  142. if self._ldap_ssl_ca_file != "":
  143. tls = self.ldap3.Tls(
  144. validate=self._ldap_ssl_verify_mode,
  145. ca_certs_file=self._ldap_ssl_ca_file
  146. )
  147. server = self.ldap3.Server(self._ldap_uri, use_ssl=True, tls=tls)
  148. else:
  149. server = self.ldap3.Server(self._ldap_uri)
  150. conn = self.ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret)
  151. except self.ldap3.core.exceptions.LDAPSocketOpenError:
  152. raise RuntimeError("Unable to reach ldap server")
  153. except Exception as e:
  154. logger.debug(f"_login3 error 1 {e}")
  155. pass
  156. if not conn.bind():
  157. logger.debug("_login3 can not bind")
  158. raise RuntimeError("Unable to read from ldap server")
  159. logger.debug(f"_login3 bind as {self._ldap_reader_dn}")
  160. """Search the user dn"""
  161. conn.search(
  162. search_base=self._ldap_base,
  163. search_filter=self._ldap_filter.format(login),
  164. search_scope=self.ldap3.SUBTREE,
  165. attributes=['memberOf']
  166. )
  167. if len(conn.entries) == 0:
  168. logger.debug(f"_login3 user '{login}' can not be find")
  169. """User could not be find"""
  170. return ""
  171. user_entry = conn.response[0]
  172. conn.unbind()
  173. user_dn = user_entry['dn']
  174. logger.debug(f"_login3 found user_dn {user_dn}")
  175. try:
  176. """Try to bind as the user itself"""
  177. conn = self.ldap3.Connection(server, user_dn, password=password)
  178. if not conn.bind():
  179. logger.debug(f"_login3 user '{login}' can not be find")
  180. return ""
  181. if self._ldap_load_groups:
  182. tmp = []
  183. for g in user_entry['attributes']['memberOf']:
  184. tmp.append(g.split(',')[0][3:])
  185. self._ldap_groups = set(tmp)
  186. conn.unbind()
  187. logger.debug(f"_login3 {login} successfully authorized")
  188. return login
  189. except Exception as e:
  190. logger.debug(f"_login3 error 2 {e}")
  191. pass
  192. return ""
  193. def _login(self, login: str, password: str) -> str:
  194. """Validate credentials.
  195. In first step we make a connection to the ldap server with the ldap_reader_dn credential.
  196. In next step the DN of the user to authenticate will be searched.
  197. In the last step the authentication of the user will be proceeded.
  198. """
  199. if self._ldap_version == 2:
  200. return self._login2(login, password)
  201. return self._login3(login, password)