| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- # This file is part of Radicale - CalDAV and CardDAV server
- # Copyright © 2008 Nicolas Kandel
- # Copyright © 2008 Pascal Halter
- # Copyright © 2008-2017 Guillaume Ayoub
- # Copyright © 2017-2022 Unrud <unrud@outlook.com>
- # Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
- #
- # This library is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
- """
- Authentication module.
- Authentication is based on usernames and passwords. If something more
- advanced is needed an external WSGI server or reverse proxy can be used
- (see ``remote_user`` or ``http_x_remote_user`` backend).
- Take a look at the class ``BaseAuth`` if you want to implement your own.
- """
- import hashlib
- import time
- from typing import Sequence, Set, Tuple, Union, final
- from radicale import config, types, utils
- from radicale.log import logger
# Names of the internal authentication types; ``load`` hands this sequence
# to ``utils.load_plugin``.
INTERNAL_TYPES: Sequence[str] = (
    "none",
    "remote_user",
    "http_x_remote_user",
    "denyall",
    "htpasswd",
    "ldap",
    "dovecot",
)
def load(configuration: "config.Configuration") -> "BaseAuth":
    """Load the authentication module chosen in configuration.

    A warning is logged when the configured ``[auth] type`` is ``none``
    (no authentication at all) or ``denyall`` (every access is blocked).
    The plugin itself is resolved by ``utils.load_plugin``.
    """
    auth_type = configuration.get("auth", "type")
    if auth_type == "none":
        logger.warning("No user authentication is selected: '[auth] type=none' (insecure)")
    elif auth_type == "denyall":
        logger.warning("All access is blocked by: '[auth] type=denyall'")
    return utils.load_plugin(
        INTERNAL_TYPES, "auth", "Auth", BaseAuth, configuration)
class BaseAuth:
    """Base class of the authentication backends.

    Backends override ``_login`` (and optionally ``get_external_login``).
    Callers use the final method ``login``, which normalizes the login name
    according to the ``[auth]`` options and, when ``cache_logins`` is
    enabled, consults two in-memory caches before asking the backend:

    * successful logins, keyed by login name and holding a salted digest of
      the credentials, the time of the backend check and the user name the
      backend returned;
    * failed logins, keyed by login name plus a salted digest of the
      credentials and holding the time of the failed backend check.

    The caches are plain dictionaries; no locking is done here.
    """

    _ldap_groups: Set[str] = set()
    _lc_username: bool
    _uc_username: bool
    _strip_domain: bool
    _type: str
    _cache_logins: bool
    _cache_successful: dict  # login -> (digest, time_ns, user)
    _cache_successful_logins_expiry: int
    _cache_failed: dict  # digest_failed -> (time_ns)
    _cache_failed_logins_expiry: int
    _cache_failed_logins_salt_ns: int  # persistent over runtime

    def __init__(self, configuration: "config.Configuration") -> None:
        """Initialize BaseAuth.

        ``configuration`` see ``radicale.config`` module.
        The ``configuration`` must not change during the lifetime of
        this object, it is kept as an internal reference.

        Raises ``RuntimeError`` if ``lc_username`` and ``uc_username`` are
        both enabled, or if a cache expiry is negative while caching is
        active.

        """
        self.configuration = configuration
        self._lc_username = configuration.get("auth", "lc_username")
        self._uc_username = configuration.get("auth", "uc_username")
        self._strip_domain = configuration.get("auth", "strip_domain")
        logger.info("auth.strip_domain: %s", self._strip_domain)
        logger.info("auth.lc_username: %s", self._lc_username)
        logger.info("auth.uc_username: %s", self._uc_username)
        if self._lc_username is True and self._uc_username is True:
            raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together")
        # cache_successful_logins
        self._cache_logins = configuration.get("auth", "cache_logins")
        self._type = configuration.get("auth", "type")
        if (self._type in ("dovecot", "ldap", "htpasswd")) or (self._cache_logins is False):
            logger.info("auth.cache_logins: %s", self._cache_logins)
        else:
            # caching is only kept for the types listed above
            logger.info("auth.cache_logins: %s (but not required for type '%s' and disabled therefore)", self._cache_logins, self._type)
            self._cache_logins = False
        if self._cache_logins is True:
            self._cache_successful_logins_expiry = configuration.get("auth", "cache_successful_logins_expiry")
            if self._cache_successful_logins_expiry < 0:
                raise RuntimeError("self._cache_successful_logins_expiry cannot be < 0")
            self._cache_failed_logins_expiry = configuration.get("auth", "cache_failed_logins_expiry")
            if self._cache_failed_logins_expiry < 0:
                raise RuntimeError("self._cache_failed_logins_expiry cannot be < 0")
            logger.info("auth.cache_successful_logins_expiry: %s seconds", self._cache_successful_logins_expiry)
            logger.info("auth.cache_failed_logins_expiry: %s seconds", self._cache_failed_logins_expiry)
        # cache init (cheap, so done even when caching is disabled)
        self._cache_successful = dict()
        self._cache_failed = dict()
        self._cache_failed_logins_salt_ns = time.time_ns()

    def _cache_digest(self, login: str, password: str, salt: str) -> str:
        """Return the hex SHA3-512 digest over ``salt``, ``login`` and
        ``password`` (fed to the hash in that order)."""
        h = hashlib.sha3_512()
        for part in (salt, login, password):
            h.update(part.encode())
        return h.hexdigest()

    @staticmethod
    def _cache_age(now_ns: int, then_ns: int) -> int:
        """Return the whole seconds elapsed between two ``time_ns`` values."""
        return (now_ns - then_ns) // 1_000_000_000

    def _cache_failed_purge(self, time_ns: int) -> None:
        """Remove all expired entries from the failed-login cache.

        Without this, an entry would only ever be removed when exactly the
        same login+password is presented again, so distinct wrong passwords
        would pile up for the whole runtime.

        """
        # iterate over a snapshot, the dictionary is modified below
        for key, time_ns_cache in list(self._cache_failed.items()):
            if self._cache_age(time_ns, time_ns_cache) > self._cache_failed_logins_expiry:
                self._cache_failed.pop(key, None)

    def get_external_login(self, environ: "types.WSGIEnviron") -> Union[
            Tuple[()], Tuple[str, str]]:
        """Optionally provide the login and password externally.

        ``environ`` a dict with the WSGI environment

        If ``()`` is returned, Radicale handles HTTP authentication.
        Otherwise, returns a tuple ``(login, password)``. For anonymous users
        ``login`` must be ``""``.

        """
        return ()

    def _login(self, login: str, password: str) -> str:
        """Check credentials and map login to internal user

        ``login`` the login name

        ``password`` the password

        Returns the username or ``""`` for invalid credentials.

        """
        raise NotImplementedError

    def _login_cached(self, login: str, password: str) -> str:
        """Check credentials using the login caches in front of ``_login``.

        ``login`` the already normalized login name

        ``password`` the password

        Returns the username or ``""`` for invalid credentials.

        """
        # time_ns is also used as salt
        time_ns = time.time_ns()
        digest_failed = login + ":" + self._cache_digest(login, password, str(self._cache_failed_logins_salt_ns))

        time_ns_failed = self._cache_failed.get(digest_failed)
        if time_ns_failed is not None:
            # login+password found in cache "failed"
            age_failed = self._cache_age(time_ns, time_ns_failed)
            if age_failed > self._cache_failed_logins_expiry:
                logger.debug("Login failed cache entry for user+password found but expired: '%s' (age: %d > %d sec)", login, age_failed, self._cache_failed_logins_expiry)
                # delete expired failed from cache
                self._cache_failed.pop(digest_failed, None)
            else:
                # shortcut return
                logger.debug("Login failed cache entry for user+password found: '%s' (age: %d sec)", login, age_failed)
                return ""

        # digest to store after a successful backend check; "" means it has
        # not been calculated with the current salt (time_ns) yet
        digest = ""
        entry = self._cache_successful.get(login)
        if entry:
            # login found in cache "successful"
            (digest_cache, time_ns_cache, user_cache) = entry
            # compare using the salt of the cached entry; this digest must
            # never be stored together with the new time_ns
            if self._cache_digest(login, password, str(time_ns_cache)) == digest_cache:
                age_success = self._cache_age(time_ns, time_ns_cache)
                if age_success > self._cache_successful_logins_expiry:
                    logger.debug("Login successful cache entry for user+password found but expired: '%s' (age: %d > %d sec)", login, age_success, self._cache_successful_logins_expiry)
                    # delete expired success from cache
                    self._cache_successful.pop(login, None)
                else:
                    logger.debug("Login successful cache entry for user+password found: '%s' (age: %d sec)", login, age_success)
                    # return the user the backend mapped the login to
                    return user_cache
            else:
                logger.debug("Login successful cache entry for user+password not matching: '%s'", login)
        else:
            # login not found in cache, calculate always to avoid timing attacks
            digest = self._cache_digest(login, password, str(time_ns))

        # verify login+password via configured backend
        logger.debug("Login verification for user+password via backend: '%s'", login)
        result = self._login(login, password)
        if result != "":
            logger.debug("Login successful for user+password via backend: '%s'", login)
            if digest == "":
                # cached entry expired or not matching, digest must be
                # (re)calculated with the salt stored alongside it
                digest = self._cache_digest(login, password, str(time_ns))
            # store successful login in cache
            self._cache_successful[login] = (digest, time_ns, result)
            logger.debug("Login successful cache for user set: '%s'", login)
            if self._cache_failed.pop(digest_failed, None) is not None:
                logger.debug("Login failed cache for user cleared: '%s'", login)
        else:
            logger.debug("Login failed for user+password via backend: '%s'", login)
            # keep the failed cache bounded before adding to it
            self._cache_failed_purge(time_ns)
            self._cache_failed[digest_failed] = time_ns
            logger.debug("Login failed cache for user set: '%s'", login)
        return result

    @final
    def login(self, login: str, password: str) -> str:
        """Normalize ``login`` and check the credentials.

        The login name is lower-cased, upper-cased and/or cut at the first
        ``@`` according to ``lc_username``, ``uc_username`` and
        ``strip_domain``. The check is then done by ``_login``, through the
        login caches when ``cache_logins`` is active.

        Returns the username or ``""`` for invalid credentials.

        """
        if self._lc_username:
            login = login.lower()
        if self._uc_username:
            login = login.upper()
        if self._strip_domain:
            login = login.split('@')[0]
        if self._cache_logins is True:
            return self._login_cached(login, password)
        return self._login(login, password)
|