base.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # This file is part of Radicale - CalDAV and CardDAV server
  2. # Copyright © 2014 Jean-Marc Martins
  3. # Copyright © 2012-2017 Guillaume Ayoub
  4. # Copyright © 2017-2022 Unrud <unrud@outlook.com>
  5. # Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
  6. #
  7. # This library is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with Radicale. If not, see <http://www.gnu.org/licenses/>.
  19. import os
  20. import sys
  21. from tempfile import TemporaryDirectory
  22. from typing import IO, AnyStr, ClassVar, Iterator, Optional, Type
  23. from radicale import config, pathutils, storage, types
  24. from radicale.storage import multifilesystem # noqa:F401
  25. class CollectionBase(storage.BaseCollection):
  26. _storage: "multifilesystem.Storage"
  27. _path: str
  28. _encoding: str
  29. _filesystem_path: str
  30. def __init__(self, storage_: "multifilesystem.Storage", path: str,
  31. filesystem_path: Optional[str] = None) -> None:
  32. super().__init__()
  33. self._storage = storage_
  34. folder = storage_._get_collection_root_folder()
  35. # Path should already be sanitized
  36. self._path = pathutils.strip_path(path)
  37. self._encoding = storage_.configuration.get("encoding", "stock")
  38. self._skip_broken_item = storage_.configuration.get("storage", "skip_broken_item")
  39. if filesystem_path is None:
  40. filesystem_path = pathutils.path_to_filesystem(folder, self.path)
  41. self._filesystem_path = filesystem_path
  42. # TODO: better fix for "mypy"
  43. @types.contextmanager # type: ignore
  44. def _atomic_write(self, path: str, mode: str = "w",
  45. newline: Optional[str] = None) -> Iterator[IO[AnyStr]]:
  46. # TODO: Overload with Literal when dropping support for Python < 3.8
  47. parent_dir, name = os.path.split(path)
  48. # Do not use mkstemp because it creates with permissions 0o600
  49. with TemporaryDirectory(
  50. prefix=".Radicale.tmp-", dir=parent_dir) as tmp_dir:
  51. with open(os.path.join(tmp_dir, name), mode, newline=newline,
  52. encoding=None if "b" in mode else self._encoding) as tmp:
  53. yield tmp
  54. tmp.flush()
  55. self._storage._fsync(tmp)
  56. os.replace(os.path.join(tmp_dir, name), path)
  57. self._storage._sync_directory(parent_dir)
  58. class StorageBase(storage.BaseStorage):
  59. _collection_class: ClassVar[Type["multifilesystem.Collection"]]
  60. _filesystem_folder: str
  61. _filesystem_fsync: bool
  62. def __init__(self, configuration: config.Configuration) -> None:
  63. super().__init__(configuration)
  64. self._filesystem_folder = configuration.get(
  65. "storage", "filesystem_folder")
  66. self._filesystem_fsync = configuration.get(
  67. "storage", "_filesystem_fsync")
  68. def _get_collection_root_folder(self) -> str:
  69. return os.path.join(self._filesystem_folder, "collection-root")
  70. def _fsync(self, f: IO[AnyStr]) -> None:
  71. if self._filesystem_fsync:
  72. try:
  73. pathutils.fsync(f.fileno())
  74. except OSError as e:
  75. raise RuntimeError("Fsync'ing file %r failed: %s" %
  76. (f.name, e)) from e
  77. def _sync_directory(self, path: str) -> None:
  78. """Sync directory to disk.
  79. This only works on POSIX and does nothing on other systems.
  80. """
  81. if not self._filesystem_fsync:
  82. return
  83. if sys.platform != "win32":
  84. try:
  85. fd = os.open(path, 0)
  86. try:
  87. pathutils.fsync(fd)
  88. finally:
  89. os.close(fd)
  90. except OSError as e:
  91. raise RuntimeError("Fsync'ing directory %r failed: %s" %
  92. (path, e)) from e
  93. def _makedirs_synced(self, filesystem_path: str) -> None:
  94. """Recursively create a directory and its parents in a sync'ed way.
  95. This method acts silently when the folder already exists.
  96. """
  97. if os.path.isdir(filesystem_path):
  98. return
  99. parent_filesystem_path = os.path.dirname(filesystem_path)
  100. # Prevent infinite loop
  101. if filesystem_path != parent_filesystem_path:
  102. # Create parent dirs recursively
  103. self._makedirs_synced(parent_filesystem_path)
  104. # Possible race!
  105. os.makedirs(filesystem_path, exist_ok=True)
  106. self._sync_directory(parent_filesystem_path)