# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2022 Unrud
# Copyright © 2024-2024 Peter Bieringer
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.

"""
Authentication module.

Authentication is based on usernames and passwords. If something more
advanced is needed an external WSGI server or reverse proxy can be used
(see ``remote_user`` or ``http_x_remote_user`` backend).

Take a look at the class ``BaseAuth`` if you want to implement your own.

"""

import hashlib
import time
from typing import Sequence, Set, Tuple, Union, final

from radicale import config, types, utils
from radicale.log import logger

INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user",
                                 "denyall", "htpasswd", "ldap", "dovecot")


def load(configuration: "config.Configuration") -> "BaseAuth":
    """Load the authentication module chosen in configuration.

    Logs a warning when the configured type is ``none`` (no authentication)
    or ``denyall`` (all access blocked) before delegating to the plugin
    loader.

    """
    auth_type = configuration.get("auth", "type")
    if auth_type == "none":
        logger.warning("No user authentication is selected: '[auth] type=none' (insecure)")
    if auth_type == "denyall":
        logger.warning("All access is blocked by: '[auth] type=denyall'")
    return utils.load_plugin(INTERNAL_TYPES, "auth", "Auth", BaseAuth,
                             configuration)


class BaseAuth:
    """Base class of all authentication backends.

    Subclasses implement ``_login`` (and optionally ``get_external_login``).
    The public entry point ``login`` normalizes the login name according to
    the configuration and, for selected backend types, caches successful and
    failed login attempts in memory.

    """

    _ldap_groups: Set[str] = set()
    _lc_username: bool
    _uc_username: bool
    _strip_domain: bool
    _type: str
    _cache_logins: bool
    _cache_successful: dict  # login -> (digest, time_ns)
    _cache_successful_logins_expiry: int
    _cache_failed: dict  # digest_failed -> (time_ns)
    _cache_failed_logins_expiry: int
    _cache_failed_logins_salt_ns: int  # persistent over runtime

    def __init__(self, configuration: "config.Configuration") -> None:
        """Initialize BaseAuth.

        ``configuration`` see ``radicale.config`` module.
        The ``configuration`` must not change during the lifetime of
        this object, it is kept as an internal reference.

        Raises ``RuntimeError`` if ``lc_username`` and ``uc_username`` are
        both enabled, or if a cache expiry value is negative.

        """
        self.configuration = configuration
        self._lc_username = configuration.get("auth", "lc_username")
        self._uc_username = configuration.get("auth", "uc_username")
        self._strip_domain = configuration.get("auth", "strip_domain")
        logger.info("auth.strip_domain: %s", self._strip_domain)
        logger.info("auth.lc_username: %s", self._lc_username)
        logger.info("auth.uc_username: %s", self._uc_username)
        if self._lc_username is True and self._uc_username is True:
            raise RuntimeError("auth.lc_username and auth.uc_username cannot be enabled together")
        # cache_successful_logins
        self._cache_logins = configuration.get("auth", "cache_logins")
        self._type = configuration.get("auth", "type")
        if (self._type in ["dovecot", "ldap", "htpasswd"]) or (self._cache_logins is False):
            logger.info("auth.cache_logins: %s", self._cache_logins)
        else:
            # caching is only kept enabled for the backend types listed above
            logger.info("auth.cache_logins: %s (but not required for type '%s' and disabled therefore)", self._cache_logins, self._type)
            self._cache_logins = False
        if self._cache_logins is True:
            self._cache_successful_logins_expiry = configuration.get("auth", "cache_successful_logins_expiry")
            if self._cache_successful_logins_expiry < 0:
                raise RuntimeError("self._cache_successful_logins_expiry cannot be < 0")
            self._cache_failed_logins_expiry = configuration.get("auth", "cache_failed_logins_expiry")
            if self._cache_failed_logins_expiry < 0:
                raise RuntimeError("self._cache_failed_logins_expiry cannot be < 0")
            logger.info("auth.cache_successful_logins_expiry: %s seconds", self._cache_successful_logins_expiry)
            logger.info("auth.cache_failed_logins_expiry: %s seconds", self._cache_failed_logins_expiry)
            # cache init
            self._cache_successful = {}
            self._cache_failed = {}
            self._cache_failed_logins_salt_ns = time.time_ns()

    def _cache_digest(self, login: str, password: str, salt: str) -> str:
        """Return a salted SHA3-512 digest of ``login`` and ``password``.

        The result is the string representation of the raw digest bytes; it
        is only used as an in-memory cache key/value.

        """
        h = hashlib.sha3_512()
        h.update(salt.encode())
        h.update(login.encode())
        h.update(password.encode())
        return str(h.digest())

    def get_external_login(self, environ: types.WSGIEnviron) -> Union[
            Tuple[()], Tuple[str, str]]:
        """Optionally provide the login and password externally.

        ``environ`` a dict with the WSGI environment

        If ``()`` is returned, Radicale handles HTTP authentication.
        Otherwise, returns a tuple ``(login, password)``.

        For anonymous users ``login`` must be ``""``.

        """
        return ()

    def _login(self, login: str, password: str) -> str:
        """Check credentials and map login to internal user

        ``login`` the login name

        ``password`` the password

        Returns the username or ``""`` for invalid credentials.

        """

        raise NotImplementedError

    @final
    def login(self, login: str, password: str) -> str:
        """Normalize ``login`` and verify the credentials.

        ``login`` the login name; lower-/upper-cased and stripped of its
        domain part according to the configuration before verification

        ``password`` the password

        When login caching is enabled, a cached failed attempt short-cuts to
        ``""`` and a cached successful attempt short-cuts to ``login`` until
        the respective entry expires; otherwise the backend ``_login`` is
        consulted and the caches are updated with its verdict.

        Returns the username or ``""`` for invalid credentials.

        """
        if self._lc_username:
            login = login.lower()
        if self._uc_username:
            login = login.upper()
        if self._strip_domain:
            login = login.split('@')[0]
        if self._cache_logins is not True:
            # caching disabled: always ask the backend
            return self._login(login, password)

        result = ""
        # digest salted with the current ``time_ns``, the only digest that
        # may be stored together with ``time_ns`` in the successful cache
        digest_new = ""
        # time_ns is also used as salt
        time_ns = time.time_ns()
        digest_failed = login + ":" + self._cache_digest(login, password, str(self._cache_failed_logins_salt_ns))
        if digest_failed in self._cache_failed:
            # login+password found in cache "failed"
            time_ns_cache = self._cache_failed[digest_failed]
            age_failed = int((time_ns - time_ns_cache) / 1000 / 1000 / 1000)
            if age_failed > self._cache_failed_logins_expiry:
                logger.debug("Login failed cache entry for user+password found but expired: '%s' (age: %d > %d sec)", login, age_failed, self._cache_failed_logins_expiry)
                # delete expired failed from cache
                del self._cache_failed[digest_failed]
            else:
                # shortcut return
                logger.debug("Login failed cache entry for user+password found: '%s' (age: %d sec)", login, age_failed)
                return ""
        if login in self._cache_successful:
            # login found in cache "successful"
            (digest_cache, time_ns_cache) = self._cache_successful[login]
            # the cached digest was salted with the cached timestamp
            digest_check = self._cache_digest(login, password, str(time_ns_cache))
            if digest_check == digest_cache:
                age_success = int((time_ns - time_ns_cache) / 1000 / 1000 / 1000)
                if age_success > self._cache_successful_logins_expiry:
                    logger.debug("Login successful cache entry for user+password found but expired: '%s' (age: %d > %d sec)", login, age_success, self._cache_successful_logins_expiry)
                    # delete expired success from cache
                    del self._cache_successful[login]
                else:
                    logger.debug("Login successful cache entry for user+password found: '%s' (age: %d sec)", login, age_success)
                    # NOTE(review): returns the login name, not the name the
                    # backend returned when the entry was stored - confirm
                    # that no cached backend maps logins to other usernames
                    result = login
            else:
                logger.debug("Login successful cache entry for user+password not matching: '%s'", login)
        else:
            # login not found in cache, calculate always to avoid timing attacks
            digest_new = self._cache_digest(login, password, str(time_ns))
        if result == "":
            # verify login+password via configured backend
            logger.debug("Login verification for user+password via backend: '%s'", login)
            result = self._login(login, password)
            if result != "":
                logger.debug("Login successful for user+password via backend: '%s'", login)
                if digest_new == "":
                    # The cached entry was expired or did not match (e.g.
                    # after a password change).  The digest must be salted
                    # with the timestamp it is stored with, otherwise the
                    # entry could never match on later lookups.
                    digest_new = self._cache_digest(login, password, str(time_ns))
                # store successful login in cache
                self._cache_successful[login] = (digest_new, time_ns)
                logger.debug("Login successful cache for user set: '%s'", login)
                if digest_failed in self._cache_failed:
                    logger.debug("Login failed cache for user cleared: '%s'", login)
                    del self._cache_failed[digest_failed]
            else:
                logger.debug("Login failed for user+password via backend: '%s'", login)
                self._cache_failed[digest_failed] = time_ns
                logger.debug("Login failed cache for user set: '%s'", login)
        return result