utils.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2014 Jean-Marc Martins
  3. # Copyright © 2012-2017 Guillaume Ayoub
  4. # Copyright © 2017-2018 Unrud <unrud@outlook.com>
  5. # Copyright © 2024-2026 Peter Bieringer <pb@bieringer.de>
  6. #
  7. # This library is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  19. import datetime
  20. import os
  21. import ssl
  22. import sys
  23. import textwrap
  24. from hashlib import sha256
  25. from importlib import import_module, metadata
  26. from string import ascii_letters, digits, punctuation
  27. from typing import Callable, Sequence, Tuple, Type, TypeVar, Union
  28. from packaging.version import Version
  29. from radicale import config
  30. from radicale.log import logger
  31. if sys.platform != "win32":
  32. import grp
  33. import pwd
  34. _T_co = TypeVar("_T_co", covariant=True)
  35. RADICALE_MODULES: Sequence[str] = ("radicale", "vobject", "passlib", "defusedxml",
  36. "bcrypt",
  37. "argon2-cffi",
  38. "pika",
  39. "ldap",
  40. "ldap3",
  41. "pam")
  42. # IPv4 (host, port) and IPv6 (host, port, flowinfo, scopeid)
  43. ADDRESS_TYPE = Union[Tuple[Union[str, bytes, bytearray], int],
  44. Tuple[str, int, int, int]]
  45. # Max/Min YEAR in datetime in unixtime
  46. DATETIME_MAX_UNIXTIME: int = (datetime.MAXYEAR - 1970) * 365 * 24 * 60 * 60
  47. DATETIME_MIN_UNIXTIME: int = (datetime.MINYEAR - 1970) * 365 * 24 * 60 * 60
  48. # Number units
  49. UNIT_g: int = (1000 * 1000 * 1000)
  50. UNIT_m: int = (1000 * 1000)
  51. UNIT_k: int = (1000)
  52. UNIT_G: int = (1024 * 1024 * 1024)
  53. UNIT_M: int = (1024 * 1024)
  54. UNIT_K: int = (1024)
  55. def load_plugin(internal_types: Sequence[str], module_name: str,
  56. class_name: str, base_class: Type[_T_co],
  57. configuration: "config.Configuration") -> _T_co:
  58. type_: Union[str, Callable] = configuration.get(module_name, "type")
  59. if callable(type_):
  60. logger.info("%s type is %r", module_name, type_)
  61. return type_(configuration)
  62. if type_ in internal_types:
  63. module = "radicale.%s.%s" % (module_name, type_)
  64. else:
  65. module = type_
  66. try:
  67. class_ = getattr(import_module(module), class_name)
  68. except Exception as e:
  69. raise RuntimeError("Failed to load %s module %r: %s" %
  70. (module_name, module, e)) from e
  71. logger.info("%s type is %r", module_name, module)
  72. return class_(configuration)
  73. def package_version(name):
  74. if name == "passlib":
  75. # passlib(libpass) requires special handling as module name is unchanged, but metadata has new name
  76. import passlib
  77. return passlib.__version__
  78. return metadata.version(name)
  79. def vobject_supports_vcard4() -> bool:
  80. """Check if vobject supports vCard 4.0 (requires version >= 1.0.0)."""
  81. try:
  82. version = package_version("vobject")
  83. parts = version.split(".")
  84. major = int(parts[0])
  85. return major >= 1
  86. except Exception:
  87. return False
  88. def passlib_libpass_supports_bcrypt() -> Tuple[bool, str]:
  89. """Check if passlib(libpass) version supports bcrypt version."""
  90. info = ""
  91. try:
  92. version_bcrypt = package_version("bcrypt")
  93. version_bcrypt_check = "5.0.0"
  94. version_passlib = package_version("passlib")
  95. version_passlib_check = "1.9.3"
  96. if Version(version_bcrypt) >= Version(version_bcrypt_check):
  97. # bcrypt >= 5.0.0 has issues with passlib(libpass) < 1.9.3
  98. if Version(version_passlib) < Version(version_passlib_check):
  99. info = "bcrypt module version %r >= %r and passlib(libpass) module version %r < %r found => incompatible, downgrade bcrypt or upgrade passlib(libpass)" % (version_bcrypt, version_bcrypt_check, version_passlib, version_passlib_check)
  100. return (False, info)
  101. else:
  102. info = "bcrypt module version %r >= %r and passlib(libpass) module version %r >= %r found => ok" % (version_bcrypt, version_bcrypt_check, version_passlib, version_passlib_check)
  103. return (True, info)
  104. else:
  105. info = "bcrypt module version %r < %r and passlib(libpass) module version %r found => ok" % (version_bcrypt, version_bcrypt_check, version_passlib)
  106. return (True, info)
  107. except Exception:
  108. info = "bcrypt module version or passlib(libpass) module version %r not found => problem"
  109. return (False, info)
  110. def packages_version():
  111. versions = []
  112. versions.append("python=%s.%s.%s" % (sys.version_info[0], sys.version_info[1], sys.version_info[2]))
  113. for pkg in RADICALE_MODULES:
  114. try:
  115. versions.append("%s=%s" % (pkg, package_version(pkg)))
  116. except Exception:
  117. try:
  118. versions.append("%s=%s" % (pkg, package_version("python-" + pkg)))
  119. except Exception:
  120. versions.append("%s=%s" % (pkg, "n/a"))
  121. return " ".join(versions)
  122. def format_address(address: ADDRESS_TYPE) -> str:
  123. host, port, *_ = address
  124. if not isinstance(host, str):
  125. raise NotImplementedError("Unsupported address format: %r" %
  126. (address,))
  127. if host.find(":") == -1:
  128. return "%s:%d" % (host, port)
  129. else:
  130. return "[%s]:%d" % (host, port)
  131. def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
  132. logger.debug("SSL protocol string: '%s' and current SSL context options: '0x%x'", protocol, ssl_context_options)
  133. # disable any protocol by default
  134. logger.debug("SSL context options, disable ALL by default")
  135. ssl_context_options |= ssl.OP_NO_SSLv2
  136. ssl_context_options |= ssl.OP_NO_SSLv3
  137. ssl_context_options |= ssl.OP_NO_TLSv1
  138. ssl_context_options |= ssl.OP_NO_TLSv1_1
  139. ssl_context_options |= ssl.OP_NO_TLSv1_2
  140. ssl_context_options |= ssl.OP_NO_TLSv1_3
  141. logger.debug("SSL cleared SSL context options: '0x%x'", ssl_context_options)
  142. for entry in protocol.split():
  143. entry = entry.strip('+') # remove trailing '+'
  144. if entry == "ALL":
  145. logger.debug("SSL context options, enable ALL (some maybe not supported by underlying OpenSSL, SSLv2 not enabled at all)")
  146. ssl_context_options &= ~ssl.OP_NO_SSLv3
  147. ssl_context_options &= ~ssl.OP_NO_TLSv1
  148. ssl_context_options &= ~ssl.OP_NO_TLSv1_1
  149. ssl_context_options &= ~ssl.OP_NO_TLSv1_2
  150. ssl_context_options &= ~ssl.OP_NO_TLSv1_3
  151. elif entry == "SSLv2":
  152. logger.warning("SSL context options, ignore SSLv2 (totally insecure)")
  153. elif entry == "SSLv3":
  154. ssl_context_options &= ~ssl.OP_NO_SSLv3
  155. logger.debug("SSL context options, enable SSLv3 (maybe not supported by underlying OpenSSL)")
  156. elif entry == "TLSv1":
  157. ssl_context_options &= ~ssl.OP_NO_TLSv1
  158. logger.debug("SSL context options, enable TLSv1 (maybe not supported by underlying OpenSSL)")
  159. elif entry == "TLSv1.1":
  160. logger.debug("SSL context options, enable TLSv1.1 (maybe not supported by underlying OpenSSL)")
  161. ssl_context_options &= ~ssl.OP_NO_TLSv1_1
  162. elif entry == "TLSv1.2":
  163. logger.debug("SSL context options, enable TLSv1.2")
  164. ssl_context_options &= ~ssl.OP_NO_TLSv1_2
  165. elif entry == "TLSv1.3":
  166. logger.debug("SSL context options, enable TLSv1.3")
  167. ssl_context_options &= ~ssl.OP_NO_TLSv1_3
  168. elif entry == "-ALL":
  169. logger.debug("SSL context options, disable ALL")
  170. ssl_context_options |= ssl.OP_NO_SSLv2
  171. ssl_context_options |= ssl.OP_NO_SSLv3
  172. ssl_context_options |= ssl.OP_NO_TLSv1
  173. ssl_context_options |= ssl.OP_NO_TLSv1_1
  174. ssl_context_options |= ssl.OP_NO_TLSv1_2
  175. ssl_context_options |= ssl.OP_NO_TLSv1_3
  176. elif entry == "-SSLv2":
  177. ssl_context_options |= ssl.OP_NO_SSLv2
  178. logger.debug("SSL context options, disable SSLv2")
  179. elif entry == "-SSLv3":
  180. ssl_context_options |= ssl.OP_NO_SSLv3
  181. logger.debug("SSL context options, disable SSLv3")
  182. elif entry == "-TLSv1":
  183. logger.debug("SSL context options, disable TLSv1")
  184. ssl_context_options |= ssl.OP_NO_TLSv1
  185. elif entry == "-TLSv1.1":
  186. logger.debug("SSL context options, disable TLSv1.1")
  187. ssl_context_options |= ssl.OP_NO_TLSv1_1
  188. elif entry == "-TLSv1.2":
  189. logger.debug("SSL context options, disable TLSv1.2")
  190. ssl_context_options |= ssl.OP_NO_TLSv1_2
  191. elif entry == "-TLSv1.3":
  192. logger.debug("SSL context options, disable TLSv1.3")
  193. ssl_context_options |= ssl.OP_NO_TLSv1_3
  194. else:
  195. raise RuntimeError("SSL protocol config contains unsupported entry '%s'" % (entry))
  196. logger.debug("SSL resulting context options: '0x%x'", ssl_context_options)
  197. return ssl_context_options
  198. def ssl_context_minimum_version_by_options(ssl_context_options):
  199. logger.debug("SSL calculate minimum version by context options: '0x%x'", ssl_context_options)
  200. ssl_context_minimum_version = ssl.TLSVersion.SSLv3 # default
  201. if ((ssl_context_options & ssl.OP_NO_SSLv3) and (ssl_context_minimum_version == ssl.TLSVersion.SSLv3)):
  202. ssl_context_minimum_version = ssl.TLSVersion.TLSv1
  203. if ((ssl_context_options & ssl.OP_NO_TLSv1) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1)):
  204. ssl_context_minimum_version = ssl.TLSVersion.TLSv1_1
  205. if ((ssl_context_options & ssl.OP_NO_TLSv1_1) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_1)):
  206. ssl_context_minimum_version = ssl.TLSVersion.TLSv1_2
  207. if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_2)):
  208. ssl_context_minimum_version = ssl.TLSVersion.TLSv1_3
  209. if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_3)):
  210. ssl_context_minimum_version = 0 # all disabled
  211. logger.debug("SSL context options: '0x%x' results in minimum version: %s", ssl_context_options, ssl_context_minimum_version)
  212. return ssl_context_minimum_version
  213. def ssl_context_maximum_version_by_options(ssl_context_options):
  214. logger.debug("SSL calculate maximum version by context options: '0x%x'", ssl_context_options)
  215. ssl_context_maximum_version = ssl.TLSVersion.TLSv1_3 # default
  216. if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_3)):
  217. ssl_context_maximum_version = ssl.TLSVersion.TLSv1_2
  218. if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_2)):
  219. ssl_context_maximum_version = ssl.TLSVersion.TLSv1_1
  220. if ((ssl_context_options & ssl.OP_NO_TLSv1_1) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_1)):
  221. ssl_context_maximum_version = ssl.TLSVersion.TLSv1
  222. if ((ssl_context_options & ssl.OP_NO_TLSv1) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1)):
  223. ssl_context_maximum_version = ssl.TLSVersion.SSLv3
  224. if ((ssl_context_options & ssl.OP_NO_SSLv3) and (ssl_context_maximum_version == ssl.TLSVersion.SSLv3)):
  225. ssl_context_maximum_version = 0
  226. logger.debug("SSL context options: '0x%x' results in maximum version: %s", ssl_context_options, ssl_context_maximum_version)
  227. return ssl_context_maximum_version
  228. def ssl_get_protocols(context):
  229. protocols = []
  230. if not (context.options & ssl.OP_NO_SSLv3):
  231. if (context.minimum_version < ssl.TLSVersion.TLSv1):
  232. protocols.append("SSLv3")
  233. if not (context.options & ssl.OP_NO_TLSv1):
  234. if (context.minimum_version < ssl.TLSVersion.TLSv1_1) and (context.maximum_version >= ssl.TLSVersion.TLSv1):
  235. protocols.append("TLSv1")
  236. if not (context.options & ssl.OP_NO_TLSv1_1):
  237. if (context.minimum_version < ssl.TLSVersion.TLSv1_2) and (context.maximum_version >= ssl.TLSVersion.TLSv1_1):
  238. protocols.append("TLSv1.1")
  239. if not (context.options & ssl.OP_NO_TLSv1_2):
  240. if (context.minimum_version <= ssl.TLSVersion.TLSv1_2) and (context.maximum_version >= ssl.TLSVersion.TLSv1_2):
  241. protocols.append("TLSv1.2")
  242. if not (context.options & ssl.OP_NO_TLSv1_3):
  243. if (context.minimum_version <= ssl.TLSVersion.TLSv1_3) and (context.maximum_version >= ssl.TLSVersion.TLSv1_3):
  244. protocols.append("TLSv1.3")
  245. return protocols
  246. def unknown_if_empty(value):
  247. if value == "":
  248. return "UNKNOWN"
  249. else:
  250. return value
  251. def user_groups_as_string():
  252. if sys.platform != "win32":
  253. euid = os.geteuid()
  254. try:
  255. username = pwd.getpwuid(euid)[0]
  256. user = "%s(%d)" % (unknown_if_empty(username), euid)
  257. except Exception:
  258. # name of user not found
  259. user = "UNKNOWN(%d)" % euid
  260. egid = os.getegid()
  261. groups = []
  262. try:
  263. gids = os.getgrouplist(username, egid)
  264. for gid in gids:
  265. try:
  266. gi = grp.getgrgid(gid)
  267. groups.append("%s(%d)" % (unknown_if_empty(gi.gr_name), gid))
  268. except Exception:
  269. groups.append("UNKNOWN(%d)" % gid)
  270. except Exception:
  271. try:
  272. groups.append("%s(%d)" % (grp.getgrnam(egid)[0], egid))
  273. except Exception:
  274. # workaround to get groupid by name
  275. groups_all = grp.getgrall()
  276. found = False
  277. for entry in groups_all:
  278. if entry[2] == egid:
  279. groups.append("%s(%d)" % (unknown_if_empty(entry[0]), egid))
  280. found = True
  281. break
  282. if not found:
  283. groups.append("UNKNOWN(%d)" % egid)
  284. s = "user=%s groups=%s" % (user, ','.join(groups))
  285. else:
  286. username = os.getlogin()
  287. s = "user=%s" % (username)
  288. return s
  289. def format_ut(unixtime: int) -> str:
  290. if sys.platform == "win32":
  291. # TODO check how to support this better
  292. return str(unixtime)
  293. if unixtime <= DATETIME_MIN_UNIXTIME:
  294. r = str(unixtime) + "(<=MIN:" + str(DATETIME_MIN_UNIXTIME) + ")"
  295. elif unixtime >= DATETIME_MAX_UNIXTIME:
  296. r = str(unixtime) + "(>=MAX:" + str(DATETIME_MAX_UNIXTIME) + ")"
  297. else:
  298. if sys.version_info < (3, 11):
  299. dt = datetime.datetime.utcfromtimestamp(unixtime)
  300. else:
  301. dt = datetime.datetime.fromtimestamp(unixtime, datetime.UTC)
  302. r = str(unixtime) + "(" + dt.strftime('%Y-%m-%dT%H:%M:%SZ') + ")"
  303. return r
  304. def format_unit(value: float, binary: bool = False) -> str:
  305. if binary:
  306. if value > UNIT_G:
  307. value = value / UNIT_G
  308. unit = "G"
  309. elif value > UNIT_M:
  310. value = value / UNIT_M
  311. unit = "M"
  312. elif value > UNIT_K:
  313. value = value / UNIT_K
  314. unit = "K"
  315. else:
  316. unit = ""
  317. else:
  318. if value > UNIT_g:
  319. value = value / UNIT_g
  320. unit = "g"
  321. elif value > UNIT_m:
  322. value = value / UNIT_m
  323. unit = "m"
  324. elif value > UNIT_k:
  325. value = value / UNIT_k
  326. unit = "k"
  327. else:
  328. unit = ""
  329. return ("%.1f %s" % (value, unit))
  330. def limit_str(content: str, limit: int) -> str:
  331. length = len(content)
  332. if limit > 0 and length >= limit:
  333. return content[:limit] + ("...(shortened because original length %d > limit %d)" % (length, limit))
  334. else:
  335. return content
  336. def textwrap_str(content: str, limit: int = 2000) -> str:
  337. # TODO: add support for config option and prefix
  338. return textwrap.indent(limit_str(content, limit), " ", lambda line: True)
  339. def dataToHex(data, count):
  340. result = ''
  341. for item in range(count):
  342. if ((item > 0) and ((item % 8) == 0)):
  343. result += ' '
  344. if (item < len(data)):
  345. result += '%02x' % data[item] + ' '
  346. else:
  347. result += ' '
  348. return result
  349. def dataToAscii(data, count):
  350. result = ''
  351. for item in range(count):
  352. if (item < len(data)):
  353. char = chr(data[item])
  354. if char in ascii_letters or \
  355. char in digits or \
  356. char in punctuation or \
  357. char == ' ':
  358. result += char
  359. else:
  360. result += '.'
  361. return result
  362. def dataToSpecial(data, count):
  363. result = ''
  364. for item in range(count):
  365. if (item < len(data)):
  366. char = chr(data[item])
  367. if char == '\r':
  368. result += 'C'
  369. elif char == '\n':
  370. result += 'L'
  371. elif (ord(char) & 0xf8) == 0xf0: # assuming UTF-8
  372. result += '4'
  373. elif (ord(char) & 0xf0) == 0xf0: # assuming UTF-8
  374. result += '3'
  375. elif (ord(char) & 0xe0) == 0xe0: # assuming UTF-8
  376. result += '2'
  377. else:
  378. result += '.'
  379. return result
  380. def hexdump_str(content: str, limit: int = 2000) -> str:
  381. result = "Hexdump of string: index <bytes> | <ASCII> | <CTRL: C=CR L=LF 2/3/4=UTF-8-length> |\n"
  382. index = 0
  383. size = 16
  384. bytestring = content.encode("utf-8") # assuming UTF-8
  385. length = len(bytestring)
  386. while (index < length) and (index < limit):
  387. data = bytestring[index:index+size]
  388. hex = dataToHex(data, size)
  389. ascii = dataToAscii(data, size)
  390. special = dataToSpecial(data, size)
  391. result += '%08x ' % index
  392. result += hex
  393. result += '|'
  394. result += '%-16s' % ascii
  395. result += '|'
  396. result += '%-16s' % special
  397. result += '|'
  398. result += '\n'
  399. index += size
  400. return result
  401. def hexdump_line(line: str, limit: int = 200) -> str:
  402. result = ""
  403. length_str = len(line)
  404. bytestring = line.encode("utf-8") # assuming UTF-8
  405. length = len(bytestring)
  406. size = length
  407. if (size > limit):
  408. size = limit
  409. hex = dataToHex(bytestring, size)
  410. ascii = dataToAscii(bytestring, size)
  411. special = dataToSpecial(bytestring, size)
  412. result += '%3d/%3d' % (length_str, length)
  413. result += ': '
  414. result += hex
  415. result += '|'
  416. result += ascii
  417. result += '|'
  418. result += special
  419. result += '|'
  420. result += '\n'
  421. return result
  422. def hexdump_lines(lines: str, limit: int = 200) -> str:
  423. result = "Hexdump of lines: nr chars/bytes: <bytes> | <ASCII> | <CTRL: C=CR L=LF 2/3/4=UTF-8-length> |\n"
  424. counter = 0
  425. for line in lines.splitlines(True):
  426. result += '% 4d ' % counter
  427. result += hexdump_line(line)
  428. counter += 1
  429. return result
  430. def sha256_str(content: str) -> str:
  431. _hash = sha256()
  432. _hash.update(content.encode("utf-8")) # assuming UTF-8
  433. return _hash.hexdigest()
  434. def sha256_bytes(content: bytes) -> str:
  435. _hash = sha256()
  436. _hash.update(content)
  437. return _hash.hexdigest()