pathutils.py 11 KB


  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2014 Jean-Marc Martins
  3. # Copyright © 2012-2017 Guillaume Ayoub
  4. # Copyright © 2017-2022 Unrud <unrud@outlook.com>
  5. # Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
  6. #
  7. # This library is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  19. """
  20. Helper functions for working with the file system.
  21. """
  22. import errno
  23. import os
  24. import pathlib
  25. import posixpath
  26. import sys
  27. import threading
  28. from tempfile import TemporaryDirectory
  29. from typing import Iterator, Type, Union
  30. from radicale import storage, types, utils
# Platform-specific locking backend: on Windows the LockFileEx and
# UnlockFileEx functions of kernel32 are bound through ctypes, on every
# other platform the fcntl module is imported instead.
if sys.platform == "win32":
    import ctypes
    import ctypes.wintypes
    import msvcrt

    # Flag passed to LockFileEx when an exclusive lock is requested
    # (RwLock.acquire passes 0 instead for mode "r").
    LOCKFILE_EXCLUSIVE_LOCK: int = 2

    # Unsigned integer type with the same width as a pointer.
    ULONG_PTR: Union[Type[ctypes.c_uint32], Type[ctypes.c_uint64]]
    if ctypes.sizeof(ctypes.c_void_p) == 4:
        ULONG_PTR = ctypes.c_uint32
    else:
        ULONG_PTR = ctypes.c_uint64

    class Overlapped(ctypes.Structure):
        """Structure passed by pointer to LockFileEx and UnlockFileEx."""
        # NOTE(review): the field layout presumably mirrors the Win32
        # OVERLAPPED structure - confirm against the Win32 documentation.
        _fields_ = [
            ("internal", ULONG_PTR),
            ("internal_high", ULONG_PTR),
            ("offset", ctypes.wintypes.DWORD),
            ("offset_high", ctypes.wintypes.DWORD),
            ("h_event", ctypes.wintypes.HANDLE)]

    # Load kernel32 and declare the prototypes of the two locking functions
    # so that ctypes converts arguments and return values correctly.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    lock_file_ex = kernel32.LockFileEx
    # Called in RwLock.acquire as (handle, flags, 0, 1, 0, overlapped).
    lock_file_ex.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(Overlapped)]
    lock_file_ex.restype = ctypes.wintypes.BOOL
    unlock_file_ex = kernel32.UnlockFileEx
    unlock_file_ex.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(Overlapped)]
    unlock_file_ex.restype = ctypes.wintypes.BOOL
else:
    # Every non-Windows platform locks with fcntl.flock (see RwLock.acquire).
    import fcntl
# On Linux, try to bind the renameat2 function of the C library so that
# rename_exchange() can swap two paths in a single call.
if sys.platform == "linux":
    import ctypes

    # Flag passed as the last argument of renameat2 by rename_exchange().
    RENAME_EXCHANGE: int = 2
    # Stays None when the loaded C library does not export renameat2;
    # rename_exchange() then uses its non-atomic fallback.
    renameat2 = None
    try:
        # use_errno=True lets rename_exchange() read the error code with
        # ctypes.get_errno() after a failed call.
        renameat2 = ctypes.CDLL(None, use_errno=True).renameat2
    except AttributeError:
        pass
    else:
        # Prototype: (src dir fd, src name, dst dir fd, dst name, flags)
        renameat2.argtypes = [
            ctypes.c_int, ctypes.c_char_p,
            ctypes.c_int, ctypes.c_char_p,
            ctypes.c_uint]
        renameat2.restype = ctypes.c_int
# On macOS, resolve the fcntl command that fsync() issues before falling
# back to os.fsync.
if sys.platform == "darwin":
    # Definition missing in PyPy, so fall back to the literal value 51
    # when the fcntl module does not provide the constant.
    F_FULLFSYNC: int = getattr(fcntl, "F_FULLFSYNC", 51)
  85. class RwLock:
  86. """A readers-Writer lock that locks a file."""
  87. _path: str
  88. _readers: int
  89. _writer: bool
  90. _lock: threading.Lock
  91. def __init__(self, path: str) -> None:
  92. self._path = path
  93. self._readers = 0
  94. self._writer = False
  95. self._lock = threading.Lock()
  96. @property
  97. def locked(self) -> str:
  98. with self._lock:
  99. if self._readers > 0:
  100. return "r"
  101. if self._writer:
  102. return "w"
  103. return ""
  104. @types.contextmanager
  105. def acquire(self, mode: str) -> Iterator[None]:
  106. if mode not in "rw":
  107. raise ValueError("Invalid mode: %r" % mode)
  108. with open(self._path, "w+") as lock_file:
  109. if sys.platform == "win32":
  110. handle = msvcrt.get_osfhandle(lock_file.fileno())
  111. flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
  112. overlapped = Overlapped()
  113. try:
  114. if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
  115. raise ctypes.WinError()
  116. except OSError as e:
  117. raise RuntimeError("Locking the storage failed: %s" % e
  118. ) from e
  119. else:
  120. _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
  121. try:
  122. fcntl.flock(lock_file.fileno(), _cmd)
  123. except OSError as e:
  124. raise RuntimeError("Locking the storage failed: %s" % e
  125. ) from e
  126. with self._lock:
  127. if self._writer or mode == "w" and self._readers != 0:
  128. raise RuntimeError("Locking the storage failed: "
  129. "Guarantees failed")
  130. if mode == "r":
  131. self._readers += 1
  132. else:
  133. self._writer = True
  134. try:
  135. yield
  136. finally:
  137. with self._lock:
  138. if mode == "r":
  139. self._readers -= 1
  140. self._writer = False
  141. def rename_exchange(src: str, dst: str) -> None:
  142. """Exchange the files or directories `src` and `dst`.
  143. Both `src` and `dst` must exist but may be of different types.
  144. On Linux with renameat2 the operation is atomic.
  145. On other platforms it's not atomic.
  146. """
  147. src_dir, src_base = os.path.split(src)
  148. dst_dir, dst_base = os.path.split(dst)
  149. src_dir = src_dir or os.curdir
  150. dst_dir = dst_dir or os.curdir
  151. if not src_base or not dst_base:
  152. raise ValueError("Invalid arguments: %r -> %r" % (src, dst))
  153. if sys.platform == "linux" and renameat2:
  154. src_base_bytes = os.fsencode(src_base)
  155. dst_base_bytes = os.fsencode(dst_base)
  156. src_dir_fd = os.open(src_dir, 0)
  157. try:
  158. dst_dir_fd = os.open(dst_dir, 0)
  159. try:
  160. if renameat2(src_dir_fd, src_base_bytes,
  161. dst_dir_fd, dst_base_bytes,
  162. RENAME_EXCHANGE) == 0:
  163. return
  164. errno_ = ctypes.get_errno()
  165. # Fallback if RENAME_EXCHANGE not supported by filesystem
  166. if errno_ != errno.EINVAL:
  167. raise OSError(errno_, os.strerror(errno_))
  168. finally:
  169. os.close(dst_dir_fd)
  170. finally:
  171. os.close(src_dir_fd)
  172. with TemporaryDirectory(prefix=".Radicale.tmp-", dir=src_dir
  173. ) as tmp_dir:
  174. os.rename(dst, os.path.join(tmp_dir, "interim"))
  175. os.rename(src, dst)
  176. os.rename(os.path.join(tmp_dir, "interim"), src)
  177. def fsync(fd: int) -> None:
  178. if sys.platform == "darwin":
  179. try:
  180. fcntl.fcntl(fd, F_FULLFSYNC)
  181. return
  182. except OSError as e:
  183. # Fallback if F_FULLFSYNC not supported by filesystem
  184. if e.errno != errno.EINVAL:
  185. raise
  186. os.fsync(fd)
  187. def strip_path(path: str) -> str:
  188. assert sanitize_path(path) == path
  189. return path.strip("/")
  190. def unstrip_path(stripped_path: str, trailing_slash: bool = False) -> str:
  191. assert strip_path(sanitize_path(stripped_path)) == stripped_path
  192. assert stripped_path or trailing_slash
  193. path = "/%s" % stripped_path
  194. if trailing_slash and not path.endswith("/"):
  195. path += "/"
  196. return path
  197. def sanitize_path(path: str) -> str:
  198. """Make path absolute with leading slash to prevent access to other data.
  199. Preserve potential trailing slash.
  200. """
  201. trailing_slash = "/" if path.endswith("/") else ""
  202. path = posixpath.normpath(path)
  203. new_path = "/"
  204. for part in path.split("/"):
  205. if not is_safe_path_component(part):
  206. continue
  207. new_path = posixpath.join(new_path, part)
  208. trailing_slash = "" if new_path.endswith("/") else trailing_slash
  209. return new_path + trailing_slash
  210. def is_safe_path_component(path: str) -> bool:
  211. """Check if path is a single component of a path.
  212. Check that the path is safe to join too.
  213. """
  214. return bool(path) and "/" not in path and path not in (".", "..")
  215. def is_safe_filesystem_path_component(path: str) -> bool:
  216. """Check if path is a single component of a local and posix filesystem
  217. path.
  218. Check that the path is safe to join too.
  219. """
  220. return (
  221. bool(path) and not os.path.splitdrive(path)[0] and
  222. (sys.platform != "win32" or ":" not in path) and # Block NTFS-ADS
  223. not os.path.split(path)[0] and path not in (os.curdir, os.pardir) and
  224. not path.startswith(".") and not path.endswith("~") and
  225. is_safe_path_component(path))
  226. def path_to_filesystem(root: str, sane_path: str) -> str:
  227. """Convert `sane_path` to a local filesystem path relative to `root`.
  228. `root` must be a secure filesystem path, it will be prepend to the path.
  229. `sane_path` must be a sanitized path without leading or trailing ``/``.
  230. Conversion of `sane_path` is done in a secure manner,
  231. or raises ``ValueError``.
  232. """
  233. assert sane_path == strip_path(sanitize_path(sane_path))
  234. safe_path = root
  235. parts = sane_path.split("/") if sane_path else []
  236. for part in parts:
  237. if not is_safe_filesystem_path_component(part):
  238. raise UnsafePathError(part)
  239. safe_path_parent = safe_path
  240. safe_path = os.path.join(safe_path, part)
  241. # Check for conflicting files (e.g. case-insensitive file systems
  242. # or short names on Windows file systems)
  243. if os.path.lexists(safe_path):
  244. with os.scandir(safe_path_parent) as entries:
  245. if part not in (e.name for e in entries):
  246. raise CollidingPathError(part)
  247. return safe_path
  248. class UnsafePathError(ValueError):
  249. def __init__(self, path: str) -> None:
  250. super().__init__("Can't translate name safely to filesystem: %r" %
  251. path)
  252. class CollidingPathError(ValueError):
  253. def __init__(self, path: str) -> None:
  254. super().__init__("File name collision: %r" % path)
  255. def name_from_path(path: str, collection: "storage.BaseCollection") -> str:
  256. """Return Radicale item name from ``path``."""
  257. assert sanitize_path(path) == path
  258. start = unstrip_path(collection.path, True)
  259. if not (path + "/").startswith(start):
  260. raise ValueError("%r doesn't start with %r" % (path, start))
  261. name = path[len(start):]
  262. if name and not is_safe_path_component(name):
  263. raise ValueError("%r is not a component in collection %r" %
  264. (name, collection.path))
  265. return name
  266. def path_permissions(path):
  267. path = pathlib.Path(path)
  268. try:
  269. uid = utils.unknown_if_empty(path.stat().st_uid)
  270. except (KeyError, NotImplementedError):
  271. uid = "UNKNOWN"
  272. try:
  273. gid = utils.unknown_if_empty(path.stat().st_gid)
  274. except (KeyError, NotImplementedError):
  275. gid = "UNKNOWN"
  276. try:
  277. mode = utils.unknown_if_empty("%o" % path.stat().st_mode)
  278. except (KeyError, NotImplementedError):
  279. mode = "UNKNOWN"
  280. try:
  281. owner = utils.unknown_if_empty(path.owner())
  282. except (KeyError, NotImplementedError):
  283. owner = "UNKNOWN"
  284. try:
  285. group = utils.unknown_if_empty(path.group())
  286. except (KeyError, NotImplementedError):
  287. group = "UNKNOWN"
  288. return [owner, uid, group, gid, mode]
  289. def path_permissions_as_string(path):
  290. pp = path_permissions(path)
  291. s = "path=%r owner=%s(%s) group=%s(%s) mode=%s" % (path, pp[0], pp[1], pp[2], pp[3], pp[4])
  292. return s