| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- # This file is part of Radicale - CalDAV and CardDAV server
- # Copyright © 2014 Jean-Marc Martins
- # Copyright © 2012-2017 Guillaume Ayoub
- # Copyright © 2017-2022 Unrud <unrud@outlook.com>
- # Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
- #
- # This library is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
- """
- Helper functions for working with the file system.
- """
- import errno
- import os
- import pathlib
- import posixpath
- import sys
- import threading
- from tempfile import TemporaryDirectory
- from typing import Iterator, Type, Union
- from radicale import storage, types, utils
- if sys.platform == "win32":
- import ctypes
- import ctypes.wintypes
- import msvcrt
- LOCKFILE_EXCLUSIVE_LOCK: int = 2
- ULONG_PTR: Union[Type[ctypes.c_uint32], Type[ctypes.c_uint64]]
- if ctypes.sizeof(ctypes.c_void_p) == 4:
- ULONG_PTR = ctypes.c_uint32
- else:
- ULONG_PTR = ctypes.c_uint64
- class Overlapped(ctypes.Structure):
- _fields_ = [
- ("internal", ULONG_PTR),
- ("internal_high", ULONG_PTR),
- ("offset", ctypes.wintypes.DWORD),
- ("offset_high", ctypes.wintypes.DWORD),
- ("h_event", ctypes.wintypes.HANDLE)]
- kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
- lock_file_ex = kernel32.LockFileEx
- lock_file_ex.argtypes = [
- ctypes.wintypes.HANDLE,
- ctypes.wintypes.DWORD,
- ctypes.wintypes.DWORD,
- ctypes.wintypes.DWORD,
- ctypes.wintypes.DWORD,
- ctypes.POINTER(Overlapped)]
- lock_file_ex.restype = ctypes.wintypes.BOOL
- unlock_file_ex = kernel32.UnlockFileEx
- unlock_file_ex.argtypes = [
- ctypes.wintypes.HANDLE,
- ctypes.wintypes.DWORD,
- ctypes.wintypes.DWORD,
- ctypes.wintypes.DWORD,
- ctypes.POINTER(Overlapped)]
- unlock_file_ex.restype = ctypes.wintypes.BOOL
- else:
- import fcntl
- if sys.platform == "linux":
- import ctypes
- RENAME_EXCHANGE: int = 2
- renameat2 = None
- try:
- renameat2 = ctypes.CDLL(None, use_errno=True).renameat2
- except AttributeError:
- pass
- else:
- renameat2.argtypes = [
- ctypes.c_int, ctypes.c_char_p,
- ctypes.c_int, ctypes.c_char_p,
- ctypes.c_uint]
- renameat2.restype = ctypes.c_int
- if sys.platform == "darwin":
- # Definition missing in PyPy
- F_FULLFSYNC: int = getattr(fcntl, "F_FULLFSYNC", 51)
- class RwLock:
- """A readers-Writer lock that locks a file."""
- _path: str
- _readers: int
- _writer: bool
- _lock: threading.Lock
- def __init__(self, path: str) -> None:
- self._path = path
- self._readers = 0
- self._writer = False
- self._lock = threading.Lock()
- @property
- def locked(self) -> str:
- with self._lock:
- if self._readers > 0:
- return "r"
- if self._writer:
- return "w"
- return ""
- @types.contextmanager
- def acquire(self, mode: str) -> Iterator[None]:
- if mode not in "rw":
- raise ValueError("Invalid mode: %r" % mode)
- with open(self._path, "w+") as lock_file:
- if sys.platform == "win32":
- handle = msvcrt.get_osfhandle(lock_file.fileno())
- flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
- overlapped = Overlapped()
- try:
- if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
- raise ctypes.WinError()
- except OSError as e:
- raise RuntimeError("Locking the storage failed: %s" % e
- ) from e
- else:
- _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
- try:
- fcntl.flock(lock_file.fileno(), _cmd)
- except OSError as e:
- raise RuntimeError("Locking the storage failed: %s" % e
- ) from e
- with self._lock:
- if self._writer or mode == "w" and self._readers != 0:
- raise RuntimeError("Locking the storage failed: "
- "Guarantees failed")
- if mode == "r":
- self._readers += 1
- else:
- self._writer = True
- try:
- yield
- finally:
- with self._lock:
- if mode == "r":
- self._readers -= 1
- self._writer = False
- def rename_exchange(src: str, dst: str) -> None:
- """Exchange the files or directories `src` and `dst`.
- Both `src` and `dst` must exist but may be of different types.
- On Linux with renameat2 the operation is atomic.
- On other platforms it's not atomic.
- """
- src_dir, src_base = os.path.split(src)
- dst_dir, dst_base = os.path.split(dst)
- src_dir = src_dir or os.curdir
- dst_dir = dst_dir or os.curdir
- if not src_base or not dst_base:
- raise ValueError("Invalid arguments: %r -> %r" % (src, dst))
- if sys.platform == "linux" and renameat2:
- src_base_bytes = os.fsencode(src_base)
- dst_base_bytes = os.fsencode(dst_base)
- src_dir_fd = os.open(src_dir, 0)
- try:
- dst_dir_fd = os.open(dst_dir, 0)
- try:
- if renameat2(src_dir_fd, src_base_bytes,
- dst_dir_fd, dst_base_bytes,
- RENAME_EXCHANGE) == 0:
- return
- errno_ = ctypes.get_errno()
- # Fallback if RENAME_EXCHANGE not supported by filesystem
- if errno_ != errno.EINVAL:
- raise OSError(errno_, os.strerror(errno_))
- finally:
- os.close(dst_dir_fd)
- finally:
- os.close(src_dir_fd)
- with TemporaryDirectory(prefix=".Radicale.tmp-", dir=src_dir
- ) as tmp_dir:
- os.rename(dst, os.path.join(tmp_dir, "interim"))
- os.rename(src, dst)
- os.rename(os.path.join(tmp_dir, "interim"), src)
- def fsync(fd: int) -> None:
- if sys.platform == "darwin":
- try:
- fcntl.fcntl(fd, F_FULLFSYNC)
- return
- except OSError as e:
- # Fallback if F_FULLFSYNC not supported by filesystem
- if e.errno != errno.EINVAL:
- raise
- os.fsync(fd)
- def strip_path(path: str) -> str:
- assert sanitize_path(path) == path
- return path.strip("/")
- def unstrip_path(stripped_path: str, trailing_slash: bool = False) -> str:
- assert strip_path(sanitize_path(stripped_path)) == stripped_path
- assert stripped_path or trailing_slash
- path = "/%s" % stripped_path
- if trailing_slash and not path.endswith("/"):
- path += "/"
- return path
- def sanitize_path(path: str) -> str:
- """Make path absolute with leading slash to prevent access to other data.
- Preserve potential trailing slash.
- """
- trailing_slash = "/" if path.endswith("/") else ""
- path = posixpath.normpath(path)
- new_path = "/"
- for part in path.split("/"):
- if not is_safe_path_component(part):
- continue
- new_path = posixpath.join(new_path, part)
- trailing_slash = "" if new_path.endswith("/") else trailing_slash
- return new_path + trailing_slash
- def is_safe_path_component(path: str) -> bool:
- """Check if path is a single component of a path.
- Check that the path is safe to join too.
- """
- return bool(path) and "/" not in path and path not in (".", "..")
- def is_safe_filesystem_path_component(path: str) -> bool:
- """Check if path is a single component of a local and posix filesystem
- path.
- Check that the path is safe to join too.
- """
- return (
- bool(path) and not os.path.splitdrive(path)[0] and
- (sys.platform != "win32" or ":" not in path) and # Block NTFS-ADS
- not os.path.split(path)[0] and path not in (os.curdir, os.pardir) and
- not path.startswith(".") and not path.endswith("~") and
- is_safe_path_component(path))
- def path_to_filesystem(root: str, sane_path: str) -> str:
- """Convert `sane_path` to a local filesystem path relative to `root`.
- `root` must be a secure filesystem path, it will be prepend to the path.
- `sane_path` must be a sanitized path without leading or trailing ``/``.
- Conversion of `sane_path` is done in a secure manner,
- or raises ``ValueError``.
- """
- assert sane_path == strip_path(sanitize_path(sane_path))
- safe_path = root
- parts = sane_path.split("/") if sane_path else []
- for part in parts:
- if not is_safe_filesystem_path_component(part):
- raise UnsafePathError(part)
- safe_path_parent = safe_path
- safe_path = os.path.join(safe_path, part)
- # Check for conflicting files (e.g. case-insensitive file systems
- # or short names on Windows file systems)
- if os.path.lexists(safe_path):
- with os.scandir(safe_path_parent) as entries:
- if part not in (e.name for e in entries):
- raise CollidingPathError(part)
- return safe_path
- class UnsafePathError(ValueError):
- def __init__(self, path: str) -> None:
- super().__init__("Can't translate name safely to filesystem: %r" %
- path)
- class CollidingPathError(ValueError):
- def __init__(self, path: str) -> None:
- super().__init__("File name collision: %r" % path)
- def name_from_path(path: str, collection: "storage.BaseCollection") -> str:
- """Return Radicale item name from ``path``."""
- assert sanitize_path(path) == path
- start = unstrip_path(collection.path, True)
- if not (path + "/").startswith(start):
- raise ValueError("%r doesn't start with %r" % (path, start))
- name = path[len(start):]
- if name and not is_safe_path_component(name):
- raise ValueError("%r is not a component in collection %r" %
- (name, collection.path))
- return name
- def path_permissions(path):
- path = pathlib.Path(path)
- try:
- uid = utils.unknown_if_empty(path.stat().st_uid)
- except (KeyError, NotImplementedError):
- uid = "UNKNOWN"
- try:
- gid = utils.unknown_if_empty(path.stat().st_gid)
- except (KeyError, NotImplementedError):
- gid = "UNKNOWN"
- try:
- mode = utils.unknown_if_empty("%o" % path.stat().st_mode)
- except (KeyError, NotImplementedError):
- mode = "UNKNOWN"
- try:
- owner = utils.unknown_if_empty(path.owner())
- except (KeyError, NotImplementedError):
- owner = "UNKNOWN"
- try:
- group = utils.unknown_if_empty(path.group())
- except (KeyError, NotImplementedError):
- group = "UNKNOWN"
- return [owner, uid, group, gid, mode]
- def path_permissions_as_string(path):
- pp = path_permissions(path)
- s = "path=%r owner=%s(%s) group=%s(%s) mode=%s" % (path, pp[0], pp[1], pp[2], pp[3], pp[4])
- return s
|