Răsfoiți Sursa

[auth] htpasswd: module 'bcrypt' is no longer mandatory in case digest method not used in file

Peter Bieringer 1 an în urmă
părinte
comite
5357e692d9
1 a modificat fișierele cu 35 adăugiri și 9 ștergeri
  1. 35 9
      radicale/auth/htpasswd.py

+ 35 - 9
radicale/auth/htpasswd.py

@@ -70,6 +70,8 @@ class Auth(auth.BaseAuth):
     _htpasswd_ok: bool
     _htpasswd_not_ok_seconds: int
     _htpasswd_not_ok_reminder_seconds: int
+    _htpasswd_bcrypt_use: int
+    _has_bcrypt: bool
     _lock: threading.Lock
 
     def __init__(self, configuration: config.Configuration) -> None:
@@ -79,9 +81,10 @@ class Auth(auth.BaseAuth):
         encryption: str = configuration.get("auth", "htpasswd_encryption")
         logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s'", encryption)
 
+        self._has_bcrypt = False
         self._htpasswd_ok = False
         self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
-        self._htpasswd_read = self._read_htpasswd(True)
+        (self._htpasswd_ok, self._htpasswd_bcrypt_use) = self._read_htpasswd(True)
         self._lock = threading.Lock()
 
         if encryption == "plain":
@@ -96,14 +99,24 @@ class Auth(auth.BaseAuth):
             try:
                 import bcrypt
             except ImportError as e:
-                raise RuntimeError(
-                    "The htpasswd encryption method 'bcrypt' or 'autodetect' requires "
-                    "the bcrypt module.") from e
+                if (encryption == "autodetect") and (self._htpasswd_bcrypt_use == 0):
+                    logger.warning("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' which can require bycrypt module, but currently no entries found", encryption)
+                else:
+                    raise RuntimeError(
+                        "The htpasswd encryption method 'bcrypt' or 'autodetect' requires "
+                        "the bcrypt module (entries found: %d)." % self._htpasswd_bcrypt_use) from e
+            else:
+                if encryption == "autodetect":
+                    if self._htpasswd_bcrypt_use == 0:
+                        logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' and bycrypt module found, but currently not required", encryption)
+                    else:
+                        logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' and bycrypt module found (entries found: %d)", encryption, self._htpasswd_bcrypt_use)
             if encryption == "bcrypt":
                 self._verify = functools.partial(self._bcrypt, bcrypt)
             else:
                 self._verify = self._autodetect
                 self._verify_bcrypt = functools.partial(self._bcrypt, bcrypt)
+            self._has_bcrypt = True
         else:
             raise RuntimeError("The htpasswd encryption method %r is not "
                                "supported." % encryption)
@@ -141,7 +154,7 @@ class Auth(auth.BaseAuth):
             # assumed plaintext
             return self._plain(hash_value, password)
 
-    def _read_htpasswd(self, init: bool) -> bool:
+    def _read_htpasswd(self, init: bool) -> (bool, int):
         """Read htpasswd file
 
         init == True: stop on error
@@ -149,6 +162,7 @@ class Auth(auth.BaseAuth):
 
         """
         htpasswd_ok = True
+        bcrypt_use = 0
         if init is True:
             info = "Read"
         else:
@@ -166,12 +180,14 @@ class Auth(auth.BaseAuth):
                     if line.lstrip() and not line.lstrip().startswith("#"):
                         try:
                             login, digest = line.split( ":", maxsplit=1)
+                            skip = False
                             if login == "" or digest == "":
                                 if init is True:
                                     raise ValueError("htpasswd file contains problematic line not matching <login>:<digest> in line: %d" % line_num)
                                 else:
                                     logger.warning("htpasswd file contains problematic line not matching <login>:<digest> in line: %d (ignored)", line_num)
                                     htpasswd_ok = False
+                                    skip = True
                             else:
                                 if htpasswd.get(login):
                                     duplicates += 1
@@ -180,9 +196,19 @@ class Auth(auth.BaseAuth):
                                     else:
                                         logger.warning("htpasswd file contains duplicate login: '%s' (line: %d / ignored)", login, line_num)
                                         htpasswd_ok = False
+                                        skip = True
                                 else:
-                                    htpasswd[login] = digest
-                                    entries += 1
+                                    if digest.startswith("$2y$", 0, 4) and len(digest) == 60:
+                                        if init is True:
+                                            bcrypt_use += 1
+                                        else:
+                                            if self._has_bcrypt is False:
+                                                logger.warning("htpasswd file contains bcrypt digest login: '%s' (line: %d / ignored because module is not loaded)", login, line_num)
+                                                skip = True
+                                                htpasswd_ok = False
+                            if skip is False:
+                                htpasswd[login] = digest
+                                entries += 1
                         except ValueError as e:
                             if init is True:
                                 raise RuntimeError("Invalid htpasswd file %r: %s" % (self._filename, e)) from e
@@ -201,7 +227,7 @@ class Auth(auth.BaseAuth):
             self._htpasswd_not_ok_time = 0
         else:
             self._htpasswd_not_ok_time = time.time()
-        return htpasswd_ok
+        return (htpasswd_ok, bcrypt_use)
 
     def _login(self, login: str, password: str) -> str:
         """Validate credentials.
@@ -219,7 +245,7 @@ class Auth(auth.BaseAuth):
         htpasswd_time_ns = os.stat(self._filename).st_mtime_ns
         if (htpasswd_size != self._htpasswd_size) or (htpasswd_time_ns != self._htpasswd_time_ns):
             with self._lock:
-                self._htpasswd_ok = self._read_htpasswd(False)
+                (self._htpasswd_ok, self._htpasswd_bcrypt_use) = self._read_htpasswd(False)
         else:
             # log reminder of problemantic file every interval
             if (self._htpasswd_ok is False) and (self._htpasswd_not_ok_time > 0):